How to enrich data of a streaming query and write the result to Elasticsearch?

For a given Dataset (orignalData), I need to map the values and then prepare a new Dataset that combines them with search results from Elasticsearch.

Dataset<Row> orignalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");
List<SGEvent> listSGEvent = new ArrayList<>();

orignalData.foreach((ForeachFunction<Row>) row -> {
 SGEvent event = new SGEvent();
 String sourceKey = row.get(4).toString();
 String searchQuery = "select id from es_correlation where es_correlation.key='" + sourceKey + "'";
 Dataset<Row> result = spark.sqlContext().sql(searchQuery);
 String id = null;
 if (result != null) {
    result.show();
    id = result.first().toString();
  }
 event.setId(id);
 event.setKey(sourceKey);
 listSGEvent.add(event);
});
Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);
Dataset<Row> finalData = spark.createDataset(listSGEvent, eventEncoderSG).toDF();

finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();

Spark throws the following exception:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)

Is my approach of combining Elasticsearch values with the Dataset valid? Is there any better solution?

There are a couple of issues here.

As the exception says, orignalData is a streaming query (a streaming Dataset), and the only way to execute it is to use writeStream.start(). That's one issue.

You did execute writeStream.start(), but with another query, finalData, which is not streaming but a batch query. That's another issue.

For an "enrichment" case like yours, you can use a streaming join (the Dataset.join operator) or one of DataStreamWriter.foreach and DataStreamWriter.foreachBatch. I think DataStreamWriter.foreachBatch would be more efficient.

public DataStreamWriter<T> foreachBatch(VoidFunction2<Dataset<T>,Long> function)

(Java-specific) Sets the output of the streaming query to be processed using the provided function. This is supported only in the micro-batch execution modes (that is, when the trigger is not continuous). In every micro-batch, the provided function will be called with (i) the output rows as a Dataset and (ii) the batch identifier. The batchId can be used to deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query).

Not only do you get all the data of a streaming micro-batch in one place (the first input argument of type Dataset<T>), but you also get a way to submit another Spark job (across the executors) based on that data.

The pseudo-code could look as follows (I'm using Scala as I'm more familiar with the language):

val dsWriter = originalData.writeStream.foreachBatch { (data, batchId) =>
  // make sure the data is small enough to collect on the driver
  // Otherwise expect OOME
  // It'd also be nice to have a Java bean to convert the rows to proper types and names
  val localData = data.collect

  // Please note that localData is no longer Spark's Dataset
  // It's a local Java collection

  // Use Java Collection API to work with the localData
  // e.g. using Scala
  // You're mapping over localData (for a single micro-batch)
  // And creating finalData
  // I'm using the same names as your code to be as close to your initial idea as possible
  val finalData = localData.map { row =>
    // row is the old row from your original code
    // do something with it
    // e.g. using Java
    String sourceKey=row.get(4).toString();
    ...
  }

  // Time to save the data processed to ES
  // finalData is a local Java/Scala collection not Spark's DataFrame!
  // Let's convert it to a DataFrame (and leverage the Spark distributed platform)

  // Note that I'm almost using your code, but it's a batch query not a streaming one
  // We're inside foreachBatch
  finalData
    .toDF // Convert a local collection to a Spark DataFrame
    .write  // this creates a batch query
    .format("org.elasticsearch.spark.sql")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "upsert")
    .option("checkpointLocation","/tmp/checkpoint/sg_event")
    .save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
}

dsWriter is a DataStreamWriter that you can now start to launch the streaming query.
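
Since the rest of your code is Java, a rough Java equivalent of that pseudo-code could look like the sketch below. It is only a sketch under the same assumptions (your SGEvent bean, the spark session, and the orignalData/esData definitions from the question); the lambda is cast to the Java-specific VoidFunction2 overload of foreachBatch to make the intended overload explicit.

// Rough Java sketch (assumes SGEvent, spark, orignalData and esData from the question;
// also needs org.apache.spark.api.java.function.VoidFunction2 and java.util.ArrayList/List)
orignalData
  .writeStream()
  .option("checkpointLocation", "/tmp/checkpoint/sg_event")
  .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchData, batchId) -> {
    // batchData is a plain batch Dataset with all rows of this micro-batch,
    // so batch-only operations (collectAsList, joins with esData, ...) are allowed here
    List<SGEvent> events = new ArrayList<>();
    for (Row row : batchData.collectAsList()) { // keep micro-batches small, or this can OOM the driver
      SGEvent event = new SGEvent();
      event.setKey(row.get(4).toString());
      // set event.setId(...) from the es_correlation lookup here, as in your original loop
      events.add(event);
    }
    spark.createDataset(events, Encoders.bean(SGEvent.class))
      .write()                                  // batch write inside foreachBatch: save(), not start()
      .format("org.elasticsearch.spark.sql")
      .option("es.mapping.id", "id")
      .option("es.write.operation", "upsert")
      .save("spark_index/doc");
  })
  .start()
  .awaitTermination();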

I was able to get an actual working solution by using an SQL join. Please refer to the code below.

Dataset<Row> orignalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

orignalData.createOrReplaceTempView("stream_data");

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");

Dataset<Row> joinedData = spark.sqlContext().sql("select * from stream_data,es_correlation where es_correlation.key=stream_data.key");

// Or, by using the Dataset join operator:
// Dataset<Row> joinedData = orignalData.join(esFirst, "key");

Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);

Dataset<SGEvent> finalData = joinedData.map((MapFunction<Row, SGEvent>) row -> {
 SGEvent event = new SGEvent();
 event.setId(row.get(0).toString());
 event.setKey(row.get(3).toString());
 return event;
},eventEncoderSG);


finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();