Structured Streaming如何执行pandas_udf?

How does Structured Streaming execute pandas_udf?

我想了解结构化流式处理如何处理即将到来的新数据。

如果更多行同时到达,spark 将它们附加到输入流数据帧,对吗?

如果我有一个 withColumn 并应用一个 pandas_udf,该函数每行调用一次,或者只调用一次,然后将这些行传递给 pandas_udf?

让我们这样说:

dfInt = spark \
    .readStream \
    .load() \
    .withColumn("prediction", predict( (F.struct([col(x) for x in (features)]))))

如果更多行同时到达,它们一起处理还是每行处理一次?= 有机会限制每次只有一行吗?

If more rows arrive at the same time, spark append them to the input streaming dataframe, right?

我们只讨论微批处理执行引擎,对吧?这是您最有可能在流式查询中使用的内容。

Structured Streaming 使用 Source.getBatch (DataSource API V1):

在流查询中查询流源

getBatch(start: Option[Offset], end: Offset): DataFrame

Returns the data that is between the offsets (start, end]. When start is None, then the batch should begin with the first record.

无论 DataFrame 中的来源 returns 是要在微批处理中处理的数据。

If I have a withColumn and apply a pandas_udf, the function is called once per each row

总是。这就是用户定义函数在 Spark SQL.

中的工作方式

or only one time and the rows are passed to the pandas_udf?

This 说:

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data.

The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

If more rows arrive at the same time, they are processed together or once per each?

如果“到达”表示“单个 DataFrame 的一部分”,则“它们一起处理”,但一次处理一行(根据 UDF 合同)。

There is the chance to limit this to only one row per time?

你不必。就是这样设计的。一次一行。