Azure 事件中心到 Databricks,正在使用的数据帧发生了什么
Azure Event Hubs to Databricks, what happens to the dataframes in use
我一直在开发 Azure 事件中心的概念验证,使用 Pyspark 将 json 数据流式传输到 Azure Databricks Notebook。在我看到的示例中,我创建了如下粗略代码,将数据从事件中心获取到增量 table 我将用作目的地
connectionString = "My End Point"
ehConf = {'eventhubs.connectionString' : connectionString}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readEventStream = df.withColumn("body", \
df["body"].cast("string")). \
withColumn("date_only", to_date(col("enqueuedTime")))
readEventStream.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/delta/testSink/streamprocess") \
.table("testSink")
通过谷歌搜索阅读后,df 和 readEventStream 数据帧发生了什么变化?它们会随着保留数据而变大,还是会在正常过程中清空?或者它只是在将数据转储到 Delta table 之前的临时存储?有没有一种方法可以在写入 Delta 之前设置 X 数量的流式传输项目 table?
谢谢
我在pyspark.sql module
, I think the memory usage of bigger and bigger was caused by the function table(tableName)
的PySpark官方文档中仔细查看了您在代码中使用的API的描述,如下图是针对DataFrame
的,而不是针对[=27=的]流 DataFrame
.
所以table
函数创建数据结构来填充内存中的流数据。
我建议你需要先使用start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
完成流写入操作,然后再从delta lake中获取table。而且似乎没有办法在写入 Delta table.
之前设置 X 数量的使用 PySpark 流式传输的项目
我一直在开发 Azure 事件中心的概念验证,使用 Pyspark 将 json 数据流式传输到 Azure Databricks Notebook。在我看到的示例中,我创建了如下粗略代码,将数据从事件中心获取到增量 table 我将用作目的地
connectionString = "My End Point"
ehConf = {'eventhubs.connectionString' : connectionString}
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
readEventStream = df.withColumn("body", \
df["body"].cast("string")). \
withColumn("date_only", to_date(col("enqueuedTime")))
readEventStream.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/delta/testSink/streamprocess") \
.table("testSink")
通过谷歌搜索阅读后,df 和 readEventStream 数据帧发生了什么变化?它们会随着保留数据而变大,还是会在正常过程中清空?或者它只是在将数据转储到 Delta table 之前的临时存储?有没有一种方法可以在写入 Delta 之前设置 X 数量的流式传输项目 table?
谢谢
我在pyspark.sql module
, I think the memory usage of bigger and bigger was caused by the function table(tableName)
的PySpark官方文档中仔细查看了您在代码中使用的API的描述,如下图是针对DataFrame
的,而不是针对[=27=的]流 DataFrame
.
所以table
函数创建数据结构来填充内存中的流数据。
我建议你需要先使用start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
完成流写入操作,然后再从delta lake中获取table。而且似乎没有办法在写入 Delta table.