Flink DataStream - 如何从输入元素开始源?
Flink DataStream - how to start a source from an input element?
假设我有一个名为 RequestsSource
的 Flink SourceFunction<String>
。
对于来自该源的每个请求,我想订阅一个外部数据源(为了示例的目的,它可以启动一个单独的线程并开始在该线程上生成数据)。
输出数据可以合并到一个 DataStream
上。例如
Input Requests: A, B
Data produced:
A1
B1
A2
A3
B2
...
...等等,新元素永远添加到数据流中。
如何编写可以执行此操作的 Flink Operator?我可以使用例如FlatMapFunction
?
您通常希望使用 AsyncFunction,它可以(异步地)接受一个输入元素,调用一些外部服务,并发出一组结果。
另见 Apache Flink Training - Async IO。
--肯
听起来您是在询问一种运算符,该运算符可以在接收到订阅事件后基于与外部服务的连接发出一个或多个无限制的数据流。我能看到的唯一干净的方法是在 SourceFunction 或自定义运算符中完成所有工作。
我不相信异步 i/o 可以从单个输入事件发出无限的结果流。 ProcessFunction 可以做到这一点,但只能通过它的 onTimer 方法。
假设我有一个名为 RequestsSource
的 Flink SourceFunction<String>
。
对于来自该源的每个请求,我想订阅一个外部数据源(为了示例的目的,它可以启动一个单独的线程并开始在该线程上生成数据)。
输出数据可以合并到一个 DataStream
上。例如
Input Requests: A, B Data produced: A1 B1 A2 A3 B2 ...
...等等,新元素永远添加到数据流中。
如何编写可以执行此操作的 Flink Operator?我可以使用例如FlatMapFunction
?
您通常希望使用 AsyncFunction,它可以(异步地)接受一个输入元素,调用一些外部服务,并发出一组结果。
另见 Apache Flink Training - Async IO。
--肯
听起来您是在询问一种运算符,该运算符可以在接收到订阅事件后基于与外部服务的连接发出一个或多个无限制的数据流。我能看到的唯一干净的方法是在 SourceFunction 或自定义运算符中完成所有工作。
我不相信异步 i/o 可以从单个输入事件发出无限的结果流。 ProcessFunction 可以做到这一点,但只能通过它的 onTimer 方法。