可以在 Apache Apex 的 DAG 中间使用输入运算符吗

Can an Input Operator be used in the middle of a DAG in Apache Apex

Apex的所有例子都说DAG的第一个算子应该是输入算子。这个运算符可以出现在 DAG 中间的某个地方吗?

考虑这样一种情况,我要从数据库中获取数据,基于之前的运算符刚刚处理过的一些数据,这意味着输入运算符将出现在 DAG 中间的某个地方.

根据输入算子的定义,它是一种没有任何输入流的算子。但如果使用连接器,它也会执行获取数据的工作。那么,如果我在 DAG 之间的某处获取数据,它会起作用吗?

不,不能在 DAG 之间使用输入运算符。 正如您已经指出的那样,由于没有输入流,您将无法从先前的运算符获取数据以用于此运算符。

对于您指出的示例,最好使用输入流编写您自己的通用运算符,该输入流实际上具有与输入运算符类似的功能,其中它可以根据输入流中的数据从外部源读取数据输入流。

此外,请注意一点: 如果查询量太大,最好有一个异步线程来查询数据库。该线程可以将数据写入队列,主线程可以从队列中读取记录并将它们发送到输出流。这将确保主运算符线程不被阻塞并且运算符不会失败。

这是一个有趣的用例。您应该能够扩展输入运算符(比如 JdbcInputOperator,因为您想从数据库中读取)并向其添加输入端口。此输入端口从您的 DAG 中的另一个运算符接收数据(元组)并更新 JdbcInputOperator 的 "where" 子句,以便它基于此读取数据。希望这就是您要找的。

是的,这是可能的。您可以扩展现有的 InputOperator 并向其添加 InputPort(s)。在这种情况下,Apex 平台会将您的运营商作为通用运营商来处理,而不是调用 InputOperator.emitTuples()。调用 super.emitTuples() 或直接在输出端口上发射将是您的扩展操作员责任。