如何在推送到 kafka 主题时将此行形式转换为 JSON

How can I convert this row form into JSON while pushing into kafka topic

我正在使用 Spark 应用程序处理放置在我系统中 /home/user1/files/ 文件夹中的文本文件,并将这些文本文件中存在的逗号分隔数据映射到特定的 JSON 格式。我已经使用 spark 编写了以下 python 代码来执行相同的操作。但是 Kafka 中的输出将如下所示

Row(Name=Priyesh,Age=26,MailId=priyeshkaratha@gmail.com,Address=AddressTest,Phone=112)

Python 代码:

import findspark
findspark.init('/home/user1/spark')
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.sql import Column, DataFrame, Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='server.kafka:9092')

def handler(message):
 records = message.collect()
 for record in records:
  producer.send('spark.out', str(record))
  print(record)
  producer.flush()

def main():
 sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
 ssc = StreamingContext(sc, 1)

 lines = ssc.textFileStream('/home/user1/files/')
 fields = lines.map(lambda l: l.split(",")) 
 udr =  fields.map(lambda p: Row(Name=p[0],Age=int(p[3].split('@')[0]),MailId=p[31],Address=p[29],Phone=p[46]))
 udr.foreachRDD(handler)

 ssc.start()
 ssc.awaitTermination()
if __name__ == "__main__":
 main()

那么如何在推送到 kafka 主题时将此行格式转换为 JSON?

您可以将 Spark Row 对象转换为字典对象,然后将它们序列化为 JSON。例如,您可以更改此行:

producer.send('spark.out', str(record))

对此:

producer.send('spark.out', json.dumps(record.asDict())))

或者..在你的示例代码中,因为你没有使用数据帧,你可以将它创建为一个字典而不是一个行。