如何在 flink 中不延迟地发出处理事件的结果
how to emit result of processing an event without delay in flink
我们正在考虑将flink用于一个用例,但不确定flink是否适合它。这是我的用例。当事件 e1 到达时,我们需要对其进行处理并发出结果。源和接收器与本次讨论无关,但您可以将消息队列服务视为源和接收器。事件的整个处理过程独立于其他事件。因此在处理事件 e1 时,我们不需要 e2 或任何其他事件。作为处理的一部分,我们需要执行 step1、step2、step3、step4,如下图所示。注意step2和step3要并行进行
事件的处理延迟对我们来说至关重要。所以我需要在该元素的处理完成后立即发出结果,而不是等待一些 window 超时。由于我对 Flink 的了解有限,我只能想到下面的方法
DataStream<Map<String, Object>> step1 = env.addSource(...);
DataStream<Map<String, Object>> step2 = step1.map(...);
DataStream<Map<String, Object>> step3 = step1.map(...);
现在,我如何合并第 2 步和第 3 步的结果并发出结果?在这个简单的示例中,我只有两个流要合并,但也可以超过 2 个。我可以做一个流的联合。我可以有一个唯一的事件 ID 来对与特定事件相关的中间步骤的输出进行分组。
DataStream<Map<String, Object>> mergedStream = step1.union(step2).keyBy(...);
但是如何发出结果呢?理想情况下,我想说“一旦我从 step2 和 step3 获得特定键的输出就发出结果”而不是“每 30 毫秒发出一次结果”。后者有两个问题:它可能会发出部分结果并且它有延迟。有没有办法指定前者?
我正在探索 Flink,但如果它能解决我的用例,我愿意考虑其他替代方案。
在第 1 步中,添加事件 ID。然后在联合之后,通过事件 ID 键控流并使用 RichFlatMapFunction 将步骤 2 和 3 的结果组合回单个事件。如果第 2 步和第 3 步发出 EnrichedEvent 类型的事件,那么第 4 步可以是:
static class FanIn extends RichFlatMapFunction<EnrichedEvent, EnrichedEvent> {
private transient ValueState<EnrichedEvent> enrichmentResponseState;
@Override
public void flatMap(EnrichedEvent value, Collector<EnrichedEvent> out) throws Exception {
EnrichedEvent response = enrichmentResponseState.value();
if (response != null) {
response = response.combine(value);
} else {
response = value;
}
if (response.isComplete()) {
out.collect(response);
enrichmentResponseState.clear();
} else {
enrichmentResponseState.update(response);
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<EnrichedEvent> fanInStateDescriptor =
new ValueStateDescriptor<>( "enrichmentResponse",
TypeInformation.of(new TypeHint<EnrichedEvent>() {})
);
enrichmentResponseState = getRuntimeContext().getState(fanInStateDescriptor);
}
}
之后将合并的最终结果发送到接收器是一件简单的事情。
我们正在考虑将flink用于一个用例,但不确定flink是否适合它。这是我的用例。当事件 e1 到达时,我们需要对其进行处理并发出结果。源和接收器与本次讨论无关,但您可以将消息队列服务视为源和接收器。事件的整个处理过程独立于其他事件。因此在处理事件 e1 时,我们不需要 e2 或任何其他事件。作为处理的一部分,我们需要执行 step1、step2、step3、step4,如下图所示。注意step2和step3要并行进行
事件的处理延迟对我们来说至关重要。所以我需要在该元素的处理完成后立即发出结果,而不是等待一些 window 超时。由于我对 Flink 的了解有限,我只能想到下面的方法
DataStream<Map<String, Object>> step1 = env.addSource(...);
DataStream<Map<String, Object>> step2 = step1.map(...);
DataStream<Map<String, Object>> step3 = step1.map(...);
现在,我如何合并第 2 步和第 3 步的结果并发出结果?在这个简单的示例中,我只有两个流要合并,但也可以超过 2 个。我可以做一个流的联合。我可以有一个唯一的事件 ID 来对与特定事件相关的中间步骤的输出进行分组。
DataStream<Map<String, Object>> mergedStream = step1.union(step2).keyBy(...);
但是如何发出结果呢?理想情况下,我想说“一旦我从 step2 和 step3 获得特定键的输出就发出结果”而不是“每 30 毫秒发出一次结果”。后者有两个问题:它可能会发出部分结果并且它有延迟。有没有办法指定前者? 我正在探索 Flink,但如果它能解决我的用例,我愿意考虑其他替代方案。
在第 1 步中,添加事件 ID。然后在联合之后,通过事件 ID 键控流并使用 RichFlatMapFunction 将步骤 2 和 3 的结果组合回单个事件。如果第 2 步和第 3 步发出 EnrichedEvent 类型的事件,那么第 4 步可以是:
static class FanIn extends RichFlatMapFunction<EnrichedEvent, EnrichedEvent> {
private transient ValueState<EnrichedEvent> enrichmentResponseState;
@Override
public void flatMap(EnrichedEvent value, Collector<EnrichedEvent> out) throws Exception {
EnrichedEvent response = enrichmentResponseState.value();
if (response != null) {
response = response.combine(value);
} else {
response = value;
}
if (response.isComplete()) {
out.collect(response);
enrichmentResponseState.clear();
} else {
enrichmentResponseState.update(response);
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<EnrichedEvent> fanInStateDescriptor =
new ValueStateDescriptor<>( "enrichmentResponse",
TypeInformation.of(new TypeHint<EnrichedEvent>() {})
);
enrichmentResponseState = getRuntimeContext().getState(fanInStateDescriptor);
}
}
之后将合并的最终结果发送到接收器是一件简单的事情。