在 PySpark 作业上打印 Kafka 调试消息
Printing Kafka debug message on PySpark job
有没有办法在 运行 PySpark 时打印 Kafka 调试消息(我正在考虑类似于 librdkafka
调试消息或 kafkacat -D
选项的日志消息)工作?
问题是我在 PySpark 上使用以下代码连接到名为 A 的 Kafka 集群,每次有新消息传入时,它都能正常工作并将内容打印到控制台。但是当我切换到另一个集群时,称为 B 并以与集群 A 相同的方式设置,当有新消息进入时,它没有在屏幕上打印任何内容,我可以看到消息在两者上使用 kafkacat
工具都很好集群。
consumer.py
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"
try:
df = sqlc \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", hosts) \
.option("kafka.security.protocol", securityProtocol) \
.option("kafka.sasl.mechanism", saslMechanism) \
.option("startingOffsets", "earliest") \
.option("subscribe", topic) \
.load()
dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.outputMode('append') \
.format("console") \
.start()
dss.awaitTermination()
except KeyboardInterrupt:
print 'shutting down...'
kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="sssshhhh"
serviceName="kafka";
};
shell 命令:
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
--files "kafka.jaas" \
--driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
"./consumer.py"
似乎 kafka
群集 B 可以访问,因为我能够从中获取偏移量信息,但它只是没有读取消息。
问题是由工作节点连接到 Kafka 集群引起的,工作节点 IP 地址不在 Kafka 集群的防火墙白名单中。上面的代码导致工作节点超时并继续重试连接到 Kafka 集群,直到给出中断信号。
关于错误消息本身,没有向主节点生成错误消息,因为工作节点仍在尝试连接到 Kafka 集群,但偶尔会在主控制台上打印一条消息说它无法与工作节点通信(或某些消息,如 'gathering information')。
注意:这是我推测在工作节点中发生的情况(由于管理员权限,我无法登录),但可能有一个日志存储在工作节点上。 (如果有人能反驳或证明,将不胜感激)
至于 Kafka 调试消息本身,如果有错误、信息或警告发生,它看起来已经默认打印到屏幕上,具体取决于记录器级别设置,在一些奇怪的情况下,比如这个,日志消息屏幕上可能无法直接看到。
有没有办法在 运行 PySpark 时打印 Kafka 调试消息(我正在考虑类似于 librdkafka
调试消息或 kafkacat -D
选项的日志消息)工作?
问题是我在 PySpark 上使用以下代码连接到名为 A 的 Kafka 集群,每次有新消息传入时,它都能正常工作并将内容打印到控制台。但是当我切换到另一个集群时,称为 B 并以与集群 A 相同的方式设置,当有新消息进入时,它没有在屏幕上打印任何内容,我可以看到消息在两者上使用 kafkacat
工具都很好集群。
consumer.py
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"
try:
df = sqlc \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", hosts) \
.option("kafka.security.protocol", securityProtocol) \
.option("kafka.sasl.mechanism", saslMechanism) \
.option("startingOffsets", "earliest") \
.option("subscribe", topic) \
.load()
dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream.outputMode('append') \
.format("console") \
.start()
dss.awaitTermination()
except KeyboardInterrupt:
print 'shutting down...'
kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="sssshhhh"
serviceName="kafka";
};
shell 命令:
spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
--files "kafka.jaas" \
--driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
"./consumer.py"
似乎 kafka
群集 B 可以访问,因为我能够从中获取偏移量信息,但它只是没有读取消息。
问题是由工作节点连接到 Kafka 集群引起的,工作节点 IP 地址不在 Kafka 集群的防火墙白名单中。上面的代码导致工作节点超时并继续重试连接到 Kafka 集群,直到给出中断信号。
关于错误消息本身,没有向主节点生成错误消息,因为工作节点仍在尝试连接到 Kafka 集群,但偶尔会在主控制台上打印一条消息说它无法与工作节点通信(或某些消息,如 'gathering information')。
注意:这是我推测在工作节点中发生的情况(由于管理员权限,我无法登录),但可能有一个日志存储在工作节点上。 (如果有人能反驳或证明,将不胜感激)
至于 Kafka 调试消息本身,如果有错误、信息或警告发生,它看起来已经默认打印到屏幕上,具体取决于记录器级别设置,在一些奇怪的情况下,比如这个,日志消息屏幕上可能无法直接看到。