单个 flink 管道的多个 elasticsearch 接收器

Multiple elasticsearch sinks for a single flink pipeline

我的要求是将数据发送到不同的 ES 接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到 sink1,否则将其发送到 sink2 等(基本上是根据数据将其动态发送到任何一个接收器)。我也想分别为ES sink1、ES sink2、Es sink3等设置并行度

                                ->  Es sink1 (parallelism 4)
Kafka -> Map(Transformations)   ->  ES sink2 (parallelism 2)
                                ->  Es sink3 (parallelism 2)

有没有什么简单的方法可以在flink中实现上面的功能?

我的解决方案:(但不满意)

我可以想出一个解决方案,但是我写了一些中间的 kafka 主题(topic1、topic2、topic3),然后有单独的管道用于 Essink1、Essink2 和 ESsink3。我想避免写这些中间的kafka话题。

kafka -> Map(Transformations) -> Kafka topics (Insert into topic1,topic2,topic3 based on the data)

Kafka topic1 -> Essink1(parallelism 4)

Kafka topic2 -> Essink2(parallelism 2)

Kafka topic3 -> Essink3(parallelism 2)

您可以使用带有侧输出 [2] 的 ProcessFunction [1] 以 n 方式拆分流,然后将每个侧输出流连接到适当的接收器。然后在每个接收器上调用 setParallelism() [3]。

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-processfunction
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level