结构化流媒体 - 从同一流媒体源加入 2 个数据帧
Structured Streaming - Joining 2 DataFrames from the same Streaming source
我有一个接收 Twitter 流的 Spark 应用程序。
我添加了时间栏:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
收集我需要的不需要展平的字段:
main_df = (
timestamp_df.selectExpr(['time', 'created_at', 'id',...])
)
我将其他部分展平,将字符串列表转换为字符串:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.withWatermark('time', '10 seconds')
.groupBy(
'tmp_id', window('time', '10 seconds', '5 seconds')
)
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
然后将这 2 个 DataFrame 连接在一起以获得我需要的数据:
final_df = (
main_df
.join(entities_df, main_df.id == entities_df.tmp_id)
.select(['created_at', 'id', ...])
)
当这个 运行 时,我得到空的数据帧。
当我运行静态数据的代码使用这些代码时:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.groupBy('tmp_id')
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
如果我 运行 以上(没有水印)我得到这个错误:
Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets without watermark
谁能告诉我我做错了什么?
好的,我解决了这个问题。只需对代码进行一些更改:
timestamp_df = tmp_df2.withColumn('time', current_timestamp())
而不是:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
为此,我认为这不会有什么不同,但我没有尝试。
对于entities_df:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.withWatermark('time', '5 seconds')
.groupBy(
'tmp_id',
window('time', '5 seconds')
)
.agg(collect_set('screen_name').alias('tmp_screen_names'))
.withColumn('entities_user_mentions_screen_names', concat_ws(', ', 'tmp_screen_names'))
)
我有一个接收 Twitter 流的 Spark 应用程序。
我添加了时间栏:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
收集我需要的不需要展平的字段:
main_df = (
timestamp_df.selectExpr(['time', 'created_at', 'id',...])
)
我将其他部分展平,将字符串列表转换为字符串:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.withWatermark('time', '10 seconds')
.groupBy(
'tmp_id', window('time', '10 seconds', '5 seconds')
)
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
然后将这 2 个 DataFrame 连接在一起以获得我需要的数据:
final_df = (
main_df
.join(entities_df, main_df.id == entities_df.tmp_id)
.select(['created_at', 'id', ...])
)
当这个 运行 时,我得到空的数据帧。
当我运行静态数据的代码使用这些代码时:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.groupBy('tmp_id')
.agg(collect_set('screen_name').alias('tmp_screen_name'))
.withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)
如果我 运行 以上(没有水印)我得到这个错误:
Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets without watermark
谁能告诉我我做错了什么?
好的,我解决了这个问题。只需对代码进行一些更改:
timestamp_df = tmp_df2.withColumn('time', current_timestamp())
而不是:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
为此,我认为这不会有什么不同,但我没有尝试。
对于entities_df:
entities_df = (
timestamp_df
.select(['time', 'id', explode('entities.user_mentions').alias('temp')])
.selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
.withWatermark('time', '5 seconds')
.groupBy(
'tmp_id',
window('time', '5 seconds')
)
.agg(collect_set('screen_name').alias('tmp_screen_names'))
.withColumn('entities_user_mentions_screen_names', concat_ws(', ', 'tmp_screen_names'))
)