将 Apache Spark 结果发布到另一个 Application/Kafka
Publish Apache Spark result to another Application/Kafka
我目前正在设计一个快速数据聚合模块,它接收事件并将它们发布到 Kafka 集群。然后我们集成了 Kafka 和 Spark Streaming。 Spark Streaming 从 Kafka 读取流并执行一些计算。计算完成后,我们需要将结果发送到另一个应用程序。此应用程序可以是 Web 服务或 Kafka 集群。
我想知道我们该怎么做?据我所知,Spark Stream 将数据推送到下游,如数据库和文件系统。
您将如何设计这样的应用程序?我应该用 Storm 替换 Spark Stream 以便能够将结果发布到另一个应用程序吗?
请参考dstream.foreachRDD
,这是一个强大的原语,允许将数据发送到外部系统。
Design Patterns for using foreachRDD
下面是我的kafka集成代码,供大家参考(没有优化,只是为了POC,KafkaProducer对象可以在foreachRDD中复用):
DStream.foreachRDD(rdd => {
rdd.foreachPartition { partitionOfRecords =>
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", props("bootstrap.servers"))
kafkaProps.put("client.id", "KafkaIntegration Producer");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
val producer = new KafkaProducer[String, String](kafkaProps);
partitionOfRecords.foreach(record => {
val message = new ProducerRecord[String, String]("hdfs_log_test", record.asInstanceOf[String])
producer.send(message)
})
producer.close()
}
})
I am wondering how we can do this? From what I've read, Spark Stream pushes the data to downstream like Databases and file systems.
Spark 不限于 HDFS 或数据库,您可以自由地初始化与任何可用外部资源的连接。它可以返回到 Kafka、RabbitMQ 或 WebService。
如果您正在做简单的转换,如 map
、filter
、reduceByKey
等,那么使用 DStream.foreachRDD
就可以了。如果您将进行像 DStream.mapWithState
这样的有状态计算,那么一旦您处理完状态,您就可以简单地将数据发送到任何外部服务。
例如,我们使用 Kafka 作为输入数据流,并使用 RabbitMQ 和进行一些有状态计算后的输出。
我目前正在设计一个快速数据聚合模块,它接收事件并将它们发布到 Kafka 集群。然后我们集成了 Kafka 和 Spark Streaming。 Spark Streaming 从 Kafka 读取流并执行一些计算。计算完成后,我们需要将结果发送到另一个应用程序。此应用程序可以是 Web 服务或 Kafka 集群。
我想知道我们该怎么做?据我所知,Spark Stream 将数据推送到下游,如数据库和文件系统。
您将如何设计这样的应用程序?我应该用 Storm 替换 Spark Stream 以便能够将结果发布到另一个应用程序吗?
请参考dstream.foreachRDD
,这是一个强大的原语,允许将数据发送到外部系统。
Design Patterns for using foreachRDD
下面是我的kafka集成代码,供大家参考(没有优化,只是为了POC,KafkaProducer对象可以在foreachRDD中复用):
DStream.foreachRDD(rdd => {
rdd.foreachPartition { partitionOfRecords =>
val kafkaProps = new Properties()
kafkaProps.put("bootstrap.servers", props("bootstrap.servers"))
kafkaProps.put("client.id", "KafkaIntegration Producer");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
val producer = new KafkaProducer[String, String](kafkaProps);
partitionOfRecords.foreach(record => {
val message = new ProducerRecord[String, String]("hdfs_log_test", record.asInstanceOf[String])
producer.send(message)
})
producer.close()
}
})
I am wondering how we can do this? From what I've read, Spark Stream pushes the data to downstream like Databases and file systems.
Spark 不限于 HDFS 或数据库,您可以自由地初始化与任何可用外部资源的连接。它可以返回到 Kafka、RabbitMQ 或 WebService。
如果您正在做简单的转换,如 map
、filter
、reduceByKey
等,那么使用 DStream.foreachRDD
就可以了。如果您将进行像 DStream.mapWithState
这样的有状态计算,那么一旦您处理完状态,您就可以简单地将数据发送到任何外部服务。
例如,我们使用 Kafka 作为输入数据流,并使用 RabbitMQ 和进行一些有状态计算后的输出。