Spark Stream - 'utf8' 编解码器无法解码字节

Spark Stream - 'utf8' codec can't decode bytes

我对流式编程还很陌生。我们有使用 Avro 的 Kafka 流。

我想将 Kafka Stream 连接到 Spark Stream。我使用了下面的代码。

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1]) 

我遇到以下错误。

return s.decode('utf-8') File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode bytes in position 57-58: invalid continuation byte

我是否需要指定 Kafka 使用 Avro,是否出现上述错误?如果是我如何指定它?

对,问题出在流的反序列化上。您可以使用 confluent-kafka-python 库并在 :

中指定 valueDecoder
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`

的解决方案的致谢

是的,你应该指定它。

与java:

流的创建:

final JavaInputDStream<ConsumerRecord<String, avroType>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.Subscribe(topics, kafkaParams));

在 kafka 消费者配置中:

kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
        kafkaParams.put("value.deserializer", SpecificAvroDeserializer.class);