我可以将自定义分区程序与 group by 一起使用吗?
Can I use a custom partitioner with group by?
假设我知道我的数据集是不平衡的并且我知道密钥的分布。我想利用它来编写自定义分区程序以充分利用运算符实例。
我知道 DataStream#partitionCustom。但是,如果我的流是键控的,它还能正常工作吗?我的工作看起来像:
KeyedDataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), MyPartitionKeySelector())
DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()
我想要实现的是:
- 根据某个键有一个流 keyBy,这样 reduce 函数将只用来自那个键的元素被调用。
- group by 根据一些自定义分区将工作拆分到多个节点。
- 自定义分区根据并行运算符实例的数量返回一个数字(这将是固定的,不会重新缩放)。
- 自定义分区从 keyBy 返回不同的值。然而,
keyBy(x) = keyBy(y) => partition(x) = partition(y)
.
- pre-aggregation 在分区前最小化网络流量。
用例示例:
- 数据集:[(0, A), (0, B), (0, C), (1, D), (2, E)]
- 并行运算符实例数:2
- 按功能分组:returns 对的第一个元素
- 分区函数:returns键 0 为 0,键 1 和 2 为 1。优点:处理可能将键 0 和 1 发送到同一运算符实例的数据倾斜,这意味着一个operator 实例将接收 80% 的数据集。
不幸的是,这是不可能的。 DataStreamUtils.reinterpretAsKeyedStream()
要求数据的分区与调用 keyBy()
时相同。
此限制的原因是密钥组以及密钥如何映射到密钥组。 key group 是 Flink 的 keyed state 如何分布的单位。 key groups的个数决定了算子的最大并行度,配置为setMaxParallelism()
。密钥被分配给具有内部散列函数的密钥组。通过更改密钥的分区,同一密钥组的密钥将分布在多台机器上,这将不起作用。
为了调整键分配给机器,您需要更改键分配给键组。但是,没有 public 或可访问的界面可以做到这一点。因此,Flink 1.6 不支持自定义密钥分发。
假设我知道我的数据集是不平衡的并且我知道密钥的分布。我想利用它来编写自定义分区程序以充分利用运算符实例。
我知道 DataStream#partitionCustom。但是,如果我的流是键控的,它还能正常工作吗?我的工作看起来像:
KeyedDataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), MyPartitionKeySelector())
DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()
我想要实现的是:
- 根据某个键有一个流 keyBy,这样 reduce 函数将只用来自那个键的元素被调用。
- group by 根据一些自定义分区将工作拆分到多个节点。
- 自定义分区根据并行运算符实例的数量返回一个数字(这将是固定的,不会重新缩放)。
- 自定义分区从 keyBy 返回不同的值。然而,
keyBy(x) = keyBy(y) => partition(x) = partition(y)
. - pre-aggregation 在分区前最小化网络流量。
用例示例:
- 数据集:[(0, A), (0, B), (0, C), (1, D), (2, E)]
- 并行运算符实例数:2
- 按功能分组:returns 对的第一个元素
- 分区函数:returns键 0 为 0,键 1 和 2 为 1。优点:处理可能将键 0 和 1 发送到同一运算符实例的数据倾斜,这意味着一个operator 实例将接收 80% 的数据集。
不幸的是,这是不可能的。 DataStreamUtils.reinterpretAsKeyedStream()
要求数据的分区与调用 keyBy()
时相同。
此限制的原因是密钥组以及密钥如何映射到密钥组。 key group 是 Flink 的 keyed state 如何分布的单位。 key groups的个数决定了算子的最大并行度,配置为setMaxParallelism()
。密钥被分配给具有内部散列函数的密钥组。通过更改密钥的分区,同一密钥组的密钥将分布在多台机器上,这将不起作用。
为了调整键分配给机器,您需要更改键分配给键组。但是,没有 public 或可访问的界面可以做到这一点。因此,Flink 1.6 不支持自定义密钥分发。