How do I properly use PySpark to send data to a Kafka broker?

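I am trying to write a simple PySpark job that receives data from a Kafka broker topic, applies some transformation to that data, and puts the transformed data on a different Kafka broker topic.

I have the following code, which reads data from a Kafka topic, but the sendkafka function never seems to run: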

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

What should I change so that my sendkafka function actually sends data to the spark.out Kafka topic?
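A likely reason nothing is sent: in Spark Streaming, mapPartitions and count on a DStream are transformations that only return new DStreams, so without an output operation (such as pprint() or foreachRDD()) registered on sentRDD, sendkafka is never invoked. On top of that, sendkafka is a generator, and a generator body only runs when something iterates over it. The plain-Python sketch below illustrates that second point only; send_all and calls are hypothetical stand-ins, not part of the job above.

```python
calls = []  # records every message that was actually "sent"


def send_all(messages):
    # Stand-in for sendkafka: because of the yield, this is a generator,
    # so none of this body executes until the result is iterated.
    for message in messages:
        calls.append(message)
        yield message


pending = send_all(["a", "b"])  # creates the generator; nothing is sent yet
before = list(calls)            # snapshot taken before any iteration

list(pending)                   # consuming the generator triggers the sends
after = list(calls)             # snapshot taken after iteration
```

Here before is still empty while after holds both messages, which mirrors why a lazily defined stream with no output operation produces no side effects.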

Here is the correct code, which reads from Kafka into Spark and writes the Spark data back to a different Kafka topic:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer

# Created once on the driver; handler() below also runs on the driver.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    # message is the RDD for one batch; collect() pulls its records to the driver.
    records = message.collect()
    for record in records:
        # send() expects bytes unless a value_serializer is configured.
        producer.send('spark.out', str(record).encode('utf-8'))
    # Flush once per batch rather than once per record.
    producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
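Because handler calls collect(), every batch is pulled to the driver before being sent, which may become a bottleneck for large batches. A common alternative is to send from the executors with foreachPartition, creating one producer per partition. The following is a minimal sketch under that assumption, not a drop-in replacement: make_producer, send_partition and the JSON encoding of each record are illustrative choices, while the broker address and topic are taken from the code above.

```python
import json


def make_producer(bootstrap_servers="localhost:9092"):
    # Hypothetical factory. kafka-python is imported lazily so this module
    # can be loaded (and tested) on machines where it is not installed.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=bootstrap_servers)


def send_partition(records, topic="spark.out", producer_factory=make_producer):
    """Send every record of one partition using a single producer."""
    producer = producer_factory()
    for record in records:
        # Records from the direct stream are (key, value) pairs; they are
        # serialized to JSON bytes here, which is an assumption of this sketch.
        producer.send(topic, json.dumps(record).encode("utf-8"))
    # Flush once per partition so buffered messages are delivered.
    producer.flush()


def handler(rdd):
    # Called on the driver for each batch; send_partition runs on executors.
    rdd.foreachPartition(send_partition)
```

With this variant, kvs.foreachRDD(handler) stays unchanged. The producer is built inside send_partition because a producer created on the driver generally cannot be serialized and shipped to the executors.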

To run it:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test
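The two trailing arguments are the broker list and the input topic, which main() unpacks from sys.argv. If the argument count is wrong, that unpacking fails with a ValueError. A small helper could give a clearer message; parse_args below is a hypothetical name used only for illustration.

```python
def parse_args(argv):
    """Return (brokers, topic) from an argument list like sys.argv."""
    if len(argv) != 3:
        # Exit with a usage hint instead of an unpacking error.
        raise SystemExit("Usage: s.py <brokers> <topic>")
    _, brokers, topic = argv
    return brokers, topic


# Mirrors the command line shown above.
brokers, topic = parse_args(["s.py", "localhost:9092", "test"])
```

Inside main(), this would be called as parse_args(sys.argv) in place of the direct unpacking.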