提升groupReduce变换的并行化程度

Enhance the degree of parallelization of groupReduce transformation

在我的 Flink 程序中,我使用 flatMap 操作转换我的数据,该操作将几个数据块分成多个较小的块。这些块有一个 "position" 属性,描述了它们在各自原始块中的位置。现在我使用 groupReduce 需要转换所有共享相同 "position" 属性的小块。所以它应该很容易在多个节点上分发。但是当我 运行 我的程序在多个节点上时 groupReduce 以 1 的 dop 执行。

我猜这是因为我只有一个DataSet,但是Flink Java API 好像没有一个GroupedDataSet。是否有另一种可能性来增强我的 groupReduce 转换的 dop?

这是我正在使用的代码(伪代码忽略 "details"):

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .getDataSet()
    //Until here the dop is correct

DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;

您的代码的问题是 getDataSet() 调用。它returns分组操作的输入。因此,由 slicedTiles 表示的数据集既没有分组也没有对其组进行排序,而是 flatMap 转换的结果并且不考虑 groupBysortGroup 调用在节目中。

对 non-grouped 数据集应用 groupReduce(或 reduce)操作始终是 non-parallel 操作,因为输入数据集的所有元素都作为单组.

从逻辑上讲,三个转换 groupBy().sortGroup().reduceGroup() 属于一起并被转换为单个 groupReduce 运算符(如果 GroupReduceFunction 是可组合的,可能需要一个额外的组合器)。

如果您按如下方式更改实施,它应该会按预期工作。

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .reduceGroup(yourFunction);

我将打开一个 JIRA 问题,将 JavaDocs 添加到 Grouping.getDataSet() 方法以记录此函数的行为。