'utf8' 编解码器无法解码位置 0 中的字节 0xff:无效的起始字节 pySpark Kafka
'utf8' codec can't decode byte 0xff in position 0: invalid start byte pySpark Kafka
kafkaStream = KafkaUtils.createStream(
ssc,
'zookeeperserver1.sys.net:2181,zookeeperserver2.sys.net:2181,zookeeperserver3.sys.net:2181,zookeeperserver4.sys.net:2181,zookeeperserver5.sys.net:2181,zookeeperserver6.sys.net:2181',
'spark-streaming23',
{'topic-name':3})
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0xff in position 0: invalid start byte
看起来键或值不是 UTF-8 编码的。 createStream
和 createDirectStream
都有两个额外的参数:
- keyDecoder – A function used to decode key (default is utf8_decoder)
- valueDecoder – A function used to decode value (default is utf8_decoder)
如您所见,两者都默认为 utf8_decoder
。如果
您知道一个或两个都不是有效的 UTF-8 字符串,您可以提供自己的解码器或仅使用标识函数来获取原始输入:
KafkaUtils.createStream(
ssc, ..., keyDecoder=lambda x: x, valueDecoder=lambda x: x
)
如果您怀疑问题出在某些格式错误的条目中,您可以将现有的 decoder
替换为处理解码异常的条目。这些行周围的东西应该可以解决问题:
from pyspark.streaming.kafka import utf8_decoder
def safe_utf8_decode(s):
try:
return utf8_decoder(s)
except UnicodeDecodeError:
pass
也就是说,除非您正在寻找更高级的应用程序,其中 DStreams
仍然是不可替代的,否则我宁愿推荐 Structured Streaming。
kafkaStream = KafkaUtils.createStream(
ssc,
'zookeeperserver1.sys.net:2181,zookeeperserver2.sys.net:2181,zookeeperserver3.sys.net:2181,zookeeperserver4.sys.net:2181,zookeeperserver5.sys.net:2181,zookeeperserver6.sys.net:2181',
'spark-streaming23',
{'topic-name':3})
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
File "/usr/lib64/python2.7/encodings/utf_8.py", line 16, in decode return codecs.utf_8_decode(input, errors, True) UnicodeDecodeError: 'utf8' codec can't decode byte 0xff in position 0: invalid start byte
看起来键或值不是 UTF-8 编码的。 createStream
和 createDirectStream
都有两个额外的参数:
- keyDecoder – A function used to decode key (default is utf8_decoder)
- valueDecoder – A function used to decode value (default is utf8_decoder)
如您所见,两者都默认为 utf8_decoder
。如果
您知道一个或两个都不是有效的 UTF-8 字符串,您可以提供自己的解码器或仅使用标识函数来获取原始输入:
KafkaUtils.createStream( ssc, ..., keyDecoder=lambda x: x, valueDecoder=lambda x: x )
如果您怀疑问题出在某些格式错误的条目中,您可以将现有的
decoder
替换为处理解码异常的条目。这些行周围的东西应该可以解决问题:from pyspark.streaming.kafka import utf8_decoder def safe_utf8_decode(s): try: return utf8_decoder(s) except UnicodeDecodeError: pass
也就是说,除非您正在寻找更高级的应用程序,其中 DStreams
仍然是不可替代的,否则我宁愿推荐 Structured Streaming。