StructuredStreaming - foreach/foreachBatch not working
I'm building a streaming job to read data from Kafka and write to BigQuery (although for now, I'm writing to the console).
I'm trying to use foreach (or foreachBatch) to transform the records, but I'm running into issues.
Here is the code:
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()
print("df_stream -> ", df_stream, type(df_stream))
#df_stream -> DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int] <class 'pyspark.sql.dataframe.DataFrame'>
def convertToDict(self, row):
    print(" IN CONVERT TO DICT ", row, " currentTime ", datetime.datetime.now())

query = df_stream.selectExpr("CAST(value AS STRING)").writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows", 10) \
    .option("truncate", "false") \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .foreach(convertToDict) \
    .start()
query.awaitTermination()
Error when using foreach:
22/02/07 12:36:49 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
out_iter = func(split_index, iterator)
File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1037, in func_without_process
TypeError: convertToDict() missing 1 required positional argument: 'row'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:703)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:685)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.PythonForeachWriter.close(PythonForeachWriter.scala:66)
at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.close(ForeachWriterTable.scala:168)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run(WriteToDataSourceV2Exec.scala:457)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1518)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2(WriteToDataSourceV2Exec.scala:358)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
How do I pass 'row' to the function, and how do I access the row to do the transformation?
When I use foreachBatch (in the code above), the epochId (or is it the batchId?) gets printed. How do I use it to access the rows and do the transformation?
IN CONVERT TO DICT 44 currentTime 2022-02-07 12:50:27.074291
IN CONVERT TO DICT 45 currentTime 2022-02-07 12:50:36.591686
IN CONVERT TO DICT 46 currentTime 2022-02-07 12:50:40.316790
IN CONVERT TO DICT 47 currentTime 2022-02-07 12:50:50.322389
IN CONVERT TO DICT 48 currentTime 2022-02-07 12:51:00.346152
IN CONVERT TO DICT 49 currentTime 2022-02-07 12:51:10.302129
IN CONVERT TO DICT 50 currentTime 2022-02-07 12:51:20.350064
IN CONVERT TO DICT 51 currentTime 2022-02-07 12:51:30.313543
Please note:
I need to inspect each row and transform it.
So, should I use foreach or foreachBatch?
It seems you are not implementing the correct interface. You need to extend ForeachWriter as mentioned in the docs -
datasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach
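The snippet above is the Scala API. Since the question uses PySpark, here is a minimal sketch of the same pattern in Python, assuming the df_stream from the question; RowWriter is a hypothetical name, and in PySpark you pass an object with open/process/close methods rather than subclassing:

# Minimal sketch of the foreach writer pattern in PySpark (RowWriter is a hypothetical name).
class RowWriter:
    def open(self, partition_id, epoch_id):
        # open a connection here if needed; returning True means this partition will be processed
        return True

    def process(self, row):
        # row is a pyspark.sql.Row; the per-record transformation goes here
        record = {"value": row["value"]}
        print(record)

    def close(self, error):
        # close the connection; error is None unless processing failed
        pass

query = df_stream.selectExpr("CAST(value AS STRING) AS value").writeStream \
    .foreach(RowWriter()) \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .start()

Note that in PySpark, foreach also accepts a plain function taking a single row argument; the TypeError above comes from convertToDict being declared with an extra self parameter even though it is not a method, so Spark calls it with one argument while it expects two.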
Alternatively, if you prefer to apply the transformations a whole batch at a time, you can use foreachBatch. That also gives you the batchId, which you can use yourself for de-duplication.
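As a rough sketch of that approach (again assuming the df_stream from the question; process_batch is a hypothetical name):

def process_batch(batch_df, batch_id):
    # batch_df is a regular (non-streaming) DataFrame for this micro-batch,
    # so ordinary DataFrame transformations work here
    transformed = batch_df.selectExpr("CAST(value AS STRING) AS value")
    # batch_id identifies the micro-batch and can be used for idempotent / de-duplicated writes
    print("batch_id -> ", batch_id)
    transformed.show(truncate=False)

query = df_stream.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "/Users/karanalang/Documents/Technology/gcp/DataProc/checkpoint/") \
    .start()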