使用 Spark 从 Kafka Avro 到 Elasticsearch

Kafka Avro to Elasticsearch with Spark

想要使用 Spark 作业(和具有许多已定义模式的 SchemaRegistry)将来自 Kafka 个主题的 Avro 消息放入 Elasticsearch。我能够成功读取记录并将其反序列化为字符串 (json) 格式(使用这两种方法):

   // Deserialize Avro to String
  def avroToJsonString(record: GenericRecord): String = try {
    val baos = new ByteArrayOutputStream
    try {
      val schema = record.getSchema
      val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos, false)
      val avroWriter = new SpecificDatumWriter[GenericRecord](schema)
      avroWriter.write(record, jsonEncoder)
      jsonEncoder.flush()
      baos.flush()
      new String(baos.toByteArray)
    } catch {
      case ex: IOException =>
        throw new IllegalStateException(ex)
    } finally if (baos != null) baos.close()
  }

  // Parse JSON String
  val parseJsonStream = (inStream: String) => {
      try {
        val parsed = Json.parse(inStream)
        Option(parsed)
      } catch {
        case e: Exception => System.err.println("Exception while parsing JSON: " + inStream)
          e.printStackTrace()
          None
      }
    }

我正在逐条读取记录,我在调试器中看到反序列化的 JSON 字符串,一切看起来都很好,但出于某种原因我无法将它们保存到 Elasticsearch,因为我猜 RDD 需要调用 saveToEs 方法。这就是我从 Kafka:

读取 avro 记录的方式
val kafkaStream : InputDStream[ConsumerRecord[String, GenericRecord]] = KafkaUtils.createDirectStream[String, GenericRecord](ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_TOPICS, kafkaParams))

      val kafkaStreamParsed= kafkaStream.foreachRDD(rdd => {
        rdd.foreach( x => {
          val jsonString: String = avroToJsonString(x.value()) 
          parseJsonStream(jsonString) 
          })
        })

如果我正在阅读 json(不是 Avro)记录,我可以这样做:

EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)

我在 saveToEs 方法中有一个错误说

Cannot resolve overloaded method 'saveToEs'

试图用 sc.makeRDD() 制作 rdd 但也没有成功。我应该如何将批处理作业中的所有这些记录放入 RDD,然后放入 Elasticsearch,否则我做错了吗?

更新

尝试了解决方案:

val messages: DStream[Unit] = kafkaStream
        .map(record => record.value)
        .flatMap(record => {
          val record1 = avroToJsonString(record)
          JSON.parseFull(record1).map(rawMap => {
            val map = rawMap.asInstanceOf[Map[String,String]]
          })
        })

再次使用相同的 Error(无法解析重载方法)

更新2

val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
        val eventJSON = avroToJsonString(rdd.value())
        parseJsonStream(eventJSON)
      })

      try {
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
      } catch {
        case e: Exception =>
          EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
          e.printStackTrace()
      }

现在我在 ES 中获取了记录。

使用 Spark 2.3.0Scala 2.11.8

我做到了:

val kafkaStream : InputDStream[ConsumerRecord[String, GenericRecord]] = KafkaUtils.createDirectStream[String, GenericRecord](ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_EVENT_TOPICS, kafkaParams))

      val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
        val eventJSON = avroToJsonString(rdd.value())
        parseJsonStream(eventJSON)
      })

      try {
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
      } catch {
        case e: Exception =>
          EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
          e.printStackTrace()
      }