无法从 Apache Beam 中的本地模拟器读取 Pub/Sub 消息

Unable to read Pub/Sub messages from local emulator in Apache beam

我正在尝试 运行 一个简单的 Apache Beam 管道,其中 DirectRunner 从 Pub/Sub 订阅中读取并将消息写入磁盘。

当我针对 GCP 运行 管道工作正常,但是当我尝试 运行 它针对我的本地 Pub/Sub 模拟器时,它似乎没有做任何事情.

我正在使用扩展 org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions class.

的自定义 Options class
public interface Options extends PubsubOptions {

  @Description("Pub/Sub subscription to read the input from")
  @Required
  ValueProvider<String> getInputSubscription();

  void setInputSubscription(ValueProvider<String> valueProvider);

}

管道很简单

pipeline
        .apply("Read Pub/Sub Messages", PubsubIO.readMessagesWithAttributes()
        .fromSubscription(options.getInputSubscription()))

        .apply("Add a fixed window", Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE))))

        .apply("Convert Pub/Sub To String", new PubSubMessageToString())

        .apply("Write Pub/Sub messages to local disk", new WriteOneFilePerWindow());

管道使用以下选项执行

mvn compile exec:java \
-Dexec.mainClass=DefaultPipeline \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=my-project \
--inputSubscription=projects/my-project/subscriptions/my-subscription \
--pubsubRootUrl=http://127.0.0.1:8681 \
--runner=DirectRunner"

我正在使用 this Pub/Sub 模拟器 docker 图像并使用以下命令执行它:

docker run --rm -ti -p 8681:8681 -e PUBSUB_PROJECT1=my-project,topic:my-subscription marcelcorso/gcloud-pubsub-emulator:latest

是否需要更多配置才能使这项工作正常进行?

您可以通过向本地模拟器发出手动 HTTP 请求(通过 curl)来对本地模拟器进行故障排除,如下所示:

$ curl -d '{"messages": [{"data": "c3Vwc3VwCg=="}]}' -H "Content-Type: application/json" -X POST localhost:8681/v1/projects/my-project/topics/topic:publish
{
  "messageIds": ["5"]
}
$ 

$ curl -d '{"returnImmediately":true, "maxMessages":1}' -H "Content-Type: application/json" -X POST localhost:8681/v1/projects/my-project/subscriptions/my-subscription:pull
{
  "receivedMessages": [{
    "ackId": "projects/my-project/subscriptions/my-subscription:9",
    "message": {
      "data": "c3Vwc3VwCg==",
      "messageId": "5",
      "publishTime": "2019-04-30T17:26:09Z"
    }
  }]
}
$

或者将 gcloud 命令行工具指向它:

$ CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=localhost:8681 gcloud pubsub topics list

另外请注意,当模拟器出现时,它会从头开始创建主题和订阅,因此它们上面没有消息。如果您的管道希望立即在订阅上拉取消息,那就可以解释为什么它看起来“卡住了”。请注意,当您 运行 GCP 的管道时,您在那里使用的主题和订阅可能已经有消息。

事实证明,如果您设置了 GOOGLE_APPLICATION_CREDENTIALS 环境变量,Apache Beam 管道无法从本地 Pub/Sub 模拟器读取数据。

一旦我删除了这个指向 GCP 服务帐户的环境变量,管道就可以与本地 Pub/Sub 模拟器无缝协作。