数据流批处理作业卡在 GroupByKey.create()

Dataflow Batch Job Stuck in GroupByKey.create()

我有一个批处理数据流管道,我有 运行 在我们的数据子集上多次没有问题,大约有 150k 行输入。我现在已经尝试 运行 在我们大约 3 亿行的完整数据集上。管道的一个关键部分执行输入记录的 GroupByKey,导致(我相信)~100M 键。

管道的相关部分如下所示:

// Results in map of ~10k elements, 160MB in size
PCollectionView<Map<String, Iterable<Z>>> sideData = ...

...

.apply(ParDo.named("Group into KV").of(
    new DoFn<T, KV<String, T>>() { ... }
))
.apply("GBK", GroupByKey.create())
.apply(ParDo.named("Merge Values").withSideInputs(sideData).of(
    new DoFn<KV<String, Iterable<T>>, V>() { ... }
))

我有两次 运行 这个管道,每次工作都在 运行 正常超过 16 小时后停止。我第一次 运行 它使用 10 n1-highmem-8 第二次使用 6 n1-highmem-16 个实例。

我可以从数据流作业控制台得知,Group into KV ParDo 完成得很好,输出了 101,730,100 个元素,大小为 153.67 GB。 GBK t运行sform 的步骤详细信息表明在第一次和第二次尝试中分别添加了 72,091,846 和 72,495,353 个元素。此时 GBK t运行sform 仍处于 运行ning 阶段,但所有机器上的 CPU 都降为零,管道实际上已停滞。管道中的所有未来阶段都停止增加元素计数。我可以通过 ssh 进入机器查看 /var/log/dataflow/ 下的各种日志,但似乎没有任何异常。云控制台中没有错误,GC 日志似乎也没有指示内存问题。

此时我有点不知所措,不知道下一步该做什么。我读过使用 Combiner 而不是使用 GroupByKey 可以产生更好的可伸缩性。通过一些重构,我可以使代码是可交换的,这样 Combiner 就成为一个选项。我有点犹豫要不要尝试这样做,因为每次我尝试 运行 这个管道时,我都会浪费大约 250 美元的云计算时间。

我的问题是:

上述作业的 IDS:

看起来一名工作人员可能被卡住了或花了很长时间才能 运行 DoFn 代码在 GroupByKey 之后。最可能的原因是 "hot key"(具有比其他键多得多的值)。您可以向 DoFn 添加聚合器并在 运行ning 时报告 Iterable 的大小,如下所示:

private static class MyDoFn extends KV<String, Iterable<T>>, V> {

  private static final Logger LOG =
    LoggerFactory.getLogger(FilterTextFn.class);
  private final Aggregator<Long, Long> maxIterableSize =
      createAggregator("maxIterableSize", new Max.MaxLongFn());

  @Override
  public void processElement(ProcessContext c) {
    long numElements = 0;
    for (T value : c.element().getValue()) {
      maxIterableSize.addValue(numElements++);
      if (numElements == 100) {
        LOG.warning("Key {} has > 100 values", c.element().getKey());
      }
      ... // the rest of your code inside the loop
    }
  }
}

上面将添加一个计数器,显示单个键上元素的最大数量,并向 Cloud Logging 报告任何值超过 100 个的键(随意调整似乎合理的阈值——单个热键可能比任何其他键具有 许多 个元素。

另一种可能性是 DoFn 的代码中某些特定数据集挂起或非常慢。您可以尝试连接到正在处理这一项目的工作人员并查看它正在处理什么(使用您提到的 kill -QUIT <pid>)。