KafkaRecord cannot be cast to [B
I am trying to process a data stream from Apache Kafka using the Python SDK for Apache Beam with the Flink runner. After starting Kafka 2.4.0 and Flink 1.8.3, I followed these steps:
1) Compile and run Beam 2.16 with the Flink 1.8 runner.
git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
cd beam-2.16.0
nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &
2) Run the Python pipeline.
from apache_beam import Pipeline
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

if __name__ == '__main__':
    with Pipeline(options=PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_version=1.8',
        '--flink_master_url=localhost:8081',
        '--environment_type=LOOPBACK',
        '--streaming'
    ])) as pipeline:
        (
            pipeline
            | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
        )
        result = pipeline.run()
        result.wait_until_finish()
3) Publish some data to Kafka.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>{"hello":"world!"}
The Python script throws this error:
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob. org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 13 more
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:67)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:341)
at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:283)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
... 1 more
ERROR:root:java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Manifest at/tmp/artifacts0k1mnin0/somejob/MANIFEST has 0 artifact locations
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService - Removed dir /tmp/artifacts0k1mnin0/job_somejob/
Traceback (most recent call last):
  File "main.py", line 40, in <module>
    run()
  File "main.py", line 37, in run
    result.wait_until_finish()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 439, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
I tried the other deserializers available in Kafka, but they did not work: Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer. This error originates from this piece of code.
Am I doing something wrong?
Disclaimer: this is my first encounter with the Apache Beam project.
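According to this JIRA, Kafka consumer support seems to be quite new in Beam (at least in the Python interface). Apparently there is still a problem with the FlinkRunner combined with this new API. Even though your code is technically correct, it will not run correctly on Flink. There is a patch available, but it looks to me more like a quick fix than a final solution. It requires recompilation, so I would not recommend using it in production. If you are just getting started with the technology and don't want to be blocked, feel free to try it out.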
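For anyone puzzled by the message itself: `[B` is the JVM's internal name for `byte[]`, so the trace says that a whole `KafkaRecord` object was handed to `ByteArrayCoder.encode`, which only accepts byte arrays. The snippet below is a toy sketch in plain Python of that kind of mismatch. It is not Beam code; `ToyByteArrayCoder` and the `KafkaRecord` namedtuple are hypothetical stand-ins made up for illustration, and the 4-byte length prefix is only loosely modelled on the `LengthPrefixCoder` frame in the trace.

```python
from collections import namedtuple

# Hypothetical stand-in for org.apache.beam.sdk.io.kafka.KafkaRecord
# (illustration only, not the real class).
KafkaRecord = namedtuple("KafkaRecord", ["topic", "key", "value"])


class ToyByteArrayCoder:
    """Toy analogue of a byte-array coder: it accepts only raw bytes."""

    def encode(self, element):
        if not isinstance(element, bytes):
            # Plays the role of the ClassCastException in the trace.
            raise TypeError(
                "%s cannot be cast to bytes" % type(element).__name__)
        # Prefix the payload with its length as a 4-byte big-endian integer.
        return len(element).to_bytes(4, "big") + element


coder = ToyByteArrayCoder()
record = KafkaRecord(topic="test", key=b"", value=b'{"hello":"world!"}')

try:
    # The whole record reaches a coder that only understands bytes.
    coder.encode(record)
    failed = False
    message = ""
except TypeError as err:
    failed = True
    message = str(err)

# Encoding works once the element really is a byte array.
encoded_value = coder.encode(record.value)
```

In other words, the coder chosen for the source output and the element type actually emitted by the source disagree, which is consistent with this being a runner-side issue rather than a mistake in the pipeline code.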