结构化流媒体 - 从同一流媒体源加入 2 个数据帧

Structured Streaming - Joining 2 DataFrames from the same Streaming source

我有一个接收 Twitter 流的 Spark 应用程序。

我添加了时间栏:

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

收集我需要的不需要展平的字段:

main_df = (
    timestamp_df.selectExpr(['time', 'created_at', 'id',...])
)

我将其他部分展平,将字符串列表转换为字符串:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .withWatermark('time', '10 seconds')
    .groupBy(
        'tmp_id', window('time', '10 seconds', '5 seconds')
    )
    .agg(collect_set('screen_name').alias('tmp_screen_name'))
    .withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)

然后将这 2 个 DataFrame 连接在一起以获得我需要的数据:

final_df = (
    main_df
    .join(entities_df, main_df.id == entities_df.tmp_id)
    .select(['created_at', 'id', ...])
)

当这个 运行 时,我得到空的数据帧。

当我运行静态数据的代码使用这些代码时:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .groupBy('tmp_id')
    .agg(collect_set('screen_name').alias('tmp_screen_name'))
    .withColumn('entities_user_mentions_screen_name', concat_ws(', ', 'tmp_screen_name'))
)

如果我 运行 以上(没有水印)我得到这个错误:

Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets without watermark

谁能告诉我我做错了什么?

好的,我解决了这个问题。只需对代码进行一些更改:

timestamp_df = tmp_df2.withColumn('time', current_timestamp())

而不是:

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp_df = tmp_df2.withColumn('time', unix_timestamp(lit(timestamp), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

为此,我认为这不会有什么不同,但我没有尝试。

对于entities_df:

entities_df = (
    timestamp_df
    .select(['time', 'id', explode('entities.user_mentions').alias('temp')])
    .selectExpr(['time', 'id AS tmp_id', 'temp.screen_name'])
    .withWatermark('time', '5 seconds')
    .groupBy(
        'tmp_id',
        window('time', '5 seconds')
    )
    .agg(collect_set('screen_name').alias('tmp_screen_names'))
    .withColumn('entities_user_mentions_screen_names', concat_ws(', ', 'tmp_screen_names'))
)