Apache Spark - 并行处理来自 Kafka 的消息 - Java

Apache Spark - Parallel Processing of messages from Kafka - Java

JavaPairReceiverInputDStream<String, byte[]> messages = KafkaUtils.createStream(...);
JavaPairDStream<String, byte[]> filteredMessages = filterValidMessages(messages);

JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);

...

我从 Kafka 检索消息,对其进行过滤并将相同的消息用于多个用例。这里useCase1到4是相互独立的,可以并行计算。但是,当我查看日志时,我发现计算是按顺序进行的。我怎样才能使它们并行 运行 。任何建议都会有所帮助。

尝试为您的 4 个用例中的每一个创建 Kafka 主题。然后尝试创建 4 个不同的 Kafka DStreams。

我将所有代码移到 for 循环中,并按 kafka 主题中的分区数进行迭代,我看到了改进。

for(i=0;i<numOfPartitions;i++)
{
JavaPairReceiverInputDStream<String, byte[]> messages =
KafkaUtils.createStream(...);
JavaPairDStream<String, byte[]> filteredMessages =
filterValidMessages(messages);

JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);
}

参考:http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/