在 DirectPipelineRunner 上使用自定义数据流无限源
Using custom DataFlow unbounded source on DirectPipelineRunner
我正在编写一个从 Kafka 0.8 读取的自定义 DataFlow 无限数据源。我想 运行 在本地使用 DirectPipelineRunner。但是,我得到以下堆栈跟踪:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
这是有道理的,因为我在任何时候都没有为我的自定义源注册评估程序。
阅读 https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只注册了 有界 来源的评估员。为自定义无界源定义和注册评估器的推荐方法是什么?
DirectPipelineRunner
当前仅在有界输入上运行。我们正在积极致力于取消此限制,并希望尽快发布。
与此同时,为了测试目的,您可以使用 withMaxNumRecords
将任何 UnboundedSource
简单地转换为 BoundedSource
,如下例所示:
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
有关详细信息,请参阅 this issue on GitHub。
另外,在贡献 Kafka 连接器方面也有一些努力。您可能希望通过 our GitHub repository.
与我们和其他贡献者就此进行交流
我正在编写一个从 Kafka 0.8 读取的自定义 DataFlow 无限数据源。我想 运行 在本地使用 DirectPipelineRunner。但是,我得到以下堆栈跟踪:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
这是有道理的,因为我在任何时候都没有为我的自定义源注册评估程序。
阅读 https://github.com/GoogleCloudPlatform/DataflowJavaSDK,似乎只注册了 有界 来源的评估员。为自定义无界源定义和注册评估器的推荐方法是什么?
DirectPipelineRunner
当前仅在有界输入上运行。我们正在积极致力于取消此限制,并希望尽快发布。
与此同时,为了测试目的,您可以使用 withMaxNumRecords
将任何 UnboundedSource
简单地转换为 BoundedSource
,如下例所示:
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
有关详细信息,请参阅 this issue on GitHub。
另外,在贡献 Kafka 连接器方面也有一些努力。您可能希望通过 our GitHub repository.
与我们和其他贡献者就此进行交流