如何在捆绑包(bundle)较小的流式管道中按 N 个元素进行批处理?
How to Batch By N Elements in Streaming Pipeline With Small Bundles?
我已经按照此答案中的描述实现了按 N 个元素进行批处理:
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class ClickToClicksPack extends DoFn> {
public static final int BATCH_SIZE = 10;
private List accumulator;
@StartBundle
public void startBundle() {
accumulator = new ArrayList(BATCH_SIZE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
accumulator.add(clickEvent);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList(BATCH_SIZE);
}
}
@FinishBundle
public void finishBundle(FinishBundleContext c) {
if (accumulator.size() > 0) {
ClickEvent clickEvent = accumulator.get(0);
long time = clickEvent.getClickTimestamp().getTime();
c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
}
}
}
但是当我在流模式下运行管道时,有很多批次只有 1 或 2 个元素。据我了解,这是因为捆绑包(bundle)较小。运行一天后,每个批次的平均元素数大约为 4。我确实需要它接近 10,以便后续步骤获得更好的性能。
有没有办法控制捆绑包的大小?
或者我应该为此使用 "GroupIntoBatches" 转换?如果是这样,我不清楚应该选择什么作为键。
更新:
使用 Java 线程 ID 或 VM 主机名作为键来应用 "GroupIntoBatches" 转换是个好主意吗?
我最终实现了一个内部使用 "GroupIntoBatches" 的复合转换。
以下答案包含关于如何选择键的建议:
在我当前的实现中,我使用随机键来实现并行,并对事件进行窗口化,以便即使某个键的事件少于 BATCH_SIZE 个,也能定期输出结果。
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.util.Random;
/**
* Batch clicks into packs of BATCH_SIZE size
*/
public class ClickToClicksPack extends PTransform, PCollection>> {
public static final int BATCH_SIZE = 10;
// Define window duration.
// After window's end - elements are emitted even if there are less then BATCH_SIZE elements
public static final int WINDOW_DURATION_SECONDS = 1;
private static final int DEFAULT_SHARDS_NUMBER = 20;
// Determine possible parallelism level
private int shardsNumber = DEFAULT_SHARDS_NUMBER;
public ClickToClicksPack() {
super();
}
public ClickToClicksPack(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Override
public PCollection> expand(PCollection input) {
return input
// assign keys, as "GroupIntoBatches" works only with key-value pairs
.apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS))))
.apply(GroupIntoBatches.ofSize(BATCH_SIZE))
.apply(ParDo.of(new ExtractValues()));
}
/**
* Assigns to clicks random integer between zero and shardsNumber
*/
private static class AssignRandomKeys extends DoFn> {
private int shardsNumber;
private Random random;
AssignRandomKeys(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);
c.output(kv);
}
}
/**
* Extract values from KV
*/
private static class ExtractValues extends DoFn>, Iterable> {
@ProcessElement
public void processElement(ProcessContext c) {
KV> kv = c.element();
c.output(kv.getValue());
}
}
}
我已经按照此答案中的描述实现了按 N 个元素进行批处理:
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class ClickToClicksPack extends DoFn> {
public static final int BATCH_SIZE = 10;
private List accumulator;
@StartBundle
public void startBundle() {
accumulator = new ArrayList(BATCH_SIZE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
accumulator.add(clickEvent);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList(BATCH_SIZE);
}
}
@FinishBundle
public void finishBundle(FinishBundleContext c) {
if (accumulator.size() > 0) {
ClickEvent clickEvent = accumulator.get(0);
long time = clickEvent.getClickTimestamp().getTime();
c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
}
}
}
但是当我在流模式下运行管道时,有很多批次只有 1 或 2 个元素。据我了解,这是因为捆绑包(bundle)较小。运行一天后,每个批次的平均元素数大约为 4。我确实需要它接近 10,以便后续步骤获得更好的性能。
有没有办法控制捆绑包的大小? 或者我应该为此使用 "GroupIntoBatches" 转换?如果是这样,我不清楚应该选择什么作为键。
更新: 使用 Java 线程 ID 或 VM 主机名作为键来应用 "GroupIntoBatches" 转换是个好主意吗?
我最终实现了一个内部使用 "GroupIntoBatches" 的复合转换。
以下答案包含关于如何选择键的建议:
在我当前的实现中,我使用随机键来实现并行,并对事件进行窗口化,以便即使某个键的事件少于 BATCH_SIZE 个,也能定期输出结果。
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.util.Random;
/**
* Batch clicks into packs of BATCH_SIZE size
*/
public class ClickToClicksPack extends PTransform, PCollection>> {
public static final int BATCH_SIZE = 10;
// Define window duration.
// After window's end - elements are emitted even if there are less then BATCH_SIZE elements
public static final int WINDOW_DURATION_SECONDS = 1;
private static final int DEFAULT_SHARDS_NUMBER = 20;
// Determine possible parallelism level
private int shardsNumber = DEFAULT_SHARDS_NUMBER;
public ClickToClicksPack() {
super();
}
public ClickToClicksPack(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Override
public PCollection> expand(PCollection input) {
return input
// assign keys, as "GroupIntoBatches" works only with key-value pairs
.apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS))))
.apply(GroupIntoBatches.ofSize(BATCH_SIZE))
.apply(ParDo.of(new ExtractValues()));
}
/**
* Assigns to clicks random integer between zero and shardsNumber
*/
private static class AssignRandomKeys extends DoFn> {
private int shardsNumber;
private Random random;
AssignRandomKeys(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);
c.output(kv);
}
}
/**
* Extract values from KV
*/
private static class ExtractValues extends DoFn>, Iterable> {
@ProcessElement
public void processElement(ProcessContext c) {
KV> kv = c.element();
c.output(kv.getValue());
}
}
}