Does Google Cloud Dataflow support stateful processing? ERROR: Workflow failed
Does Google Cloud Dataflow support stateful processing? ERROR: Workflow failed
我在流式传输 python Beam 管道中使用状态处理,以便检测 JSON 消息的字段何时更改。当 运行 使用 DirectRunner 时它工作正常,但使用 DataflowRunner 时我得到“工作流失败”。甚至在作业开始之前就出错了。
使用有状态处理的函数:
class DetectChangeFn(DoFn):
BAG_SPEC: BagStateSpec = BagStateSpec('changes', StrUtf8Coder())
def __init__(self, field: str, *unused_args, **unused_kwargs):
super().__init__(*unused_args, **unused_kwargs)
self.field: str = field
def process(self, element: Tuple[str, Dict], bag: BagRuntimeState = DoFn.StateParam(BAG_SPEC)) -> Iterable[Dict]:
prev: List[str] = list(bag.read())
current: str = str(element[1][self.field])
if len(prev) == 0:
bag.add(current)
elif current != prev[0]:
bag.clear()
bag.add(current)
yield element[1]
我收到的错误日志消息非常不透明,没有太大帮助:
{
insertId: "ivmrqmc2os"
labels: {
dataflow.googleapis.com/job_id: "2021-06-16_07_32_29-2286715838537053772"
dataflow.googleapis.com/job_name: "xxx"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "europe-west1"
}
logName: "projects/xxx/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2021-06-16T14:32:44.452175552Z"
resource: {
labels: {
job_id: "2021-06-16_07_32_29-2286715838537053772"
job_name: "xxx"
project_id: "xxx"
region: "europe-west1"
step_id: ""
}
type: "dataflow_step"
}
severity: "ERROR"
textPayload: "Workflow failed."
timestamp: "2021-06-16T14:32:43.507731791Z"
}
我在 Python 3.8 中使用 Apache Beam 2.30.0。
Google Cloud Dataflow 是否支持有状态处理,还是我遗漏了什么?
(编辑) 添加了模板构建和部署命令:
python -m main \
--runner DataflowRunner \
--streaming \
--save_main_session \
--setup_file ./setup.py \
--project $PROJECT \
--staging_location $STAGING_LOCATION \
--temp_location $TEMP_LOCATION \
--template_location "$TEMPLATE"
gcloud dataflow jobs run "my-dataflow-job" \
--enable-streaming-engine \
--disable-public-ips \
--gcs-location "$TEMPLATE" \
--subnetwork $SUBNET \
--num-workers $NUM_WORKERS \
--max-workers $MAX_WORKERS \
--region $REGION \
--service-account-email $SERVICE_ACCOUNT
我没有明确指定任何实验,数据流作业中默认使用这些实验:['use_fastavro'、'runner_harness_container_image=gcr.io/cloud-dataflow/v1beta3/harness:2.30.0'、'use_multiple_sdk_containers']
(EDIT 2) 另外,在使用新的 WriteToBigQuery(with_auto_sharding=True) 参数时,我得到了完全相同的错误和行为。
我已经通过阻止 Dataflow 服务对有状态 DoFn 执行融合优化来解决了这个问题。
为此,我在有状态的 DoFn 之前添加了一个 Reshuffle() 步骤。
这是数据流文档中描述的防止融合优化的方法之一:https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
我通过查看“工作流程失败”之前的日志发现了这一点。错误发生。有一个“Fusing consumer DoFnName into Stateful DoFn/KV DoFn”(为简单起见进行了编辑)看起来可能是错误的原因。
我在流式传输 python Beam 管道中使用状态处理,以便检测 JSON 消息的字段何时更改。当 运行 使用 DirectRunner 时它工作正常,但使用 DataflowRunner 时我得到“工作流失败”。甚至在作业开始之前就出错了。
使用有状态处理的函数:
class DetectChangeFn(DoFn):
BAG_SPEC: BagStateSpec = BagStateSpec('changes', StrUtf8Coder())
def __init__(self, field: str, *unused_args, **unused_kwargs):
super().__init__(*unused_args, **unused_kwargs)
self.field: str = field
def process(self, element: Tuple[str, Dict], bag: BagRuntimeState = DoFn.StateParam(BAG_SPEC)) -> Iterable[Dict]:
prev: List[str] = list(bag.read())
current: str = str(element[1][self.field])
if len(prev) == 0:
bag.add(current)
elif current != prev[0]:
bag.clear()
bag.add(current)
yield element[1]
我收到的错误日志消息非常不透明,没有太大帮助:
{
insertId: "ivmrqmc2os"
labels: {
dataflow.googleapis.com/job_id: "2021-06-16_07_32_29-2286715838537053772"
dataflow.googleapis.com/job_name: "xxx"
dataflow.googleapis.com/log_type: "system"
dataflow.googleapis.com/region: "europe-west1"
}
logName: "projects/xxx/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2021-06-16T14:32:44.452175552Z"
resource: {
labels: {
job_id: "2021-06-16_07_32_29-2286715838537053772"
job_name: "xxx"
project_id: "xxx"
region: "europe-west1"
step_id: ""
}
type: "dataflow_step"
}
severity: "ERROR"
textPayload: "Workflow failed."
timestamp: "2021-06-16T14:32:43.507731791Z"
}
我在 Python 3.8 中使用 Apache Beam 2.30.0。 Google Cloud Dataflow 是否支持有状态处理,还是我遗漏了什么?
(编辑) 添加了模板构建和部署命令:
python -m main \
--runner DataflowRunner \
--streaming \
--save_main_session \
--setup_file ./setup.py \
--project $PROJECT \
--staging_location $STAGING_LOCATION \
--temp_location $TEMP_LOCATION \
--template_location "$TEMPLATE"
gcloud dataflow jobs run "my-dataflow-job" \
--enable-streaming-engine \
--disable-public-ips \
--gcs-location "$TEMPLATE" \
--subnetwork $SUBNET \
--num-workers $NUM_WORKERS \
--max-workers $MAX_WORKERS \
--region $REGION \
--service-account-email $SERVICE_ACCOUNT
我没有明确指定任何实验,数据流作业中默认使用这些实验:['use_fastavro'、'runner_harness_container_image=gcr.io/cloud-dataflow/v1beta3/harness:2.30.0'、'use_multiple_sdk_containers']
(EDIT 2) 另外,在使用新的 WriteToBigQuery(with_auto_sharding=True) 参数时,我得到了完全相同的错误和行为。
我已经通过阻止 Dataflow 服务对有状态 DoFn 执行融合优化来解决了这个问题。
为此,我在有状态的 DoFn 之前添加了一个 Reshuffle() 步骤。
这是数据流文档中描述的防止融合优化的方法之一:https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion
我通过查看“工作流程失败”之前的日志发现了这一点。错误发生。有一个“Fusing consumer DoFnName into Stateful DoFn/KV DoFn”(为简单起见进行了编辑)看起来可能是错误的原因。