How to differentiate topics received in Kafka using Spark Streaming
I am fetching messages from Kafka using the following code.
Scala code:
val lines: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
zookeeperQuorum, consumerGroup, topicMap)
lines.print(10)
Here is my sample producer code.
from kafka import SimpleProducer, KafkaClient
import time

# To send messages synchronously
kafka = KafkaClient(serverip + ':' + port)
producer = SimpleProducer(kafka)
kafka.ensure_topic_exists('test')
kafka.ensure_topic_exists('test1')

while True:
    print "sending message "
    producer.send_messages(b'test', 'test,msg')
    time.sleep(2)
    producer.send_messages(b'test1', 'test1,msg')
    time.sleep(2)
My streaming receiver prints:
(null,'test,msg')
(null,'test1,msg')
Questions:
1) How can I differentiate messages per topic without actually
decoding the message?
2) Why is it giving me null in the output? The documentation
says it is a (key, value) tuple. How can I produce (key, value)-style
messages?
Edit:
Using KeyedProducer:
kafka = KafkaClient(serverip + ':' + port)
producer = KeyedProducer(kafka)
kafka.ensure_topic_exists('test2')

while True:
    print "sending msg "
    producer.send_messages(b'test2', b'key1', 'msg')
    time.sleep(2)
This gives me the error:
raise PartitionUnavailableError("%s not available" % str(key))
kafka.common.PartitionUnavailableError: TopicAndPartition(topic='test2', partition='key1') not available
For #1, the simplest way is to have a separate stream for each topic; if at any point you need to combine them and they have the same structure, you can union them.
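The idea can be sketched in plain Python (no Spark here; `tag` is a hypothetical helper standing in for a per-stream `map` that attaches the topic name before a union):

```python
# Plain-Python sketch of the per-topic-streams idea: each stream's
# messages are tagged with their topic name, then the tagged streams
# are merged -- mirroring stream.map(msg => (topic, msg)) followed by
# a DStream union in Spark Streaming.

def tag(topic, messages):
    """Attach the topic name to every message from that stream."""
    return [(topic, m) for m in messages]

# Two per-topic "streams" (plain lists for illustration).
test_stream = ['test,msg']
test1_stream = ['test1,msg']

# Merge the tagged streams; each record now carries its topic.
merged = tag('test', test_stream) + tag('test1', test1_stream)
print(merged)  # [('test', 'test,msg'), ('test1', 'test1,msg')]
```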
For #2, have you tried using a KeyedProducer?
Snippet from the link above:
producer = KeyedProducer(kafka)
producer.send_messages(b'my-topic', b'key1', b'some message')
producer.send_messages(b'my-topic', b'key2', b'this methode')
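As a side note, a keyed producer also uses the key to choose a partition, which is why the same key always lands on the same partition. A minimal illustrative sketch (plain Python; kafka-python's real partitioner uses a proper hash such as murmur2, the sum-of-bytes below is only for a deterministic, dependency-free example):

```python
# Illustrative sketch of key-based partitioning: hashing the key
# modulo the partition count routes equal keys to the same partition.
# This is NOT kafka-python's actual hash function.

def pick_partition(key, num_partitions):
    """Map a message key to a partition index."""
    return sum(bytearray(key)) % num_partitions

p1 = pick_partition(b'key1', 3)
p2 = pick_partition(b'key1', 3)
assert p1 == p2  # same key -> same partition
```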
For question #1 you can use this signature:
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: (MessageAndMetadata[K, V]) => R
): InputDStream[R]
This gives you access to the MessageAndMetadata class, which contains the topic name along with other metadata such as the partition number and message offset. For example:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Map[String, String]](
ssc,
Map("metadata.broker.list" -> "localhost:9092"),
topics,
(mm: MessageAndMetadata[String, String]) => Map(mm.topic -> mm.message))
Then you can pattern match on the map key to do whatever you need.
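The messageHandler idea can be mimicked in plain Python (the `MessageAndMetadata` namedtuple below is only a stand-in for the real Scala class, to show the topic-to-message dispatch without decoding the payload):

```python
from collections import namedtuple

# Stand-in for Kafka's MessageAndMetadata: carries the topic name and
# other metadata alongside the payload.
MessageAndMetadata = namedtuple(
    'MessageAndMetadata', ['topic', 'partition', 'offset', 'key', 'message'])

def message_handler(mm):
    """Mirror of the Scala handler: keep the topic with the message."""
    return {mm.topic: mm.message}

records = [
    MessageAndMetadata('test', 0, 42, None, 'test,msg'),
    MessageAndMetadata('test1', 0, 7, None, 'test1,msg'),
]

# Dispatch per topic without ever decoding the message body.
for rec in records:
    for topic, msg in message_handler(rec).items():
        if topic == 'test':
            pass   # handle 'test' messages here
        elif topic == 'test1':
            pass   # handle 'test1' messages here
```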