如何在 flink 中不延迟地发出处理事件的结果

how to emit result of processing an event without delay in flink

我们正在考虑将flink用于一个用例,但不确定flink是否适合它。这是我的用例。当事件 e1 到达时,我们需要对其进行处理并发出结果。源和接收器与本次讨论无关,但您可以将消息队列服务视为源和接收器。事件的整个处理过程独立于其他事件。因此在处理事件 e1 时,我们不需要 e2 或任何其他事件。作为处理的一部分,我们需要执行 step1、step2、step3、step4,如下图所示。注意step2和step3要并行进行

事件的处理延迟对我们来说至关重要。所以我需要在该元素的处理完成后立即发出结果,而不是等待一些 window 超时。由于我对 Flink 的了解有限,我只能想到下面的方法

DataStream<Map<String, Object>> step1 = env.addSource(...);
DataStream<Map<String, Object>> step2 = step1.map(...);
DataStream<Map<String, Object>> step3 = step1.map(...);

现在,我如何合并第 2 步和第 3 步的结果并发出结果?在这个简单的示例中,我只有两个流要合并,但也可以超过 2 个。我可以做一个流的联合。我可以有一个唯一的事件 ID 来对与特定事件相关的中间步骤的输出进行分组。

DataStream<Map<String, Object>> mergedStream = step1.union(step2).keyBy(...);

但是如何发出结果呢?理想情况下,我想说“一旦我从 step2 和 step3 获得特定键的输出就发出结果”而不是“每 30 毫秒发出一次结果”。后者有两个问题:它可能会发出部分结果并且它有延迟。有没有办法指定前者? 我正在探索 Flink,但如果它能解决我的用例,我愿意考虑其他替代方案。

在第 1 步中,添加事件 ID。然后在联合之后,通过事件 ID 键控流并使用 RichFlatMapFunction 将步骤 2 和 3 的结果组合回单个事件。如果第 2 步和第 3 步发出 EnrichedEvent 类型的事件,那么第 4 步可以是:

static class FanIn extends RichFlatMapFunction<EnrichedEvent, EnrichedEvent> {
    private transient ValueState<EnrichedEvent> enrichmentResponseState;

    @Override
    public void flatMap(EnrichedEvent value, Collector<EnrichedEvent> out) throws Exception {
        EnrichedEvent response = enrichmentResponseState.value();

        if (response != null) {
            response = response.combine(value);
        } else {
            response = value;
        }

        if (response.isComplete()) {
            out.collect(response);
            enrichmentResponseState.clear();
        } else {
            enrichmentResponseState.update(response);
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<EnrichedEvent> fanInStateDescriptor =
            new ValueStateDescriptor<>( "enrichmentResponse",
                TypeInformation.of(new TypeHint<EnrichedEvent>() {})
            );

        enrichmentResponseState = getRuntimeContext().getState(fanInStateDescriptor);
    }
}

之后将合并的最终结果发送到接收器是一件简单的事情。