如何使用 Kafka 为 Azure Eventhub 格式化 pyspark 连接字符串

How to format a pyspark connection string for Azure Eventhub with Kafka

我正在尝试使用 Pyspark 从启用了 Kafka 兼容性的 Azure Eventhub 解析 JSON 消息。我找不到有关如何建立连接的任何文档。

import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

sc.stop() # Jupyter somehow created a context already.. 
sc = SparkContext(appName="PythonTest")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)

# my connection string: 
#Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=examplekeyname;SharedAccessKey=HERETHEJEY=;EntityPath=examplepathname - has a total of 5 partitions

kafkaStream = KafkaUtils.createStream(HOW DO I STRUCTURE THIS??)
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()

查看我的回答(和问题)。那是关于如何在 pyspark 中写入启用 Kafka 的事件中心,但我认为读取配置应该非常相似。棘手的部分是正确设置安全配置。

EH_SASL = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'
// Source: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/spark#running-spark

dfKafka \
.write  \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

您可以找到任何关于如何设置消费者的官方教程here。它适用于 Scala 而不是 PySpark,但如果将其与我的示例进行比较,转换代码相当容易。