'utf8' 编解码器无法解码位置 0 中的字节 0xff:无效的起始字节 pySpark Kafka

'utf8' codec can't decode byte 0xff in position 0: invalid start byte pySpark Kafka

kafkaStream = KafkaUtils.createStream(
    ssc,
    'zookeeperserver1.sys.net:2181,zookeeperserver2.sys.net:2181,zookeeperserver3.sys.net:2181,zookeeperserver4.sys.net:2181,zookeeperserver5.sys.net:2181,zookeeperserver6.sys.net:2181',
    'spark-streaming23',
    {'topic-name':3})

lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()

File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0xff in position 0: invalid start byte

看起来键或值不是 UTF-8 编码的。 createStreamcreateDirectStream 都有两个额外的参数:

  • keyDecoder – A function used to decode key (default is utf8_decoder)
  • valueDecoder – A function used to decode value (default is utf8_decoder)

如您所见,两者都默认为 utf8_decoder。如果

  • 您知道一个或两个都不是有效的 UTF-8 字符串,您可以提供自己的解码器或仅使用标识函数来获取原始输入:

    KafkaUtils.createStream(
        ssc, ..., keyDecoder=lambda x: x, valueDecoder=lambda x: x
    )
    
  • 如果您怀疑问题出在某些格式错误的条目中,您可以将现有的 decoder 替换为处理解码异常的条目。这些行周围的东西应该可以解决问题:

    from pyspark.streaming.kafka import utf8_decoder
    
    def safe_utf8_decode(s):
        try:
            return utf8_decoder(s)
        except UnicodeDecodeError:
            pass 
    

也就是说,除非您正在寻找更高级的应用程序,其中 DStreams 仍然是不可替代的,否则我宁愿推荐 Structured Streaming