pyspark.ml - NGrams + CountVectorizer - Sorted based on count weights
I want to run an n-gram CountVectorizer and sort the output based on the count weights.
I used the following approach:
- Step A. Compute the n-gram CountVectorizer in the Spark layer.
- Step B. Save the resulting PySpark ML DataFrame to a Hive table.
Problem faced:
Getting a Java heap space error in Step B.
Queries:
A. Before doing Step B, instead of writing all of the columns below into one Hive table, write only a few columns, say (1_links, 1_counts), (2_links, 2_counts) and (3_links, 3_counts), into separate tables (a sketch follows the column list). This way less data is written to each Hive table.
The pyspark.sql.dataframe.DataFrame has the following columns:
id, urllinks, 1_links, 2_links, 3_links, 1_counts, 2_counts, 3_counts
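A minimal sketch of approach A, assuming the transformed DataFrame is the `b` produced by the code further down; the target table names are placeholders:

```python
# Write each (n_links, n_counts) pair to its own Hive table instead of
# saving the full DataFrame in one shot. Table names are illustrative.
for i in range(1, 4):
    (b.select("id", "{0}_links".format(i), "{0}_counts".format(i))
       .write
       .mode("overwrite")
       .saveAsTable("output_{0}gram".format(i)))
```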
B. For the 1_counts, 2_counts and 3_counts columns: can these be converted from sparse vectors to dense vectors, keeping only the indices whose count exceeds a threshold (say 3)? This way less data would be written to the Hive table, and the chance of a Java heap space error should also drop.
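Rather than going to dense vectors (which would make each row larger), one hedged sketch of the thresholding idea in B is a UDF that turns each count vector into a small {index: count} map holding only entries above the cutoff. `keep_above` and the `*_top_counts` column names are my own placeholders, and `b` is again the transformed DataFrame from the code below:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, IntegerType, DoubleType

def keep_above(threshold=3.0):
    # Assumes the column holds SparseVectors (what CountVectorizer produces);
    # keeps only the (index, count) entries whose count exceeds the threshold.
    def _filter(v):
        if v is None:
            return {}
        return {int(i): float(c)
                for i, c in zip(v.indices, v.values) if c > threshold}
    return udf(_filter, MapType(IntegerType(), DoubleType()))

b_small = b
for i in range(1, 4):
    b_small = b_small.withColumn("{0}_top_counts".format(i),
                                 keep_above(3.0)("{0}_counts".format(i)))
b_small = b_small.drop("1_counts", "2_counts", "3_counts")
```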
If these approaches are reasonable, please advise how to implement them. I have not been able to do this myself without hitting the Java heap space error.
C. How to get the vocabulary, together with its counts, into the same Hive table.
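For C (and for the sorting-by-count-weights goal in the title), one hedged option is to bypass the count vectors entirely and aggregate the n-gram arrays directly; `vocab_counts_1gram` is an illustrative table name:

```python
from pyspark.sql.functions import explode, col, desc

# Total occurrences of each unigram across all rows, sorted by count.
# The same pattern works for 2_links and 3_links.
vocab_counts = (b.select(explode(col("1_links")).alias("term"))
                 .groupBy("term")
                 .count()
                 .orderBy(desc("count")))
vocab_counts.write.mode("overwrite").saveAsTable("vocab_counts_1gram")
```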
D. How to handle the Java heap space error, and which parameters I need to look at. The following settings were used when launching the notebook:
PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver-maxResultSize 100g --executor-cores 10 --conf spark.yarn.executor.memoryOverhead=100g'
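One thing worth double-checking here (my assumption, not verified against this setup): `--default-parallelism` and `--driver-maxResultSize` are not standard spark-submit options; those two values are normally passed as configuration keys, e.g. `--conf spark.default.parallelism=2500 --conf spark.driver.maxResultSize=100g`.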
Please also advise whether there is any other approach I could take.
Note:
1. The data is very large (about a million clickstream rows per hour), so it is not feasible to pull the data to local disk and run CountVectorizer in scikit-learn.
I wrote the following code.
Launched the Jupyter Notebook with the following settings:
`PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXXX --no-browser --ip= --NotebookApp.token= " PYSPARK_PYTHON="/opt/anaconda/anaconda4/bin/python" PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver-maxResultSize 100g --executor-cores 10 --conf spark.yarn.executor.memoryOverhead=100g' PYSPARK_DRIVER_PYTHON="jupyter" pyspark`
The code is as follows:
# Import
from pyspark.sql import SQLContext, HiveContext, Row
from pyspark.sql import functions as F
import pandas as pd
from time import time
import math
from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType
from pyspark.sql.functions import udf
from collections import Counter
from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import SparseVector, DenseVector
# Setting Spark Context
sqlContext = HiveContext(sc)
#Creating the pyspark ml dataframe
df = sqlContext.sql("SELECT id, collect_set(urllink) as urllinks FROM clik_stream \
where click_year='2018' and click_month='02' and click_day='02' GROUP BY id")
def build_ngrams_part(inputCol="urllinks", n=3):
    ngrams = [
        NGram(n=i, inputCol="urllinks", outputCol="{0}_links".format(i))
        for i in range(1, n + 1)
    ]
    vectorizers = [
        CountVectorizer(inputCol="{0}_links".format(i),
                        outputCol="{0}_counts".format(i))
        for i in range(1, n + 1)
    ]
    # assembler = [VectorAssembler(
    #     inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
    #     outputCol="features"
    # )]
    # return Pipeline(stages=ngrams +
    #                 DenseVector(SparseVector(vectorizers).toArray()))
    return Pipeline(stages=ngrams + vectorizers)
a = build_ngrams_part().fit(df)
b = a.transform(df)
b.write.saveAsTable("output")
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-a6183cfa83e6> in <module>()
----> 1 b.write.saveAsTable("output")
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
631 if format is not None:
632 self.format(format)
--> 633 self._jwrite.saveAsTable(name)
634
635 @since(1.4)
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o191.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:454)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:198)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:158)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:180)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)
I ran the code on a sample dataset (to check the logic), and it works as expected.
df_test = spark.createDataFrame([
    (1, ["a", "b", "c", "d"]),
    (2, ["d", "e", "d"]),
    (3, ["e", "f", "e"]),
], ("id", "urllinks"))
a = build_ngrams_part().fit(df_test)
b = a.transform(df_test)
b.show(3)
stages = a.stages
from pyspark.ml.feature import CountVectorizerModel
vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
[v.vocabulary for v in vectorizers]
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| id| urllinks| 1_links| 2_links| 3_links| 1_counts| 2_counts| 3_counts|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| 1|[a, b, c, d]|[a, b, c, d]|[a b, b c, c d]|[a b c, b c d]|(6,[1,2,3,4],[1.0...|(7,[0,2,3],[1.0,1...|(4,[0,2],[1.0,1.0])|
| 2| [d, e, d]| [d, e, d]| [d e, e d]| [d e d]| (6,[0,1],[1.0,2.0])| (7,[1,4],[1.0,1.0])| (4,[3],[1.0])|
| 3| [e, f, e]| [e, f, e]| [e f, f e]| [e f e]| (6,[0,5],[2.0,1.0])| (7,[5,6],[1.0,1.0])| (4,[1],[1.0])|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
b.write.saveAsTable("sample.output")
Please help with the points listed in the Queries section.
I reduced the amount of data going into the Hive table, and the error went away.
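For reference, a minimal sketch of one way to cut down what gets written before saveAsTable (my assumption about which columns matter; adjust to what is actually needed downstream):

```python
# Keep only id and the count vectors; drop urllinks and the raw n-gram
# arrays, which repeat most of the underlying data.
slim = b.select("id", "1_counts", "2_counts", "3_counts")
slim.write.mode("overwrite").saveAsTable("output_counts_only")
```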