提升groupReduce变换的并行化程度
Enhance the degree of parallelization of groupReduce transformation
在我的 Flink 程序中,我使用 flatMap
操作转换我的数据,该操作将几个数据块分成多个较小的块。这些块有一个 "position" 属性,描述了它们在各自原始块中的位置。现在我使用 groupReduce
需要转换所有共享相同 "position" 属性的小块。所以它应该很容易在多个节点上分发。但是当我 运行 我的程序在多个节点上时 groupReduce
以 1 的 dop 执行。
我猜这是因为我只有一个DataSet
,但是Flink Java API 好像没有一个GroupedDataSet
。是否有另一种可能性来增强我的 groupReduce
转换的 dop?
这是我正在使用的代码(伪代码忽略 "details"):
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.getDataSet()
//Until here the dop is correct
DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;
您的代码的问题是 getDataSet()
调用。它returns分组操作的输入。因此,由 slicedTiles
表示的数据集既没有分组也没有对其组进行排序,而是 flatMap
转换的结果并且不考虑 groupBy
和 sortGroup
调用在节目中。
对 non-grouped 数据集应用 groupReduce
(或 reduce
)操作始终是 non-parallel 操作,因为输入数据集的所有元素都作为单组.
从逻辑上讲,三个转换 groupBy().sortGroup().reduceGroup()
属于一起并被转换为单个 groupReduce
运算符(如果 GroupReduceFunction
是可组合的,可能需要一个额外的组合器)。
如果您按如下方式更改实施,它应该会按预期工作。
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.reduceGroup(yourFunction);
我将打开一个 JIRA 问题,将 JavaDocs 添加到 Grouping.getDataSet()
方法以记录此函数的行为。
在我的 Flink 程序中,我使用 flatMap
操作转换我的数据,该操作将几个数据块分成多个较小的块。这些块有一个 "position" 属性,描述了它们在各自原始块中的位置。现在我使用 groupReduce
需要转换所有共享相同 "position" 属性的小块。所以它应该很容易在多个节点上分发。但是当我 运行 我的程序在多个节点上时 groupReduce
以 1 的 dop 执行。
我猜这是因为我只有一个DataSet
,但是Flink Java API 好像没有一个GroupedDataSet
。是否有另一种可能性来增强我的 groupReduce
转换的 dop?
这是我正在使用的代码(伪代码忽略 "details"):
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.getDataSet()
//Until here the dop is correct
DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;
您的代码的问题是 getDataSet()
调用。它returns分组操作的输入。因此,由 slicedTiles
表示的数据集既没有分组也没有对其组进行排序,而是 flatMap
转换的结果并且不考虑 groupBy
和 sortGroup
调用在节目中。
对 non-grouped 数据集应用 groupReduce
(或 reduce
)操作始终是 non-parallel 操作,因为输入数据集的所有元素都作为单组.
从逻辑上讲,三个转换 groupBy().sortGroup().reduceGroup()
属于一起并被转换为单个 groupReduce
运算符(如果 GroupReduceFunction
是可组合的,可能需要一个额外的组合器)。
如果您按如下方式更改实施,它应该会按预期工作。
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.reduceGroup(yourFunction);
我将打开一个 JIRA 问题,将 JavaDocs 添加到 Grouping.getDataSet()
方法以记录此函数的行为。