如何使用 spark runner 在 apache beam 中重新洗牌
How to reshuffle in apache beam using spark runner
我正在使用 spark runner 进行此模拟:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
IntStream.range(0, 4_000_000).forEach(outputReceiver::output);
}
}))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
try {
// simulate a rpc call of 10ms
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
outputReceiver.output(element);
}
}));
PipelineResult result = p.run();
result.waitUntilFinish();
我 运行 使用 --runner=SparkRunner --sparkMaster=local[8]
但在重新洗牌后只使用了 1 个线程。
为什么 Rechuffle 不起作用?
如果我为此更改重新洗牌:
.apply(MapElements.into(kvs(integers(), integers())).via(e -> KV.of(e % 8, e)))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables())
然后我得到 8 个线程 运行。
BR,拉斐尔。
看起来 Reshuffle on Beam on Spark 归结为
的实现
我想知道在这种情况下 rdd.context().defaultParallelism()
和 rdd.getNumPartitions()
是否都是 1。我已提交 https://issues.apache.org/jira/browse/BEAM-10834 进行调查。
同时,您可以使用 GroupByKey 来获得您所指示的所需并行度。 (如果你没有字面上的整数,你可以尝试使用你的元素的散列,一个 Math.random(),或者甚至是一个递增的计数器作为键)。
我正在使用 spark runner 进行此模拟:
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
IntStream.range(0, 4_000_000).forEach(outputReceiver::output);
}
}))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@ProcessElement
public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
try {
// simulate a rpc call of 10ms
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
outputReceiver.output(element);
}
}));
PipelineResult result = p.run();
result.waitUntilFinish();
我 运行 使用 --runner=SparkRunner --sparkMaster=local[8]
但在重新洗牌后只使用了 1 个线程。
为什么 Rechuffle 不起作用?
如果我为此更改重新洗牌:
.apply(MapElements.into(kvs(integers(), integers())).via(e -> KV.of(e % 8, e)))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables())
然后我得到 8 个线程 运行。
BR,拉斐尔。
看起来 Reshuffle on Beam on Spark 归结为
的实现我想知道在这种情况下 rdd.context().defaultParallelism()
和 rdd.getNumPartitions()
是否都是 1。我已提交 https://issues.apache.org/jira/browse/BEAM-10834 进行调查。
同时,您可以使用 GroupByKey 来获得您所指示的所需并行度。 (如果你没有字面上的整数,你可以尝试使用你的元素的散列,一个 Math.random(),或者甚至是一个递增的计数器作为键)。