本地 Pubsub 模拟器不适用于 Dataflow
Local Pubsub Emulator won't work with Dataflow
我正在 Java 中开发 Dataflow,输入来自 Pubsub。后来,我看到了关于如何使用本地 Pubsub 模拟器的指南 here,这样我就不需要部署到 GCP 来进行测试了。
这是我的简单代码:
private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {
@Description("Pub/Sub topic to read messages from")
String getTopic();
void setTopic(String topic);
@Description("Pub/Sub subscription to read messages from")
String getSubscription();
void setSubscription(String subscription);
@Description("Local file output")
String getOutput();
void setOutput(String output);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(Options.class);
options.setStreaming(true);
options.setPubsubRootUrl("localhost:8085");
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// other .apply's
pipeline.run();
}
我能够按照指南进行操作,包括我需要使用示例 Python 代码来创建主题、订阅、发布者甚至发布消息的部分。当我使用 Python 代码与 Pubsub 模拟器交互时,我注意到命令行中的消息 Detected HTTP/2 connection
我 运行 模拟器:
Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
我compiled/run Eclipse中的代码使用Dataflow Pipeline 运行配置,但是我遇到了问题。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription:
...
Caused by: java.lang.RuntimeException: Failed to create subscription:
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost
当我尝试在行 options.setPubsubRootUrl("localhost:8085")
中添加 http
时,我得到一个无限重复的异常:
com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
它似乎到达了 Pubsub 模拟器,但无法作为命令行连接,我 运行 模拟器也无限地生成这个:
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
如何让我的数据流与 Pubsub 模拟器一起工作?
您正在尝试使用 Beam 2.5 SDK 的数据流分支从 Beam Direct Runner 连接到 Pubsub 模拟器。数据流 2.5 SDK 和 Eclipse 插件已于 2019 年 6 月 6 日弃用。但是这应该有效。
如您所见,您需要在 Beam 中为 PubsubRootUrl 添加前缀 'http://'。您看到的第二个问题表明 localhost:8085
上没有任何内容在侦听。这可能是因为实际上有 2 个本地主机:IPv4 和 IPv6。 Pubsub 模拟器仅侦听 IPv4,Windows 首先尝试 IPv6。尝试将 localhost
替换为 127.0.0.1
以强制使用 IPv4。你应该这样结束:
options.setPubsubRootUrl("http://127.0.0.1:8085")
我正在 Java 中开发 Dataflow,输入来自 Pubsub。后来,我看到了关于如何使用本地 Pubsub 模拟器的指南 here,这样我就不需要部署到 GCP 来进行测试了。
这是我的简单代码:
private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {
@Description("Pub/Sub topic to read messages from")
String getTopic();
void setTopic(String topic);
@Description("Pub/Sub subscription to read messages from")
String getSubscription();
void setSubscription(String subscription);
@Description("Local file output")
String getOutput();
void setOutput(String output);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(Options.class);
options.setStreaming(true);
options.setPubsubRootUrl("localhost:8085");
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// other .apply's
pipeline.run();
}
我能够按照指南进行操作,包括我需要使用示例 Python 代码来创建主题、订阅、发布者甚至发布消息的部分。当我使用 Python 代码与 Pubsub 模拟器交互时,我注意到命令行中的消息 Detected HTTP/2 connection
我 运行 模拟器:
Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
我compiled/run Eclipse中的代码使用Dataflow Pipeline 运行配置,但是我遇到了问题。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription:
...
Caused by: java.lang.RuntimeException: Failed to create subscription:
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost
当我尝试在行 options.setPubsubRootUrl("localhost:8085")
中添加 http
时,我得到一个无限重复的异常:
com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
它似乎到达了 Pubsub 模拟器,但无法作为命令行连接,我 运行 模拟器也无限地生成这个:
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
如何让我的数据流与 Pubsub 模拟器一起工作?
您正在尝试使用 Beam 2.5 SDK 的数据流分支从 Beam Direct Runner 连接到 Pubsub 模拟器。数据流 2.5 SDK 和 Eclipse 插件已于 2019 年 6 月 6 日弃用。但是这应该有效。
如您所见,您需要在 Beam 中为 PubsubRootUrl 添加前缀 'http://'。您看到的第二个问题表明 localhost:8085
上没有任何内容在侦听。这可能是因为实际上有 2 个本地主机:IPv4 和 IPv6。 Pubsub 模拟器仅侦听 IPv4,Windows 首先尝试 IPv6。尝试将 localhost
替换为 127.0.0.1
以强制使用 IPv4。你应该这样结束:
options.setPubsubRootUrl("http://127.0.0.1:8085")