Docker pypspark 集群容器没有从主机接收 kafka 流?
Docker pypspark cluster container not receiving kafka streaming from the host?
我已经创建并部署了一个由 4 个容器组成的 spark 集群 运行
火花大师
火花工人
火花提交
data-mount-container : 从本地目录访问脚本
我在所有这些容器中添加了所需的依赖 jar
并且还在主机中部署了 kafka,它通过生产者生成流。
我按照以下文档中的确切步骤启动了 kafka
https://kafka.apache.org/quickstart
我验证了 kafka 生产者和消费者在 9092 端口上交换消息,工作正常
下面是我想作为结构化流处理的简单 pyspark 脚本
from pyspark import SparkContext
from pyspark.sql import SparkSession
print("Kafka App launched")
spark = SparkSession.builder.master("spark://master:7077").appName("kafka_Structured").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hostmachine:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).load()
converted_string=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print("Recieved Stream in String", converted_string)
下面是我用来执行脚本的 spark-submit
##container
# pyspark_vol - container for vol mounting
# spark/stru_kafka - container for spark-submit
# i linked the spark master and worker already using the container 'master'
##spark submit
docker run --add-host="localhost: myhost" --rm -it --link master:master --volumes-from pyspark_vol spark/stru_kafka spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 –jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-sql-kafka-0-10_2.11-2.1.1.jar --master spark://master:7077 /data/spark_session_kafka.py localhost 9092 session-event
在我 运行 脚本之后,脚本执行正常,但它似乎没有从 kafka 生产者作为批处理流式传输并停止执行。
我没有观察到任何具体错误,但没有从脚本中产生任何输出
我使用套接字程序验证了从 docker 容器内的主机接收数据的连通性,它工作正常。
我不确定我是否遗漏了任何配置..
预计:
上述 运行 在 spark-cluster 上的应用程序应该打印来自 kafka producer
的流
实际
"id" : "f4e8829f-583e-4630-ac22-1d7da2eb80e7",
"runId" : "4b93d523-7b7c-43ad-9ef6-272dd8a16e0a",
"name" : null,
"timestamp" : "2020-09-09T09:21:17.931Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1922,
"getBatch" : 287,
"getOffset" : 361,
"queryPlanning" : 111,
"triggerExecution" : 2766,
"walCommit" : 65
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[session-event]]",
"startOffset" : null,
"endOffset" : {
"session-event" : {
"0" : 24
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a1b0b4b"
}
}
根据 Spark 文档中提供的 Quick Example,您需要开始查询并等待其终止。
在你的情况下,这意味着你需要更换
print("Recieved Stream in String", converted_string)
与
query = df.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
问题出在我的 pyspark_stream 脚本上,我没有提供批处理时间和打印语句来查看日志...
因为它不是聚合流,所以我不得不在这里使用 'append'
result =df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print("Kafka Straming output is",result)
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()
我已经创建并部署了一个由 4 个容器组成的 spark 集群 运行
火花大师
火花工人
火花提交
data-mount-container : 从本地目录访问脚本
我在所有这些容器中添加了所需的依赖 jar
并且还在主机中部署了 kafka,它通过生产者生成流。
我按照以下文档中的确切步骤启动了 kafka
https://kafka.apache.org/quickstart
我验证了 kafka 生产者和消费者在 9092 端口上交换消息,工作正常
下面是我想作为结构化流处理的简单 pyspark 脚本
from pyspark import SparkContext
from pyspark.sql import SparkSession
print("Kafka App launched")
spark = SparkSession.builder.master("spark://master:7077").appName("kafka_Structured").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "hostmachine:9092").option("subscribe", "session-event").option("maxOffsetsPerTrigger", 10).load()
converted_string=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print("Recieved Stream in String", converted_string)
下面是我用来执行脚本的 spark-submit
##container
# pyspark_vol - container for vol mounting
# spark/stru_kafka - container for spark-submit
# i linked the spark master and worker already using the container 'master'
##spark submit
docker run --add-host="localhost: myhost" --rm -it --link master:master --volumes-from pyspark_vol spark/stru_kafka spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 –jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-sql-kafka-0-10_2.11-2.1.1.jar --master spark://master:7077 /data/spark_session_kafka.py localhost 9092 session-event
在我 运行 脚本之后,脚本执行正常,但它似乎没有从 kafka 生产者作为批处理流式传输并停止执行。
我没有观察到任何具体错误,但没有从脚本中产生任何输出
我使用套接字程序验证了从 docker 容器内的主机接收数据的连通性,它工作正常。
我不确定我是否遗漏了任何配置..
预计:
上述 运行 在 spark-cluster 上的应用程序应该打印来自 kafka producer
的流实际
"id" : "f4e8829f-583e-4630-ac22-1d7da2eb80e7",
"runId" : "4b93d523-7b7c-43ad-9ef6-272dd8a16e0a",
"name" : null,
"timestamp" : "2020-09-09T09:21:17.931Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1922,
"getBatch" : 287,
"getOffset" : 361,
"queryPlanning" : 111,
"triggerExecution" : 2766,
"walCommit" : 65
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[session-event]]",
"startOffset" : null,
"endOffset" : {
"session-event" : {
"0" : 24
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6a1b0b4b"
}
}
根据 Spark 文档中提供的 Quick Example,您需要开始查询并等待其终止。
在你的情况下,这意味着你需要更换
print("Recieved Stream in String", converted_string)
与
query = df.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
问题出在我的 pyspark_stream 脚本上,我没有提供批处理时间和打印语句来查看日志...
因为它不是聚合流,所以我不得不在这里使用 'append'
result =df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print("Kafka Straming output is",result)
query = result.writeStream.outputMode("append").format("console").trigger(processingTime='30 seconds').start()