Not able to view Kafka consumer output while executing in Eclipse: PySpark
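I have installed Kafka and ZooKeeper on a Windows system. I started the ZooKeeper and Kafka servers, created the topic "javainuse-topic", and started a producer and a consumer using the following commands: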
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javainuse-topic
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic javainuse-topic
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic javainuse-topic --from-beginning
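I was able to pass data from the producer to the consumer successfully. I then wrote the following code in Eclipse and tried to run it locally, but I cannot see the consumer data in my Eclipse console.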
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
import sys
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
n_secs = 1
topic = "javainuse-topic"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'javainuse-topic',  # Group ID is completely arbitrary
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
lines = kafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
time.sleep(6)  # Keep the stream running for 6 seconds before stopping it
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)
You can try again, but this time set auto.offset.reset to 'earliest' (or to 'smallest' if you are using the old consumer):
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'javainuse-topic',  # Group ID is completely arbitrary
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'earliest'})  # use 'smallest' with the old consumer
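As a rough sketch of the point above, the value to use for auto.offset.reset depends on which consumer API is in play. The helper below, build_kafka_params, is a hypothetical name made up for illustration (it is not part of PySpark or Kafka), and its old_consumer flag is likewise an assumption. It only builds the parameter dict; it does not talk to a broker.

```python
def build_kafka_params(brokers, group_id, from_beginning=True, old_consumer=True):
    """Build a Kafka parameter dict (hypothetical helper, illustration only)."""
    if old_consumer:
        # Old consumer naming: 'smallest' / 'largest'
        reset = "smallest" if from_beginning else "largest"
    else:
        # New consumer naming: 'earliest' / 'latest'
        reset = "earliest" if from_beginning else "latest"
    return {
        "bootstrap.servers": brokers,
        "group.id": group_id,  # arbitrary, as in the question
        "auto.offset.reset": reset,
    }


params = build_kafka_params("localhost:9092", "javainuse-topic")
print(params["auto.offset.reset"])  # prints: smallest
```

The resulting dict could then be passed as the third argument to KafkaUtils.createDirectStream in place of the literal dict. Since the question loads the spark-streaming-kafka-0-8 package, the old-consumer value is probably the one to try first.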