状态和计时器的问题 - apache beam
Problem with state and timers - apache beam
我正在尝试重新创建 this 博客 post 中描述的 apache beam 管道的简单示例,它使用了状态和计时器。
我编写这段代码是为了尝试测试博客 post 中的内容。代码应该通过将“: enrich”附加到每条记录来简单地丰富字符串记录。在我添加 "stale" 计时器之前,我收到一个错误。
我用的是apache beam 2.13版本,直接运行ner.
这是主要从博客复制粘贴的 Enrich DoFn:
public class Enrich extends DoFn<KV<String, String>, String> {
private static final long serialVersionUID = 1L;
private static final int MAX_BUFFER_SIZE = 2;
@StateId("buffer")
private final StateSpec<BagState<String>> bufferedEvents =
StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState =
StateSpecs.value();
@TimerId("expiry")
private final TimerSpec expirySpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext context,
BoundedWindow window,
@StateId("buffer") BagState<String> bufferState,
@StateId("count") ValueState<Integer> countState,
@TimerId("expiry") Timer expiryTimer) {
Duration allowedLateness = Duration.standardSeconds(10);
expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element().getValue());
if (count >= MAX_BUFFER_SIZE) {
for (String event : bufferState.read()) {
context.output(enrichEvent(event));
}
bufferState.clear();
countState.clear();
}
}
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<String> bufferState) {
if (!bufferState.isEmpty().read()) {
for (String event : bufferState.read()) {
context.output(enrichEvent(event));
}
bufferState.clear();
}
}
public static String enrichEvent(String event) {
return event + ": enriched";
}
public static int firstNonNull(Integer x, Integer y) {
if (x == null) {
return y;
}
return x;
}
}
这是我用来测试 enrich DoFn
:
的代码
@RunWith(JUnit4.class)
public class EnrichTest {
final Logger LOG = LoggerFactory.getLogger(EnrichTest.class);
@Rule
public TestPipeline p = TestPipeline.create();
static final String record1 = "1";
static final String record2 = "2";
static final String record3 = "3";
static final String key = "a key";
static final String result1 = "1: enriched";
static final String result2 = "2: enriched";
static final String result3 = "3: enriched";
@Test
public void testSimple() throws Exception {
Duration ALLOWED_LATENESS = Duration.standardSeconds(10);
Duration WINDOW_DURATION = Duration.standardSeconds(10);
Instant baseTime = new Instant(0L);
KvCoder<String, String> coder =
KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class));
TestStream<KV<String, String>> items =
TestStream
.create(coder)
.advanceWatermarkTo(baseTime)
.addElements(
TimestampedValue.of(
KV.of(key, record1),
baseTime.plus(Duration.standardSeconds(1))))
.addElements(
TimestampedValue.of(
KV.of(key, record2),
baseTime.plus(Duration.standardSeconds(0))))
.advanceWatermarkTo(
baseTime.plus(Duration.standardSeconds(11)))
.addElements(
TimestampedValue.of(
KV.of(key, record3),
baseTime.plus(Duration.standardSeconds(2))))
.advanceWatermarkToInfinity();
PCollection<String> results =
p.apply(items)
.apply(new CreateWindows (WINDOW_DURATION, ALLOWED_LATENESS))
.apply(ParDo.of(new Enrich()));
PAssert
.that(results)
.inWindow(new IntervalWindow(baseTime, WINDOW_DURATION))
.containsInAnyOrder(result1, result2, result3);
p.run().waitUntilFinish();
}
}
这是我的 windowing 函数:
public class CreateWindows extends
PTransform<PCollection<KV<String, String>>,
PCollection<KV<String, String>>> {
private static final long serialVersionUID = 1L;
private final Duration windowDuration;
private final Duration allowedLateness;
public CreateStringWindows(Duration windowDuration, Duration allowedLateness) {
this.windowDuration = windowDuration;
this.allowedLateness = allowedLateness;
}
@Override
public PCollection<KV<String, String>> expand(
PCollection<KV<String, String>> items) {
return items.apply("Aggregate fixed window",
Window.<KV<String, String>>into(FixedWindows.of(windowDuration))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));
}
}
正如我们在上面的代码中看到的,我使用固定的 windows 大小为 10 秒。允许的延迟也设置为 10 秒。
您还应该注意到到期计时器已设置为 expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
,如博客 post 中所述。在我的测试中,我按时添加了前 2 条记录,我将水印移动到 11 秒,然后添加最后一条记录以测试当我添加延迟数据时会发生什么。
当我运行测试时,出现以下错误:
java.lang.IllegalStateException: TimestampCombiner moved element from 1970-01-01T00:00:19.999Z to earlier time 1970-01-01T00:00:09.999Z for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:10.000Z)
我希望这段代码能够处理延迟数据,尤其是因为到期计时器设置为 window.maxTimestamp().plus(allowedLateness)
博客 post 没有具体提到它使用什么 windowing 策略。这可能是问题所在吗?我也尝试过使用 Never.ever()
作为 window 触发器,但我得到了同样的错误:
.triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));
很抱歉 post,任何帮助将不胜感激。
最后,我将 onExpiry
编辑为使用 context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
而不是 context.output(enrichEvent(event));
。这解决了问题。
这是更正后的 onExpiry
方法。
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context, BoundedWindow window,
@StateId("buffer") BagState<String> bufferState) {
if (!bufferState.isEmpty().read()) {
for (String event : bufferState.read()) {
context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
}
bufferState.clear();
}
}
我正在尝试重新创建 this 博客 post 中描述的 apache beam 管道的简单示例,它使用了状态和计时器。
我编写这段代码是为了尝试测试博客 post 中的内容。代码应该通过将“: enrich”附加到每条记录来简单地丰富字符串记录。在我添加 "stale" 计时器之前,我收到一个错误。
我用的是apache beam 2.13版本,直接运行ner.
这是主要从博客复制粘贴的 Enrich DoFn:
public class Enrich extends DoFn<KV<String, String>, String> {
private static final long serialVersionUID = 1L;
private static final int MAX_BUFFER_SIZE = 2;
@StateId("buffer")
private final StateSpec<BagState<String>> bufferedEvents =
StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState =
StateSpecs.value();
@TimerId("expiry")
private final TimerSpec expirySpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(
ProcessContext context,
BoundedWindow window,
@StateId("buffer") BagState<String> bufferState,
@StateId("count") ValueState<Integer> countState,
@TimerId("expiry") Timer expiryTimer) {
Duration allowedLateness = Duration.standardSeconds(10);
expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element().getValue());
if (count >= MAX_BUFFER_SIZE) {
for (String event : bufferState.read()) {
context.output(enrichEvent(event));
}
bufferState.clear();
countState.clear();
}
}
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context,
@StateId("buffer") BagState<String> bufferState) {
if (!bufferState.isEmpty().read()) {
for (String event : bufferState.read()) {
context.output(enrichEvent(event));
}
bufferState.clear();
}
}
public static String enrichEvent(String event) {
return event + ": enriched";
}
public static int firstNonNull(Integer x, Integer y) {
if (x == null) {
return y;
}
return x;
}
}
这是我用来测试 enrich DoFn
:
@RunWith(JUnit4.class)
public class EnrichTest {
final Logger LOG = LoggerFactory.getLogger(EnrichTest.class);
@Rule
public TestPipeline p = TestPipeline.create();
static final String record1 = "1";
static final String record2 = "2";
static final String record3 = "3";
static final String key = "a key";
static final String result1 = "1: enriched";
static final String result2 = "2: enriched";
static final String result3 = "3: enriched";
@Test
public void testSimple() throws Exception {
Duration ALLOWED_LATENESS = Duration.standardSeconds(10);
Duration WINDOW_DURATION = Duration.standardSeconds(10);
Instant baseTime = new Instant(0L);
KvCoder<String, String> coder =
KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class));
TestStream<KV<String, String>> items =
TestStream
.create(coder)
.advanceWatermarkTo(baseTime)
.addElements(
TimestampedValue.of(
KV.of(key, record1),
baseTime.plus(Duration.standardSeconds(1))))
.addElements(
TimestampedValue.of(
KV.of(key, record2),
baseTime.plus(Duration.standardSeconds(0))))
.advanceWatermarkTo(
baseTime.plus(Duration.standardSeconds(11)))
.addElements(
TimestampedValue.of(
KV.of(key, record3),
baseTime.plus(Duration.standardSeconds(2))))
.advanceWatermarkToInfinity();
PCollection<String> results =
p.apply(items)
.apply(new CreateWindows (WINDOW_DURATION, ALLOWED_LATENESS))
.apply(ParDo.of(new Enrich()));
PAssert
.that(results)
.inWindow(new IntervalWindow(baseTime, WINDOW_DURATION))
.containsInAnyOrder(result1, result2, result3);
p.run().waitUntilFinish();
}
}
这是我的 windowing 函数:
public class CreateWindows extends
PTransform<PCollection<KV<String, String>>,
PCollection<KV<String, String>>> {
private static final long serialVersionUID = 1L;
private final Duration windowDuration;
private final Duration allowedLateness;
public CreateStringWindows(Duration windowDuration, Duration allowedLateness) {
this.windowDuration = windowDuration;
this.allowedLateness = allowedLateness;
}
@Override
public PCollection<KV<String, String>> expand(
PCollection<KV<String, String>> items) {
return items.apply("Aggregate fixed window",
Window.<KV<String, String>>into(FixedWindows.of(windowDuration))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));
}
}
正如我们在上面的代码中看到的,我使用固定的 windows 大小为 10 秒。允许的延迟也设置为 10 秒。
您还应该注意到到期计时器已设置为 expiryTimer.set(window.maxTimestamp().plus(allowedLateness));
,如博客 post 中所述。在我的测试中,我按时添加了前 2 条记录,我将水印移动到 11 秒,然后添加最后一条记录以测试当我添加延迟数据时会发生什么。
当我运行测试时,出现以下错误:
java.lang.IllegalStateException: TimestampCombiner moved element from 1970-01-01T00:00:19.999Z to earlier time 1970-01-01T00:00:09.999Z for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:10.000Z)
我希望这段代码能够处理延迟数据,尤其是因为到期计时器设置为 window.maxTimestamp().plus(allowedLateness)
博客 post 没有具体提到它使用什么 windowing 策略。这可能是问题所在吗?我也尝试过使用 Never.ever()
作为 window 触发器,但我得到了同样的错误:
.triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(allowedLateness));
很抱歉 post,任何帮助将不胜感激。
最后,我将 onExpiry
编辑为使用 context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
而不是 context.output(enrichEvent(event));
。这解决了问题。
这是更正后的 onExpiry
方法。
@OnTimer("expiry")
public void onExpiry(
OnTimerContext context, BoundedWindow window,
@StateId("buffer") BagState<String> bufferState) {
if (!bufferState.isEmpty().read()) {
for (String event : bufferState.read()) {
context.outputWithTimestamp(enrichEvent(event), window.maxTimestamp());
}
bufferState.clear();
}
}