如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?
How to forward event downstream from a Punctuator instance in a ValueTransformer?
在 KafkaStream 中,当实现 ValueTransformer 或 ValueTransformerWithKey 时,在 transform() 调用,我安排了一个新的标点符号。当执行 Punctuator 的方法 punctuate() 时,我希望它使用上下文实例将事件转发到下游。但是,当作为 DSL 拓扑的一部分时,上下文实例似乎未定义。
关于如何使用 Transformer 执行此操作的任何线索?
在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑。
在 ValueTransformerWithKey 中:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
在我的标点符号中:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public void punctuate(final long timestamp) {
context.forward(key, AlertEvent.builder().withSource(event).build());
context.commit();
}
}
执行时
myStream
.groupByKey(Serialized.with(Serdes.String(), Event.serde()))
.reduce((k, v) -> v)
.transformValues(() -> valueTransformerWithKey)
.toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
我希望标点符号产生的警报事件在过期后转发到 ALARM 主题。
相反,我得到了以下异常:ProcessorContext.forward() 不受支持。
像往常一样,我在javadoc中找到了关于ValueTransformerWithKey接口的答案:
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html
Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.
但是,实现 Transformer 接口反而允许使用 context.forward()。谢谢@Matthias J. Sax
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html
If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.
在 KafkaStream 中,当实现 ValueTransformer 或 ValueTransformerWithKey 时,在 transform() 调用,我安排了一个新的标点符号。当执行 Punctuator 的方法 punctuate() 时,我希望它使用上下文实例将事件转发到下游。但是,当作为 DSL 拓扑的一部分时,上下文实例似乎未定义。
关于如何使用 Transformer 执行此操作的任何线索?
在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑。
在 ValueTransformerWithKey 中:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
在我的标点符号中:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public void punctuate(final long timestamp) {
context.forward(key, AlertEvent.builder().withSource(event).build());
context.commit();
}
}
执行时
myStream
.groupByKey(Serialized.with(Serdes.String(), Event.serde()))
.reduce((k, v) -> v)
.transformValues(() -> valueTransformerWithKey)
.toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
我希望标点符号产生的警报事件在过期后转发到 ALARM 主题。
相反,我得到了以下异常:ProcessorContext.forward() 不受支持。
像往常一样,我在javadoc中找到了关于ValueTransformerWithKey接口的答案: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html
Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.
但是,实现 Transformer 接口反而允许使用 context.forward()。谢谢@Matthias J. Sax
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html
If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.