从spark中的kafka消息中获取主题
get topic from kafka message in spark
在我们的 spark-streaming 作业中,我们从 kafka 读取流中的消息。
为此,我们使用 KafkaUtils.createDirectStream
API 其中 returns JavaPairInputDStreamfrom
.
通过以下方式从 kafka(来自三个主题 - test1、test2、test3)读取消息:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
我们想以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每个消息的主题名称。
所以我们执行以下操作:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
这是SplitToLinesFunction
的实现:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
问题是 tuple2._1
为空,我们假设 tuple2._1
将包含一些元数据,例如消息来源的 topic/partition 的名称。
然而,当我们打印 tuple2._1
时,它是空的。
我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 spark-streaming 代码中,tuple2._1
将包含它(而不是 null)?
请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称:
但它 returns 发送到 KafkaUtils.createDirectStream
的所有主题,而不是消息(属于当前 RDD)到达的特定主题。
所以它没有帮助我们识别主题的名称,从那里发送 RDD 中的消息。
编辑
作为对 David 回答的回应——我试过这样使用 MessageAndMetadata
:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
问题是 MessageAndMetadataFunction.call
方法中没有打印任何内容。我应该修复什么才能在 MessageAndMetadataFunction.call
方法中获取该 RDD 的相关主题?
使用 createDirectStream
的一个版本,该版本将 messageHandler
函数作为参数。这是我的做法:
val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
ssc,
kafkaParams,
getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)
那里有些东西对你来说没有任何意义 -- 相关部分是
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
如果您不熟悉 Scala
,该函数所做的只是 return 包含 msg.topic
和 msg.message
的 Tuple2
。您的函数需要 return 这两个,以便您在下游使用它们。您可以只 return 整个 MessageAndMetadata
对象,这会为您提供一些其他有趣的字段。但是如果你只想要 topic
和 message
,那么使用上面的
在 Kafka integration guide 的底部,有一个从消息中提取主题的示例。
Java中的相关代码:
// Hold a reference to the current offset ranges, so it can be used downstream
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}
).map(
...
).foreachRDD(
new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws IOException {
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);
}
...
return null;
}
}
);
这可能会被折叠成更紧凑的内容,只要求主题而不是其他内容。
在我们的 spark-streaming 作业中,我们从 kafka 读取流中的消息。
为此,我们使用 KafkaUtils.createDirectStream
API 其中 returns JavaPairInputDStreamfrom
.
通过以下方式从 kafka(来自三个主题 - test1、test2、test3)读取消息:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
我们想以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每个消息的主题名称。
所以我们执行以下操作:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
这是SplitToLinesFunction
的实现:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
问题是 tuple2._1
为空,我们假设 tuple2._1
将包含一些元数据,例如消息来源的 topic/partition 的名称。
然而,当我们打印 tuple2._1
时,它是空的。
我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 spark-streaming 代码中,tuple2._1
将包含它(而不是 null)?
请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称:
但它 returns 发送到 KafkaUtils.createDirectStream
的所有主题,而不是消息(属于当前 RDD)到达的特定主题。
所以它没有帮助我们识别主题的名称,从那里发送 RDD 中的消息。
编辑
作为对 David 回答的回应——我试过这样使用 MessageAndMetadata
:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
问题是 MessageAndMetadataFunction.call
方法中没有打印任何内容。我应该修复什么才能在 MessageAndMetadataFunction.call
方法中获取该 RDD 的相关主题?
使用 createDirectStream
的一个版本,该版本将 messageHandler
函数作为参数。这是我的做法:
val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
ssc,
kafkaParams,
getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)
那里有些东西对你来说没有任何意义 -- 相关部分是
(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
如果您不熟悉 Scala
,该函数所做的只是 return 包含 msg.topic
和 msg.message
的 Tuple2
。您的函数需要 return 这两个,以便您在下游使用它们。您可以只 return 整个 MessageAndMetadata
对象,这会为您提供一些其他有趣的字段。但是如果你只想要 topic
和 message
,那么使用上面的
在 Kafka integration guide 的底部,有一个从消息中提取主题的示例。
Java中的相关代码:
// Hold a reference to the current offset ranges, so it can be used downstream
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}
).map(
...
).foreachRDD(
new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws IOException {
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);
}
...
return null;
}
}
);
这可能会被折叠成更紧凑的内容,只要求主题而不是其他内容。