Spark 消费者不读取 Kafka 生产者消息 Scala
Spark consumer doesn't read Kafka producer messages Scala
我正在尝试创建连接到 Spark 消费者的 Kafka 生产者。生产者工作正常,但是,Spark 中的消费者出于某种原因没有从主题中读取数据。我 运行 kafka 在 docker-compose 中使用 spotify/kafka 图像。
这是我的消费者:
object SparkConsumer {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
val topic1 = "topic1"
def kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topic1), kafkaParams)
)
lines.print()
}
Kafka 生产者看起来像这样:
object KafkaProducer {
def main(args: Array[String]) {
val events = 10
val topic = "topic1"
val brokers = "localhost:9092"
val random = new Random()
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("client.id", "KafkaProducerExample")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val t = System.currentTimeMillis()
for (nEvents <- Range(0, events)) {
val key = null
val values = "2017-11-07 04:06:03"
val data = new ProducerRecord[String, String](topic, key, values)
producer.send(data)
System.out.println("sent : " + data.value())
}
System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
producer.close()
}
}
更新:
我的 docker-使用 Kafka 编写文件:
version: '3.3'
services:
kafka:
image: spotify/kafka
ports:
- "9092:9092"
这是将 Kafka 与 Docker 结合使用的常见问题。首先,您应该检查您的主题在 zookeeper 中的配置是什么。您可以在 Kafka 容器内使用 Zookeeper 脚本。可能在创建主题时,ADVERTISED_HOST 是您的服务名称。因此,当消费者尝试连接到代理时,此 returns "kafka" 作为代理位置。因为你是运行网络外的消费者,你的消费者永远不会连接到broker消费。尝试使用 ADVERTISED_HOST=localhost 为您的 kafka 容器设置环境。
我正在尝试创建连接到 Spark 消费者的 Kafka 生产者。生产者工作正常,但是,Spark 中的消费者出于某种原因没有从主题中读取数据。我 运行 kafka 在 docker-compose 中使用 spotify/kafka 图像。
这是我的消费者:
object SparkConsumer {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
val topic1 = "topic1"
def kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topic1), kafkaParams)
)
lines.print()
}
Kafka 生产者看起来像这样:
object KafkaProducer {
def main(args: Array[String]) {
val events = 10
val topic = "topic1"
val brokers = "localhost:9092"
val random = new Random()
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("client.id", "KafkaProducerExample")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val t = System.currentTimeMillis()
for (nEvents <- Range(0, events)) {
val key = null
val values = "2017-11-07 04:06:03"
val data = new ProducerRecord[String, String](topic, key, values)
producer.send(data)
System.out.println("sent : " + data.value())
}
System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
producer.close()
}
}
更新:
我的 docker-使用 Kafka 编写文件:
version: '3.3'
services:
kafka:
image: spotify/kafka
ports:
- "9092:9092"
这是将 Kafka 与 Docker 结合使用的常见问题。首先,您应该检查您的主题在 zookeeper 中的配置是什么。您可以在 Kafka 容器内使用 Zookeeper 脚本。可能在创建主题时,ADVERTISED_HOST 是您的服务名称。因此,当消费者尝试连接到代理时,此 returns "kafka" 作为代理位置。因为你是运行网络外的消费者,你的消费者永远不会连接到broker消费。尝试使用 ADVERTISED_HOST=localhost 为您的 kafka 容器设置环境。