ReadFromKafka throws ValueError: Unsupported signal: 2
I am currently trying to combine Apache Beam with Apache Kafka.
The Kafka service is running (locally), and I have written some test messages with kafka-console-producer.
First, I wrote this Java snippet to test Apache Beam in a language I already know. It works as expected.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        Read<Long, String> kafkaReader = KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("beam-test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class);
        // Note: withoutMetadata() returns a new transform; its result is
        // discarded here, so the pipeline still reads full KafkaRecords.
        kafkaReader.withoutMetadata();
        pipeline
                .apply("Kafka", kafkaReader)
                .apply("Extract words", ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        System.out.println("Key:" + c.element().getKV().getKey()
                                + " | Value: " + c.element().getKV().getValue());
                    }
                }));
        pipeline.run();
    }
}
My goal is to write the same thing in Python, and this is where I currently stand:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run_pipe():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'Kafka Unbounded' >> ReadFromKafka(
               consumer_config={'bootstrap.servers': 'localhost:9092'},
               topics=['beam-test'])
         | 'Test Print' >> beam.Map(print)
        )

if __name__ == '__main__':
    run_pipe()
Now to the point. When I try to run the Python code, I get the following error:
(app) λ python ArghKafkaExample.py
Traceback (most recent call last):
File "ArghKafkaExample.py", line 22, in <module>
run_pipe()
File "ArghKafkaExample.py", line 10, in run_pipe
(p
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 1028, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 572, in __ror__
result = p.apply(self, pvalueish, label)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 648, in apply
return self.apply(transform, pvalueish)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 691, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 198, in apply
return m(transform, input, options)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 228, in apply_PTransform
return transform.expand(input)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 322, in expand
self._expanded_components = self._resolve_artifacts(
File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 120, in __exit__
next(self.gen)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 372, in _service
yield stub
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 523, in __exit__
self._service_provider.__exit__(*args)
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 74, in __exit__
self.stop()
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 133, in stop
self.stop_process()
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 179, in stop_process
return super(JavaJarServer, self).stop_process()
File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 143, in stop_process
self._process.send_signal(signal.SIGINT)
File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\subprocess.py", line 1434, in send_signal
raise ValueError("Unsupported signal: {}".format(sig))
ValueError: Unsupported signal: 2
From googling I found out that it has something to do with program exit signals (like Ctrl+C), but overall I have absolutely no idea what the problem is.
Any advice would be helpful!
Greetings, Pascal
Your pipeline code appears to be correct here. The problem is due to the requirements of Kafka IO in the Python SDK. From the module documentation:
These transforms are currently supported by Beam portable runners (for example, portable Flink and Spark) as well as Dataflow runner.
Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During the pipeline construction, Python SDK will connect to a Java expansion service to expand these transforms. To facilitate this, a small amount of setup is needed before using these transforms in a Beam Python pipeline.
Kafka IO in Python is implemented as a cross-language transform in Java, and your pipeline is failing because you haven't set up the environment to execute cross-language transforms. To explain in layman's terms what a cross-language transform is: the Kafka transform actually executes on the Java SDK rather than the Python SDK, so it can make use of the existing Kafka code in Java.
There are two hurdles preventing your pipeline from working. The easier one to fix is that only the runners I quoted above support cross-language transforms, so if you are running this pipeline with the Direct runner it won't work; you'll need to switch to the Flink or Spark runner in local mode.
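For example, selecting the portable Flink runner from Python can be done through pipeline options. This is a minimal sketch under the assumption that Beam is allowed to download and start the Flink job server jar automatically; exact flags and behavior can vary across Beam versions:

from apache_beam.options.pipeline_options import PipelineOptions

# Ask for the portable Flink runner instead of the default Direct runner.
# Without an explicit --flink_master, Beam targets an embedded local Flink
# (an assumption; check your Beam version's documentation).
options = PipelineOptions(['--runner=FlinkRunner'])

You would then pass these options to beam.Pipeline(options=options) inside run_pipe().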
The trickier hurdle is that you need to start an expansion service to be able to add external transforms to your pipeline. The stack trace you are getting is happening because Beam is trying to expand the transform but is unable to connect to the expansion service, and the expansion fails.
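As a rough sketch of what the Python side of that setup looks like: ReadFromKafka accepts an expansion_service argument that can point at an already-running Java expansion service. The localhost:8097 address below is purely an assumed placeholder:

from apache_beam.io.kafka import ReadFromKafka

# Assumes a Java expansion service has already been started separately and
# is listening on localhost:8097; substitute the actual address and port.
kafka_read = ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['beam-test'],
    expansion_service='localhost:8097')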
If you still want to try running this with cross-language despite the extra setup, the documentation I linked contains instructions for running the expansion service. At the time I'm writing this answer, this feature is still new and there may be blind spots in the documentation. If you run into problems, I encourage you to ask questions on the Apache Beam users mailing list or the Apache Beam Slack channel.