Does Google Cloud Dataflow support stateful processing? ERROR: Workflow failed

Does Google Cloud Dataflow support stateful processing? ERROR: Workflow failed

我在流式传输 python Beam 管道中使用状态处理,以便检测 JSON 消息的字段何时更改。当 运行 使用 DirectRunner 时它工作正常,但使用 DataflowRunner 时我得到“工作流失败”。甚至在作业开始之前就出错了。

使用有状态处理的函数:

class DetectChangeFn(DoFn):
    BAG_SPEC: BagStateSpec = BagStateSpec('changes', StrUtf8Coder())

    def __init__(self, field: str, *unused_args, **unused_kwargs):
        super().__init__(*unused_args, **unused_kwargs)
        self.field: str = field

    def process(self, element: Tuple[str, Dict], bag: BagRuntimeState = DoFn.StateParam(BAG_SPEC)) -> Iterable[Dict]:
        prev: List[str] = list(bag.read())
        current: str = str(element[1][self.field])

        if len(prev) == 0:
            bag.add(current)
        elif current != prev[0]:
            bag.clear()
            bag.add(current)
            yield element[1]

我收到的错误日志消息非常不透明,没有太大帮助:

{
 insertId: "ivmrqmc2os"  
 labels: {
  dataflow.googleapis.com/job_id: "2021-06-16_07_32_29-2286715838537053772"   
  dataflow.googleapis.com/job_name: "xxx"   
  dataflow.googleapis.com/log_type: "system"   
  dataflow.googleapis.com/region: "europe-west1"   
 }
 logName: "projects/xxx/logs/dataflow.googleapis.com%2Fjob-message"  
 receiveTimestamp: "2021-06-16T14:32:44.452175552Z"  
 resource: {
  labels: {
   job_id: "2021-06-16_07_32_29-2286715838537053772"    
   job_name: "xxx"    
   project_id: "xxx"    
   region: "europe-west1"    
   step_id: ""    
  }
  type: "dataflow_step"   
 }
 severity: "ERROR"  
 textPayload: "Workflow failed."  
 timestamp: "2021-06-16T14:32:43.507731791Z"  
}

我在 Python 3.8 中使用 Apache Beam 2.30.0。 Google Cloud Dataflow 是否支持有状态处理,还是我遗漏了什么?

(编辑) 添加了模板构建和部署命令:

python -m main \
    --runner DataflowRunner \
    --streaming \
    --save_main_session \
    --setup_file ./setup.py \
    --project $PROJECT \
    --staging_location $STAGING_LOCATION \
    --temp_location $TEMP_LOCATION \
    --template_location "$TEMPLATE"
gcloud dataflow jobs run "my-dataflow-job" \
    --enable-streaming-engine \
    --disable-public-ips \
    --gcs-location "$TEMPLATE" \
    --subnetwork $SUBNET \
    --num-workers $NUM_WORKERS \
    --max-workers $MAX_WORKERS \
    --region $REGION \
    --service-account-email $SERVICE_ACCOUNT

我没有明确指定任何实验,数据流作业中默认使用这些实验:['use_fastavro'、'runner_harness_container_image=gcr.io/cloud-dataflow/v1beta3/harness:2.30.0'、'use_multiple_sdk_containers']

(EDIT 2) 另外,在使用新的 WriteToBigQuery(with_auto_sharding=True) 参数时,我得到了完全相同的错误和行为。

我已经通过阻止 Dataflow 服务对有状态 DoFn 执行融合优化来解决了这个问题。

为此,我在有状态的 DoFn 之前添加了一个 Reshuffle() 步骤。

这是数据流文档中描述的防止融合优化的方法之一:https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#preventing-fusion

我通过查看“工作流程失败”之前的日志发现了这一点。错误发生。有一个“Fusing consumer DoFnName into Stateful DoFn/KV DoFn”(为简单起见进行了编辑)看起来可能是错误的原因。