Apache Beam:无法通过 docker-compose 访问 Pub/Sub 模拟器

Apache Beam : cannot access Pub/Sub Emulator via docker-compose

我构建了一个软件,它使用 GCP Pub/Sub 作为消息队列,Apache Beam 构建管道,Flask 构建网络服务器。它在生产中 运行 顺利,但我很难将所有部分与 docker-compose 连接在一起,尤其是 Apache Beam 管道。

我已按照 通过将 SO 答案中的 localhost 替换为我 docker-compose.yaml:

  pubsub_emulator:
    build: docker_images/message_queue
    ports:
      - 8085:8085

  webserver:
    build: docker_images/webserver
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    ports:
      - 8899:8080
    depends_on:
      - pubsub_emulator

   pipeline:
    build: docker_images/pipeline
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    depends_on:
      - pubsub_emulator

网络服务器能够访问 Pub/Sub 模拟器并生成主题。

但是,管道在启动时失败并显示 MalformedURLException:

Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166

管道的选项看起来不错,我定义了它们:

final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST"); 

BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                                .as(BasePipeline.PipeOptions.class);

options.as(DataflowPipelineOptions.class).setStreaming(true);

options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);

Pipeline pipeline = Pipeline.create(options);

有人知道发生了什么事以及如何解决吗?唯一的解决方案是否意味着将模拟器和管道设置在同一个 docker 中?

您可以尝试将值改成如下:

http://pubsub_emulator:8085

因为缺少 protocol 的错误抱怨在你的情况下应该是 http

根据 Apache Beam SDK 预期的值是完全合格的 URL:

// getPubsubRootUrl
@Default.String(value="https://pubsub.googleapis.com")
 @Hidden
java.lang.String getPubsubRootUrl()
// Root URL for use with the Google Cloud Pub/Sub API.

但是,如果您来自 python 背景,您会注意到 Python SDK which uses gRPC Python as showing in here 只需要由地址和端口组成的服务器地址

# A snippet from google-cloud-python library.
if os.environ.get("PUBSUB_EMULATOR_HOST"):
    kwargs["channel"] = grpc.insecure_channel(
        target=os.environ.get("PUBSUB_EMULATOR_HOST")
    )
grpc.insecure_channel(target, options=None)
Creates an insecure Channel to a server.

The returned Channel is thread-safe.

Parameters: 
target – The server address