数据流作业随机停止处理数据
Data flow job stops processing data randomly
我制作了一个管道,用于从 pubsub 读取数据并将其流式传输到 BigQuery。在流式传输数据之前,会对其进行预处理以删除重复项。这是通过窗口化到固定 windows 并按 id 对它们进行分组来完成的。管道工作正常,直到几个小时后突然停止。日志中没有任何内容,没有错误消息或任何内容。根据作业指标,数据在 GroupByKey/MergeBuckets 阶段停止。在那之前一切正常,但它不发送任何数据。正如 Metrics from the job 中所见,这似乎是一个硬性中断。
分组步骤的代码如下:
class Transform(PTransform):
def expand(self, pcoll):
return (
pcoll
| "Extract data" >> ParDo(ExtractData())
| "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
| "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
| "Group by dummy key" >> GroupByKey()
| "Remove dummy key" >> MapTuple(lambda _, elem: elem)
)
ExtractData 函数将消息从 json 字符串转换为字典。
我怀疑设置了从 PubSub 读取的消息的时间戳,以便长时间不会触发特定的 Window。请注意,Beam 默认使用事件时间触发器,默认情况下触发器不会提前触发(仅在水印到达 Window 边界时触发)。因此,何时触发 windows 取决于从 PubSub 读取的事件的时间戳。如果您需要以 10 分钟的固定间隔触发数据,请考虑设置 processing time based trigger.
我制作了一个管道,用于从 pubsub 读取数据并将其流式传输到 BigQuery。在流式传输数据之前,会对其进行预处理以删除重复项。这是通过窗口化到固定 windows 并按 id 对它们进行分组来完成的。管道工作正常,直到几个小时后突然停止。日志中没有任何内容,没有错误消息或任何内容。根据作业指标,数据在 GroupByKey/MergeBuckets 阶段停止。在那之前一切正常,但它不发送任何数据。正如 Metrics from the job 中所见,这似乎是一个硬性中断。
分组步骤的代码如下:
class Transform(PTransform):
def expand(self, pcoll):
return (
pcoll
| "Extract data" >> ParDo(ExtractData())
| "Window into Fixed Intervals" >> WindowInto(window.FixedWindows(10))
| "Make dummy key" >> Map(lambda elem: (elem["sensor_id"], elem))
| "Group by dummy key" >> GroupByKey()
| "Remove dummy key" >> MapTuple(lambda _, elem: elem)
)
ExtractData 函数将消息从 json 字符串转换为字典。
我怀疑设置了从 PubSub 读取的消息的时间戳,以便长时间不会触发特定的 Window。请注意,Beam 默认使用事件时间触发器,默认情况下触发器不会提前触发(仅在水印到达 Window 边界时触发)。因此,何时触发 windows 取决于从 PubSub 读取的事件的时间戳。如果您需要以 10 分钟的固定间隔触发数据,请考虑设置 processing time based trigger.