pyspark.ml - NGrams + CountVectorizer - 根据计数权重排序

pyspark.ml - NGrams + CountVectorizer - Sorted based on count weights

我想找到 ngram countvectorizer 并根据计数权重对输出进行排序。 我使用了以下方法:

  1. 步骤 A. 在 spark 层中找到 ngram countvectorizer。
  2. 步骤 B. 将 pyspark ml 数据帧保存到 Hive table。

面临的问题: 在步骤 B 中获取 Java 堆 Space。

查询:

一个。在执行步骤 B 之前,而不是在配置单元中写入以下所有列 table;只有少数专栏说 (1_links, 1_counts); (2_links, 2_counts) 和 (3_links, 3_counts) 分开 table秒。这样,写入配置单元 table 的数据就会减少。 pyspark.sql.dataframe.DataFrame 具有以下列: id,urllinks, 1_links,2_links,3_links,
1_counts,2_counts,3_counts

乙。 1_counts、2_counts 和 3_counts 列;这些可以从稀疏向量转换为密集向量吗?并只取出那些计数超过阈值(比如 3)的索引。这样,写入 Hive table 的数据将会减少,Java 堆 space 错误的可能性也会减少。

如果这些方法不错,请告知如何执行。我无法做到这一点(前提是不会出现 Java 堆 Space 错误)。

摄氏度。如何在同一个配置单元 table 中使用计数获取词汇表。

D.如何处理 Java 堆 space 错误。我需要研究哪些参数。在启动笔记本时使用了以下设置。

PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 -- 
executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver- 
maxResultSize 100g --executor-cores 10 --conf 
spark.yarn.executor.memoryOverhead=100g'

也请告知,有没有其他方法可以采取。

注意: 1. 数据非常大(每小时有百万点击流行)并且无法将数据拉到本地磁盘并在 scikitlearn 中做 countvectorizer。

写了下面的代码:

使用以下设置启动了 Jupyter Notebook:

`PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXXX --no-browser --ip= --NotebookApp.token= " PYSPARK_PYTHON="/opt/anaconda/anaconda4/bin/python" PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver-maxResultSize 100g --executor-cores 10 --conf spark.yarn.executor.memoryOverhead=100g' PYSPARK_DRIVER_PYTHON="jupyter" pyspark`

代码如下:

# Import
from pyspark.sql import SQLContext ,HiveContext ,Row
from pyspark.sql import functions as F
import pandas as pd
from time import time
import math

from pyspark.sql.types import ArrayType, StringType, MapType
from pyspark.sql.functions import udf

from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType
from pyspark.sql.functions import udf
from collections import Counter

from pyspark.ml.feature import NGram

from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.mllib.linalg import SparseVector, DenseVector

# Setting Spark Context
sqlContext = HiveContext(sc)

#Creating the pyspark ml dataframe
df = sqlContext.sql("SELECT id, collect_set(urllink) as urllinks FROM clik_stream \
where click_year='2018' and click_month='02' and click_day='02' GROUP BY id")

def build_ngrams_part(inputCol="urllinks", n=3):

   ngrams = [
      NGram(n=i, inputCol="urllinks", outputCol="{0}_links".format(i))
      for i in range(1, n + 1)
   ]

   vectorizers = [
    CountVectorizer(inputCol="{0}_links".format(i),
        outputCol="{0}_counts".format(i))
    for i in range(1, n + 1)
   ]


#     assembler = [VectorAssembler(
#         inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
#         outputCol="features"
#     )]

#    return Pipeline(stages=ngrams + 
DenseVector(SparseVector(vectorizers).toArray()))
   return Pipeline(stages=ngrams + vectorizers)

a = build_ngrams_part().fit(df)
b = a.transform(df)

b.write.saveAsTable("output")

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-a6183cfa83e6> in <module>()
----> 1 b.write.saveAsTable("output")

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
    631         if format is not None:
    632             self.format(format)
--> 633         self._jwrite.saveAsTable(name)
    634 
    635     @since(1.4)

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o191.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:215)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
    at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:454)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:198)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:158)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:180)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:173)

​

我运行样本数据集上的代码(检查逻辑)。它正在按预期工作。

df_test = spark.createDataFrame([
  (1, ["a", "b", "c", "d"]),
  (2, ["d", "e", "d"]),
  (3, ["e", "f", "e"]),    
], ("id", "urllinks"))

a = build_ngrams_part().fit(df_test)
b = a.transform(df_test)
b.show(3)

stages = a.stages
from pyspark.ml.feature import CountVectorizerModel

vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
[v.vocabulary for v in vectorizers]

    +---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| id|    urllinks|     1_links|        2_links|       3_links|            1_counts|            2_counts|           3_counts|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
|  1|[a, b, c, d]|[a, b, c, d]|[a b, b c, c d]|[a b c, b c d]|(6,[1,2,3,4],[1.0...|(7,[0,2,3],[1.0,1...|(4,[0,2],[1.0,1.0])|
|  2|   [d, e, d]|   [d, e, d]|     [d e, e d]|       [d e d]| (6,[0,1],[1.0,2.0])| (7,[1,4],[1.0,1.0])|      (4,[3],[1.0])|
|  3|   [e, f, e]|   [e, f, e]|     [e f, f e]|       [e f e]| (6,[0,5],[2.0,1.0])| (7,[5,6],[1.0,1.0])|      (4,[1],[1.0])|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+

b.write.saveAsTable("sample.output")

请帮助解决查询部分中的要点。

我减少了进入 Hive 表的数据量。错误已更正..