How can I use the saveAsTable function when I have two Spark streams running in parallel in the same notebook?
I have two Spark streams set up in a notebook to run in parallel, like so.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")

df1 = spark \
    .readStream.format("delta") \
    .table("test_db.table1") \
    .select('foo', 'bar')

writer_df1 = df1.writeStream \
    .option("checkpointLocation", checkpoint_location_1) \
    .foreachBatch(
        lambda batch_df, batch_epoch:
            process_batch(batch_df, batch_epoch)
    ) \
    .start()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")

df2 = spark \
    .readStream.format("delta") \
    .table("test_db.table2") \
    .select('foo', 'bar')

writer_df2 = df2.writeStream \
    .option("checkpointLocation", checkpoint_location_2) \
    .foreachBatch(
        lambda batch_df, batch_epoch:
            process_batch(batch_df, batch_epoch)
    ) \
    .start()
These dataframes are then processed row by row, with each row sent to an API. If the API call reports an error, I convert the row to JSON and append that row to a common failures table in Databricks.
columns = ['table_name', 'record', 'time_of_failure', 'error_or_status_code']
vals = [(table_name, json.dumps(row.asDict()), datetime.now(), str(error_or_http_code))]
error_df = spark.createDataFrame(vals, columns)
error_df.select('table_name', 'record', 'time_of_failure', 'error_or_status_code') \
    .write.format('delta').mode('append').saveAsTable("failures_db.failures_db")
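For context, a minimal sketch of the process_batch function referenced above, showing where that failure write runs; call_api is a hypothetical placeholder for the real API call, and the actual implementation may differ:

import json
from datetime import datetime

# Sketch only: call_api is a hypothetical helper that returns an error or
# HTTP status code on failure and None on success.
def process_batch(batch_df, batch_epoch, table_name="test_db.table1"):
    # Pull the micro-batch to the driver and send each row to the API.
    for row in batch_df.collect():
        error_or_http_code = call_api(row)  # hypothetical per-row API call
        if error_or_http_code is not None:
            # On failure, append the offending row as JSON to the common failures table.
            columns = ['table_name', 'record', 'time_of_failure', 'error_or_status_code']
            vals = [(table_name, json.dumps(row.asDict()), datetime.now(), str(error_or_http_code))]
            error_df = spark.createDataFrame(vals, columns)
            error_df.write.format('delta').mode('append').saveAsTable("failures_db.failures_db")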
When trying to add a row to this table, that saveAsTable() call throws the following exception.
py4j.protocol.Py4JJavaError: An error occurred while calling o3578.saveAsTable.
: java.lang.IllegalStateException: Cannot find the REPL id in Spark local properties. Spark-submit and R doesn't support transactional writes from different clusters. If you are using R, please switch to Scala or Python. If you are using spark-submit , please convert it to Databricks JAR job. Or you can disable multi-cluster writes by setting 'spark.databricks.delta.multiClusterWrites.enabled' to 'false'. If this is disabled, writes to a single table must originate from a single cluster. Please check https://docs.databricks.com/delta/delta-intro.html#frequently-asked-questions-faq for more details.
If I comment out one of the streams and re-run the notebook, any errors from the API calls are inserted into the table without issue. I feel like there is some configuration I need to add, but I'm not sure where to start.
Not sure if this is the best solution, but I believe the problem was both streams writing to the same table at the same time. I split this table into separate tables, one per stream, and it then worked.
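A minimal sketch of that split, assuming failures are still written from inside each stream's foreachBatch; the per-stream target table names below are illustrative, not taken from the original post:

import json
from datetime import datetime

# Sketch only: write failures to a per-stream Delta table instead of the
# shared failures_db.failures_db table. Target table names are illustrative.
def write_failure(source_table, row, error_or_http_code):
    columns = ['table_name', 'record', 'time_of_failure', 'error_or_status_code']
    vals = [(source_table, json.dumps(row.asDict()), datetime.now(), str(error_or_http_code))]
    error_df = spark.createDataFrame(vals, columns)
    # e.g. "failures_db.failures_table1" for the test_db.table1 stream and
    # "failures_db.failures_table2" for the test_db.table2 stream
    failure_table = "failures_db.failures_" + source_table.split(".")[-1]
    error_df.write.format('delta').mode('append').saveAsTable(failure_table)

With separate target tables, each stream only ever appends to a table that no other stream writes to, so the concurrent writes that triggered the exception no longer occur.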