如何将 DataFrame 的所有行存储到本地值(例如并发队列)?

How to store all rows of a DataFrame to a local value (e.g. concurrent queue)?

我正在尝试迭代使用 Spark SQL (pyspark) 执行结构化查询时获得的行。代码大致如下所示:

spark = SparkSession \
    .builder \
    .appName("Sessions")\
    .config(...) \
    .getOrCreate()

stuff = spark.read.format("parquet").options(mergeSchema=False).load(location)

result_set = spark.sql("""sql without udfs or anything too funky""")
result_set.foreach(lambda t: queue.put(t))

queue是一个局部变量——一个简单的并发队列。

当代码到达这一行时,出现以下错误:

pickle.PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute 'code'

在迭代之前调用 collect 有效,但我想知道执行这种遍历的惯用方法是什么?为什么上面的代码不起作用?

你正在调用 DataFrame foreach which will serialize and distribute your lambda to run on each partition on the executors. So if you truly want to push the rows to a queue then you need to initialize the queue in the lambda. foreachPartition 在这种情况下可能更合适,所以你为每个分区而不是每一行初始化一次队列客户端。或者,如果您知道数据量很小,您可以调用 collect 并在驱动程序上对结果进行排队。

如果你想在节点上排队数据然后在本地使用它 - 你需要使用 accumulators。否则在 foreach 中推送到队列的所有内容都不会在本地更改任何内容。

您可以阅读共享变量(累加器和广播)here