FlinkRunner 上的 ApacheBeam 不从 Kafka 读取

ApacheBeam on FlinkRunner doesn't read from Kafka

我正在尝试 运行 由本地 Flink 集群支持的 Apache Beam,以便从 Kafka 主题中使用,如 Documentation for ReadFromKafka.

中所述

代码基本上就是这个管道和 Beam Examples

中描述的一些其他设置
 with beam.Pipeline() as p:

  lines = p | ReadFromKafka(
      consumer_config={'bootstrap.servers': bootstrap_servers},
    topics=[topic],
          ) | beam.WindowInto(beam.window.FixedWindows(1))

  output = lines | beam.FlatMap(lambda x: print(x))

  output | WriteToText(output)

因为我试图在 Flink 上 运行,所以我遵循了这个 doc for Beam on Flink 并做了以下事情:

--> 我下载了 flink 1.10 的二进制文件并遵循了这些 instructions to proper setup the cluster.

我检查了服务器和任务实例的日志。两者都已正确初始化。

--> 使用 docker 启动 kafka 并将其暴露在端口 9092.

--> 在终端中执行了以下命令

python example_1.py --runner FlinkRunner --topic myTopic --bootstrap_servers localhost:9092  --flink_master localhost:8081 --output output_folder

终端输出

2.23.0: Pulling from apache/beam_java_sdk Digest: sha256:3450c7953f8472c2312148a2a8324b0115fd71b3a7a01a3b017f6de69d89dfe1 Status: Image is up to date for apache/beam_java_sdk:2.23.0 docker.io/apache/beam_java_sdk:2.23.0

但是在向 myTopic 写入一些消息后,终端仍处于冻结状态,我在输出文件夹中看不到任何内容。我检查了 flink-conf.yml 并给出了这两行

jobmanager.rpc.address: localhost 
jobmanager.rpc.port: 6123

我假设作业的端口是 6123,而不是 beam 文档中指定的 8081,但两个端口的行为是相同的。

我是 Beam/Flink 的新手,所以我不太确定它是否可以,我现在有两个假设,但不太清楚如何调查它们:

  1. 与 Beam 与 Flink 通信以发送作业的端口相关的内容。

2.The apache.beam.io.external.ReadFromKafka 文档中提到的 Python SDK 的扩展服务

Note: To use these transforms, you need to start a Java Expansion Service. Please refer to the portability documentation on how to do that. Flink Users can use the built-in Expansion Service of the Flink Runner’s Job Server. The expansion service address has to be provided when instantiating the transforms.

但是阅读可移植性文档,它让我回到相同的 doc for Beam on Flink

有人能帮帮我吗?

编辑:我正在使用 Debezium Source Connector for PostgreSQL 撰写主题并看到上述行为。但是当我尝试手动进入主题时,应用程序崩溃并显示以下内容

RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)

您做的一切都正确; Java 扩展服务不再需要手动启动(参见 latest docs)。此外,Flink 在 8081 提供网络服务 UI,但也在那里接受作业提交,因此任一端口都可以正常工作。

您可能 运行 遇到 Python 的 TextIO 没有 yet support streaming 的问题。

此外,当 运行 Python 管道在 Flink 上运行时,实际代码在 docker 映像中运行,因此如果您尝试写入一个“本地”文件它将是图像中的文件,而不是您机器上的文件。