PySpark 等待在笔记本中完成(Databricks)
PySpark Wait to finish in notebook (Databricks)
目前,我在一个单元格中有一个 spark 数据帧(自动加载器)时遇到了一些问题,这可能需要一些时间来写入数据。然后,在接下来的单元格中,代码引用了第一个 table 所做的工作。但是,如果由于 spark 的分布式特性,整个笔记本是 运行(特别是作为作业),则第一个单元格完全完成之前的第二个单元格 运行s。我怎样才能让第二个单元格等待 writeStream 完成而不将它们放在单独的笔记本中。
示例:
单元格 1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
单元格 2
df = spark.sql('select count(*) from TABLE1')
您需要使用 awaitTermination
函数等待流处理完成(参见 docs)。像这样:
- 单元格 1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
- 单元格 2
df = spark.sql('select count(*) from TABLE1')
尽管这样读起来更容易也更难出错:
df = spark.read.table('TABLE1').count()
目前,我在一个单元格中有一个 spark 数据帧(自动加载器)时遇到了一些问题,这可能需要一些时间来写入数据。然后,在接下来的单元格中,代码引用了第一个 table 所做的工作。但是,如果由于 spark 的分布式特性,整个笔记本是 运行(特别是作为作业),则第一个单元格完全完成之前的第二个单元格 运行s。我怎样才能让第二个单元格等待 writeStream 完成而不将它们放在单独的笔记本中。
示例:
单元格 1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
单元格 2
df = spark.sql('select count(*) from TABLE1')
您需要使用 awaitTermination
函数等待流处理完成(参见 docs)。像这样:
- 单元格 1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
autoload.awaitTermination()
- 单元格 2
df = spark.sql('select count(*) from TABLE1')
尽管这样读起来更容易也更难出错:
df = spark.read.table('TABLE1').count()