在 Spark 任务中将数据保存到 ElasticSearch
Saving data to ElasticSearch in Spark task
在通过 Kafka 和 Spark 处理 Avro 消息流时,我将处理后的数据作为文档保存在 ElasticSearch 索引中。
这是代码(简化):
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
一切正常;唯一的问题是性能:保存到 ES 需要一些时间,我想这是因为我 open/close 每个 RDD 都有一个 ES Transport 客户端。 Spark documentation 表明这种方法非常正确:据我了解,唯一可能的优化是使用 rdd.foreachPartition,但我只有一个分区,所以我不确定这是否有益。
还有其他解决方案可以实现更好的性能吗?
因为你每次处理一个RDD的记录时都会创建一个新的connect。
所以,我认为使用 foreachPartition
将获得更好的性能,而不管只有一个分区,因为它可以帮助你把你的 ES 连接实例带到外面,在循环中重用它。
我会将处理过的消息流式传输回一个单独的 Kafka 主题,然后使用 Kafka Connect 将它们登陆到 Elasticsearch。这将您的 Spark 特定处理与将数据导入 Elasticsearch 分离开来。
在通过 Kafka 和 Spark 处理 Avro 消息流时,我将处理后的数据作为文档保存在 ElasticSearch 索引中。 这是代码(简化):
directKafkaStream.foreachRDD(rdd ->{
rdd.foreach(avroRecord -> {
byte[] encodedAvroData = avroRecord._2;
MyType t = deserialize(encodedAvroData);
// Creating the ElasticSearch Transport client
Settings settings = Settings.builder()
.put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
IndexRequest indexRequest = new IndexRequest("index", "item", id)
.source(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
.doc(jsonBuilder()
.startObject()
.field("name", name)
.field("timestamp", new Timestamp(System.currentTimeMillis()))
.endObject())
.upsert(indexRequest);
client.update(updateRequest).get();
client.close();
一切正常;唯一的问题是性能:保存到 ES 需要一些时间,我想这是因为我 open/close 每个 RDD 都有一个 ES Transport 客户端。 Spark documentation 表明这种方法非常正确:据我了解,唯一可能的优化是使用 rdd.foreachPartition,但我只有一个分区,所以我不确定这是否有益。 还有其他解决方案可以实现更好的性能吗?
因为你每次处理一个RDD的记录时都会创建一个新的connect。
所以,我认为使用 foreachPartition
将获得更好的性能,而不管只有一个分区,因为它可以帮助你把你的 ES 连接实例带到外面,在循环中重用它。
我会将处理过的消息流式传输回一个单独的 Kafka 主题,然后使用 Kafka Connect 将它们登陆到 Elasticsearch。这将您的 Spark 特定处理与将数据导入 Elasticsearch 分离开来。