规范化大型 PySpark 数据帧时,CodeGen 增长超过 64 KB 错误
CodeGen grows beyond 64 KB error when normalizing large PySpark dataframe
我有一个包含 1300 万行和 800 列的 PySpark 数据框。我需要规范化这些数据,所以一直在使用这段代码,它适用于较小的开发数据集。
def z_score_w(col, w):
avg_ = avg(col).over(w)
stddev_ = stddev_pop(col).over(w)
return (col - avg_) / stddev_
w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]
normDF = signalsDF.select(norm_exprs)
但是,当我使用完整的数据集时运行进入异常代码生成:
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893
)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.
spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
at org.codehaus.janino.UnitCompiler.access00(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler.visitBooleanLiteral(UnitCompiler.java:3267)
有几个 Spark JIRA issues around that appear similar, but these are all marked resolved. There is also 是相关的,但答案是另一种技术。
我有自己的解决方法,我对数据框的列批次进行标准化。这行得通,但我最终得到了多个数据帧,然后我必须加入,这很慢。
所以,我的问题是 - 是否有一种替代技术可用于规范化我缺少的大型数据帧?
我正在使用 spark-2.0.1。
一个明显的问题是您使用 window 函数的方式。以下框架:
Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
在实践中有点用处。如果没有分区列,它首先将所有数据重新洗牌到一个分区。这种缩放方法只对分组缩放有用
Spark 提供了两个 类 可用于缩放特征:
pyspark.ml.feature.StandardScaler
pyspark.mllib.feature.StandardScaler
不幸的是,两者都需要 Vector
数据作为输入。有 ML
from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline
scaled = Pipeline(stages=[
VectorAssembler(inputCols=df.columns, outputCol="features"),
MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df).select("scaled")
如果您需要原始形状,这需要进一步扩展 scaled
列。
使用 MLlib:
from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector
rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)
scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)
如果存在与列数相关的代码生成问题,后一种方法可能更有用。
解决此问题以计算全局统计数据的另一种方法
from pyspark.sql.functions import avg, col, stddev_pop, struct
stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()
和select:
df.select(*[
((col(c) - mean) / std).alias(c)
for (c, (mean, std)) in zip(df.columns, stats)
])
根据您的评论,您认为可以使用 NumPy 和一些基本转换来表达最简单的解决方案:
rdd = df.rdd.map(np.array) # Convert to RDD of NumPy vectors
stats = rdd.stats() # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev()) # Normalize
并转换回 DataFrame
:
scaled.map(lambda x: x.tolist()).toDF(df.columns)
请参阅此 link,我们通过在代码中添加检查点解决了此错误。
检查点只是将数据或数据帧写回磁盘并读回。
检查点详情
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
问:什么样的RDD需要checkpoint?
the computation takes a long time
the computing chain is too long
depends too many RDDs
其实将ShuffleMapTask的输出保存到本地磁盘也是checkpoint,只是为了partition的数据输出。
问:什么时候到检查点?
如上所述,每次需要缓存计算分区时,都会将其缓存到内存中。但是,检查点不遵循相同的原则。相反,它会等到作业结束,然后启动另一个作业来完成检查点。一个需要检查点的RDD会被计算两次;因此建议在 rdd.checkpoint() 之前执行 rdd.cache()。在这种情况下,第二个作业不会重新计算 RDD。相反,它只会读取缓存。事实上,Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 方法,就像在磁盘上缓存一样。因此,它在第一次计算时将RDD缓存在磁盘上,但是这种持久化和检查点是不同的,我们稍后再讨论它们的区别。
问:检查点如何实现?
程序如下:
RDD will be: [ Initialized --> marked for checkpointing -->
checkpointing in progress --> checkpointed ]. In the end, it will be
checkpointed.
Similalry for dataframe: Write the dataframe to disk or s3 and read the data back in a new dataframe.
已初始化
驱动端,调用rdd.checkpoint()后,RDD将由RDDCheckpointData管理。用户应设置检查点的存储路径(在 hdfs 上)。
标记为检查点
初始化后RDDCheckpointData会标记RDD MarkedForCheckpoint。
检查点正在进行中
作业完成后,将调用 finalRdd.doCheckpoint()。 finalRDD 向后扫描计算链。当遇到需要checkpoint的RDD时,会将该RDD标记为CheckpointingInProgress,然后将配置文件(写入hdfs),如core-site.xml广播给其他工作节点的blockManager。之后,将启动一个作业来完成检查点:
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))
检查点
job完成checkpoint后,会清除RDD的所有依赖,并将RDD设置为checkpointed。然后,添加补充依赖并将父RDD设置为CheckpointRDD。 checkpointRDD以后会用到从文件系统中读取checkpoint文件,然后生成RDD分区
有趣的是:
两个RDD在驱动程序中被检查点,但只有结果(见下面的代码)被成功检查点。不确定这是一个错误还是只是下游 RDD 将被有意检查点。
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)
pairs2.checkpoint
val result = pairs1.join(pairs2)
result.checkpoint
我有一个包含 1300 万行和 800 列的 PySpark 数据框。我需要规范化这些数据,所以一直在使用这段代码,它适用于较小的开发数据集。
def z_score_w(col, w):
avg_ = avg(col).over(w)
stddev_ = stddev_pop(col).over(w)
return (col - avg_) / stddev_
w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]
normDF = signalsDF.select(norm_exprs)
但是,当我使用完整的数据集时运行进入异常代码生成:
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893
)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.
spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
at org.codehaus.janino.UnitCompiler.access00(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler.visitBooleanLiteral(UnitCompiler.java:3267)
有几个 Spark JIRA issues around that appear similar, but these are all marked resolved. There is also
我有自己的解决方法,我对数据框的列批次进行标准化。这行得通,但我最终得到了多个数据帧,然后我必须加入,这很慢。
所以,我的问题是 - 是否有一种替代技术可用于规范化我缺少的大型数据帧?
我正在使用 spark-2.0.1。
一个明显的问题是您使用 window 函数的方式。以下框架:
Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
在实践中有点用处。如果没有分区列,它首先将所有数据重新洗牌到一个分区。这种缩放方法只对分组缩放有用
Spark 提供了两个 类 可用于缩放特征:
pyspark.ml.feature.StandardScaler
pyspark.mllib.feature.StandardScaler
不幸的是,两者都需要 Vector
数据作为输入。有 ML
from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline
scaled = Pipeline(stages=[
VectorAssembler(inputCols=df.columns, outputCol="features"),
MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df).select("scaled")
如果您需要原始形状,这需要进一步扩展 scaled
列。
使用 MLlib:
from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector
rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)
scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)
如果存在与列数相关的代码生成问题,后一种方法可能更有用。
解决此问题以计算全局统计数据的另一种方法
from pyspark.sql.functions import avg, col, stddev_pop, struct
stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()
和select:
df.select(*[
((col(c) - mean) / std).alias(c)
for (c, (mean, std)) in zip(df.columns, stats)
])
根据您的评论,您认为可以使用 NumPy 和一些基本转换来表达最简单的解决方案:
rdd = df.rdd.map(np.array) # Convert to RDD of NumPy vectors
stats = rdd.stats() # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev()) # Normalize
并转换回 DataFrame
:
scaled.map(lambda x: x.tolist()).toDF(df.columns)
请参阅此 link,我们通过在代码中添加检查点解决了此错误。
检查点只是将数据或数据帧写回磁盘并读回。
检查点详情
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
问:什么样的RDD需要checkpoint?
the computation takes a long time
the computing chain is too long
depends too many RDDs
其实将ShuffleMapTask的输出保存到本地磁盘也是checkpoint,只是为了partition的数据输出。
问:什么时候到检查点?
如上所述,每次需要缓存计算分区时,都会将其缓存到内存中。但是,检查点不遵循相同的原则。相反,它会等到作业结束,然后启动另一个作业来完成检查点。一个需要检查点的RDD会被计算两次;因此建议在 rdd.checkpoint() 之前执行 rdd.cache()。在这种情况下,第二个作业不会重新计算 RDD。相反,它只会读取缓存。事实上,Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 方法,就像在磁盘上缓存一样。因此,它在第一次计算时将RDD缓存在磁盘上,但是这种持久化和检查点是不同的,我们稍后再讨论它们的区别。
问:检查点如何实现?
程序如下:
RDD will be: [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]. In the end, it will be checkpointed.
Similalry for dataframe: Write the dataframe to disk or s3 and read the data back in a new dataframe.
已初始化
驱动端,调用rdd.checkpoint()后,RDD将由RDDCheckpointData管理。用户应设置检查点的存储路径(在 hdfs 上)。
标记为检查点
初始化后RDDCheckpointData会标记RDD MarkedForCheckpoint。
检查点正在进行中
作业完成后,将调用 finalRdd.doCheckpoint()。 finalRDD 向后扫描计算链。当遇到需要checkpoint的RDD时,会将该RDD标记为CheckpointingInProgress,然后将配置文件(写入hdfs),如core-site.xml广播给其他工作节点的blockManager。之后,将启动一个作业来完成检查点:
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))
检查点
job完成checkpoint后,会清除RDD的所有依赖,并将RDD设置为checkpointed。然后,添加补充依赖并将父RDD设置为CheckpointRDD。 checkpointRDD以后会用到从文件系统中读取checkpoint文件,然后生成RDD分区
有趣的是:
两个RDD在驱动程序中被检查点,但只有结果(见下面的代码)被成功检查点。不确定这是一个错误还是只是下游 RDD 将被有意检查点。
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)
pairs2.checkpoint
val result = pairs1.join(pairs2)
result.checkpoint