Issue with Combine function in Apache Beam Go SDK

We ran into a problem with a combine operation using the Apache Beam Go SDK (v2.28.0) when running a pipeline on Google Cloud Dataflow. I know the Go SDK is experimental, but it would be great if someone could help us figure out whether there is anything wrong with our code, or whether there is a bug in the Go SDK or in Dataflow. The problem only occurs when the pipeline runs on Google Dataflow with some large datasets. We are trying to combine a PCollection<pairedVec> of the following type:

type pairedVec struct {
    Vec1 [1048576]uint64
    Vec2 [1048576]uint64
}

There are 10,000,000 items in the PCollection.
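
For context, the DoFns referenced below (genVecFn, combineVecFn, flattenVecFn) are not shown in the question. combineVecFn is presumably a CombineFn that merges two pairedVec values by element-wise addition; a minimal sketch of what it might look like (an assumption, not the asker's actual code):

// Hypothetical sketch of combineVecFn (assumed, not the code from the question).
// In the Go SDK, a CombineFn whose input, accumulator, and output types are all
// the same only needs to provide MergeAccumulators.
type combineVecFn struct {
    LogN uint64 // field type assumed; carried along because the pipeline sets it
}

func (fn *combineVecFn) MergeAccumulators(a, b pairedVec) pairedVec {
    for i := range a.Vec1 {
        a.Vec1[i] += b.Vec1[i]
        a.Vec2[i] += b.Vec2[i]
    }
    return a
}

func init() {
    // Registration (requires "reflect") so the Dataflow runner can serialize the types.
    beam.RegisterType(reflect.TypeOf((*pairedVec)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*combineVecFn)(nil)).Elem())
}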

Main function:

func main() {
    flag.Parse()

    beam.Init()
    ctx := context.Background()
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    records := textio.ReadSdf(scope, *inputFile)
    rRecords := beam.Reshuffle(scope, records)

    vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)
    histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, vecs)

    lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
    textio.Write(scope, *outputFile, lines)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %s", err)
    }
}

After reading the input file, Dataflow scheduled 1000 workers to generate the PCollection and started the combine. The number of workers then dropped to almost 1 and stayed there for a long time, and the job eventually failed with the following error log:

2021-03-02T06:13:40.438112597ZWorkflow failed. Causes: S09:CombinePerKey/CoGBK'1/Read+CombinePerKey/main.combineVecFn+CombinePerKey/main.combineVecFn/Extract+beam.dropKeyFn+main.flattenVecFn+textio.Write/beam.addFixedKeyFn+textio.Write/CoGBK/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: go-job-1-1614659244459204-03012027-u5s6-harness-q8tx Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-44hk Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-05nm Root cause: The worker lost contact with the service., go-job-1-1614659244459204-03012027-u5s6-harness-l22w Root cause: The worker lost contact with the service.

[Screenshot: the change in the number of workers over time]

EDIT

We tried adding a step that "pre-combines" the records into 100,000 keys (combineDomain=100000) before combining everything together:

Main function:

func main() {
    flag.Parse()

    beam.Init()
    ctx := context.Background()
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    records := textio.ReadSdf(scope, *inputFile)
    rRecords := beam.Reshuffle(scope, records)

    vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)

    keyVecs := beam.ParDo(scope, &addRandomKeyFn{Domain: *combineDomain}, vecs)
    combinedKeyVecs := beam.CombinePerKey(scope, &combineVecFn{LogN: *logN}, keyVecs)
    combinedVecs := beam.DropKey(scope, combinedKeyVecs)

    histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, combinedVecs)

    lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
    textio.Write(scope, *outputFile, lines)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %s", err)
    }
}
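
addRandomKeyFn is likewise not shown; presumably it pairs each vector with a random key in [0, Domain) so that CombinePerKey can pre-combine in parallel buckets. A hypothetical sketch (names and field types inferred from the call site above; requires "math/rand"):

// Hypothetical sketch of addRandomKeyFn (assumed): emits each vector under a
// random key in [0, Domain) so the per-key combine runs across many buckets.
type addRandomKeyFn struct {
    Domain int64
}

func (fn *addRandomKeyFn) ProcessElement(v pairedVec) (int64, pairedVec) {
    return rand.Int63n(fn.Domain), v
}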

However, the job scheduled only a single worker for this, and after running for a long time it still failed:

Workflow failed. Causes: S06:Reshuffle/e6_gbk/Read+Reshuffle/e6_gbk/GroupByWindow+Reshuffle/e6_unreify+main.genVecFn+main.addRandomKeyFn+CombinePerKey/CoGBK'2+CombinePerKey/main.combineVecFn/Partial+CombinePerKey/CoGBK'2/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: 
  go-job-1-1615178257414007-03072037-mrlo-harness-ppjj
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-czng
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-79n8
      Root cause: The worker lost contact with the service.,
  go-job-1-1615178257414007-03072037-mrlo-harness-mj6c
      Root cause: The worker lost contact with the service. 

After adding another reshuffle before CombinePerKey(), the pipeline scheduled 1000 workers to handle it. But the job was extremely slow and used an enormous amount of shuffle data: after 1 hour, genVecFn had processed less than 10% of the input and had already produced 8.08 TB of shuffle data. This matches what we see with our production code, which eventually failed after exhausting our 40 TB shuffle-data quota.
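
Presumably the extra reshuffle went between the keying step and the per-key combine, along these lines (a sketch, not the exact production code):

keyVecs := beam.ParDo(scope, &addRandomKeyFn{Domain: *combineDomain}, vecs)
// Extra reshuffle so the keyed vectors are redistributed before the pre-combine.
rKeyVecs := beam.Reshuffle(scope, keyVecs)
combinedKeyVecs := beam.CombinePerKey(scope, &combineVecFn{LogN: *logN}, rKeyVecs)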

We tried another approach to reduce the load on any single worker: splitting the [1048576]uint64 vectors into 32 segments of [32768]uint64 and combining each segment separately. Something like:

    totalLength := uint64(1 << *logN)
    segLength := uint64(1 << *segmentBits)
    for i := uint64(0); i < totalLength/segLength; i++ {
        fileName := strings.ReplaceAll(*outputFile, path.Ext(*outputFile), fmt.Sprintf("-%d-%d%s", i+1, totalLength/segLength, path.Ext(*outputFile)))
        pHistogram := beam.Combine(scope, &combineVecRangeFn{StartIndex: i * segLength, Length: segLength}, vecs)
        flattened := beam.ParDo(scope, &flattenVecRangeFn{StartIndex: i * segLength}, pHistogram)
        textio.Write(scope, fileName, flattened)
    }
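
combineVecRangeFn and flattenVecRangeFn are not shown either. combineVecRangeFn would presumably accumulate only the [StartIndex, StartIndex+Length) slice of each vector, so each of the 32 per-segment combines carries roughly 1/32 of the data. A hypothetical sketch:

// Hypothetical sketch of combineVecRangeFn (assumed): the accumulator holds the
// Vec1 segment followed by the Vec2 segment, each of length Length.
type combineVecRangeFn struct {
    StartIndex uint64
    Length     uint64
}

func (fn *combineVecRangeFn) CreateAccumulator() []uint64 {
    return make([]uint64, 2*fn.Length)
}

func (fn *combineVecRangeFn) AddInput(acc []uint64, v pairedVec) []uint64 {
    for i := uint64(0); i < fn.Length; i++ {
        acc[i] += v.Vec1[fn.StartIndex+i]
        acc[fn.Length+i] += v.Vec2[fn.StartIndex+i]
    }
    return acc
}

func (fn *combineVecRangeFn) MergeAccumulators(a, b []uint64) []uint64 {
    for i := range a {
        a[i] += b[i]
    }
    return a
}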

The job finally succeeded.

Given your pipeline code, the job scaling down to 1 worker is expected behavior for the Go SDK, because it lacks some of the optimizations that the Java and Python SDKs have. It happens because you are using beam.Combine, which is a global combine, meaning that every element in the PCollection is combined down to a single value. With the Go SDK this means all elements need to be localized onto one worker in order to be combined, and with 10,000,000 items of roughly 16 MiB each (2 × 1,048,576 uint64 values at 8 bytes apiece, on the order of 160 TB in total), that takes far too long and the job most likely times out (you may be able to confirm this by looking for timeout messages in the Dataflow logs).

The other SDKs have the appropriate optimization of splitting the input elements among workers for combining before combining them down on a single worker. For example, from the Java SDK: "Combining can happen in parallel, with different subsets of the input PCollection being combined separately, and their intermediate results combined further, in an arbitrary tree reduction pattern, until a single result value is produced."

Fortunately, this workaround is easy to implement manually for the Go SDK. Simply place your elements into N buckets (where N is greater than the number of workers you would ideally want) by assigning each element a random key in the range [0, N). Then perform a CombinePerKey, so that only elements with matching keys need to be localized on a single worker, which allows this combine step to be split across multiple workers. Follow that with a DropKey and then the global Combine, and you should get the expected result.
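
Concretely, reusing the DoFns from the question, the suggested structure looks roughly like the following (numBuckets is an illustrative constant; pick a value larger than the worker count you want):

// Sketch of the suggested manual "tree" combine for the Go SDK: bucket the
// elements under random keys, pre-combine per key, drop the keys, then run the
// global combine over the (much smaller) per-bucket results.
const numBuckets = 10000

keyVecs := beam.ParDo(scope, &addRandomKeyFn{Domain: numBuckets}, vecs)
preCombined := beam.CombinePerKey(scope, &combineVecFn{LogN: *logN}, keyVecs)
unkeyed := beam.DropKey(scope, preCombined)
histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, unkeyed)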