Kafka createDirectStream using PySpark

My main aim is to connect to Kafka, create a DStream, save it to a local variable as rows, write it into MongoDB, and implement the end-to-end flow in PySpark.

But I am stuck at the very first step of creating the DStream, with the error "java.util.ArrayList cannot be cast to java.lang.String". Could you help me figure out the fix? Details below.

I am trying to connect to Kafka using PySpark as follows,

kafkaParams = {"metadata.broker.list": ['host1:port','host2:port','host3:port'],
               "security.protocol": "ssl",
               "ssl.key.password": "***",
               "ssl.keystore.location": "/path1/file.jks",
               "ssl.keystore.password": "***",
               "ssl.truststore.location": "/path1/file2.jks",
               "ssl.truststore.password": "***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc,["lac.mx.digitalchannel.raw.s015-txn-qrdc"],kafkaParams)

But I am getting the following error and do not know how to handle it,

py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
        at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Also, this is how I launch the PySpark CLI I am using,

pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar  --files file.jks,file2.jks

metadata.broker.list needs to be a comma-separated string, not a list.
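In other words, join the brokers into a single string before passing them. A minimal sketch of the corrected parameters, keeping everything else from your snippet unchanged:

brokers = ['host1:port', 'host2:port', 'host3:port']

kafkaParams = {"metadata.broker.list": ",".join(brokers),  # one comma-separated string, not a Python list
               "security.protocol": "ssl",
               "ssl.key.password": "***",
               "ssl.keystore.location": "/path1/file.jks",
               "ssl.keystore.password": "***",
               "ssl.truststore.location": "/path1/file2.jks",
               "ssl.truststore.password": "***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["lac.mx.digitalchannel.raw.s015-txn-qrdc"], kafkaParams)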

As for the main aim, to connect to Kafka, create a DStream, save it to a local variable as rows and write it into Mongo, two simpler options are worth considering:

  1. Mongo supports Structured Streaming writes (a sketch follows after this list)

  2. Mongo also has a Kafka Connect sink plugin, which only needs a configuration file; no Spark cluster or code is required (a sample config is shown further below)
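For option 1, a rough sketch of reading the topic with Structured Streaming and writing each micro-batch to MongoDB. The "mongodb" format name and the spark.mongodb.* options assume the MongoDB Spark Connector 10.x; the URI, database, collection and checkpoint path are placeholders, and your SSL settings would be passed to the Kafka source as kafka.-prefixed options:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-mongo").getOrCreate()

# Structured Streaming Kafka source; SSL settings go in as kafka.-prefixed options
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port,host2:port,host3:port")
      .option("subscribe", "lac.mx.digitalchannel.raw.s015-txn-qrdc")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value"))

# Sink assumes the MongoDB Spark Connector 10.x, which registers the "mongodb" format
query = (df.writeStream
         .format("mongodb")
         .option("spark.mongodb.connection.uri", "mongodb://mongo-host:27017")
         .option("spark.mongodb.database", "mydb")
         .option("spark.mongodb.collection", "mycollection")
         .option("checkpointLocation", "/tmp/kafka-to-mongo-checkpoint")
         .outputMode("append")
         .start())

query.awaitTermination()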
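For option 2, an illustrative sink properties file for Kafka Connect; the connection details, database and collection names are placeholders, and the key names follow the MongoDB Kafka connector documentation:

name=mongo-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=lac.mx.digitalchannel.raw.s015-txn-qrdc
connection.uri=mongodb://mongo-host:27017
database=mydb
collection=mycollection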