在 Spark 任务中将数据保存到 ElasticSearch

Saving data to ElasticSearch in Spark task

在通过 Kafka 和 Spark 处理 Avro 消息流时,我将处理后的数据作为文档保存在 ElasticSearch 索引中。 这是代码(简化):

    directKafkaStream.foreachRDD(rdd ->{

        rdd.foreach(avroRecord -> {
            byte[] encodedAvroData = avroRecord._2;
            MyType t = deserialize(encodedAvroData);

    // Creating the ElasticSearch Transport client
    Settings settings = Settings.builder()
            .put("client.transport.ping_timeout", 5, TimeUnit.SECONDS).build();
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

    IndexRequest indexRequest = new IndexRequest("index", "item", id)
            .source(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index", "item", id)
            .doc(jsonBuilder()
                    .startObject()
                    .field("name", name)
                    .field("timestamp", new Timestamp(System.currentTimeMillis()))
                    .endObject())
            .upsert(indexRequest);

    client.update(updateRequest).get();

    client.close();

一切正常;唯一的问题是性能:保存到 ES 需要一些时间,我想这是因为我 open/close 每个 RDD 都有一个 ES Transport 客户端。 Spark documentation 表明这种方法非常正确:据我了解,唯一可能的优化是使用 rdd.foreachPartition,但我只有一个分区,所以我不确定这是否有益。 还有其他解决方案可以实现更好的性能吗?

因为你每次处理一个RDD的记录时都会创建一个新的connect。 所以,我认为使用 foreachPartition 将获得更好的性能,而不管只有一个分区,因为它可以帮助你把你的 ES 连接实例带到外面,在循环中重用它。

我会将处理过的消息流式传输回一个单独的 Kafka 主题,然后使用 Kafka Connect 将它们登陆到 Elasticsearch。这将您的 Spark 特定处理与将数据导入 Elasticsearch 分离开来。

它的实际例子:https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/