Spark Stream - 'utf8' 编解码器无法解码字节
Spark Stream - 'utf8' codec can't decode bytes
我对流式编程还很陌生。我们有使用 Avro 的 Kafka 流。
我想将 Kafka Stream 连接到 Spark Stream。我使用了下面的代码。
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
我遇到以下错误。
return s.decode('utf-8')
File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode
return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode bytes in position 57-58: invalid continuation byte
我是否需要指定 Kafka 使用 Avro,是否出现上述错误?如果是我如何指定它?
对,问题出在流的反序列化上。您可以使用 confluent-kafka-python 库并在 :
中指定 valueDecoder
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`
的解决方案的致谢
是的,你应该指定它。
与java:
流的创建:
final JavaInputDStream<ConsumerRecord<String, avroType>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
在 kafka 消费者配置中:
kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
kafkaParams.put("value.deserializer", SpecificAvroDeserializer.class);
我对流式编程还很陌生。我们有使用 Avro 的 Kafka 流。
我想将 Kafka Stream 连接到 Spark Stream。我使用了下面的代码。
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
我遇到以下错误。
return s.decode('utf-8') File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode bytes in position 57-58: invalid continuation byte
我是否需要指定 Kafka 使用 Avro,是否出现上述错误?如果是我如何指定它?
对,问题出在流的反序列化上。您可以使用 confluent-kafka-python 库并在 :
中指定 valueDecoderfrom confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`
的解决方案的致谢
是的,你应该指定它。
与java:
流的创建:
final JavaInputDStream<ConsumerRecord<String, avroType>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
在 kafka 消费者配置中:
kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
kafkaParams.put("value.deserializer", SpecificAvroDeserializer.class);