Kafka Avro to Elasticsearch with Spark
I want to use a Spark job (and a Schema Registry with many defined schemas) to put Avro messages from Kafka topics into Elasticsearch. I can read the records and successfully deserialize them into String (JSON) format, using these two methods:
// Deserialize an Avro GenericRecord to a JSON string
def avroToJsonString(record: GenericRecord): String = try {
  val baos = new ByteArrayOutputStream
  try {
    val schema = record.getSchema
    val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos, false)
    val avroWriter = new SpecificDatumWriter[GenericRecord](schema)
    avroWriter.write(record, jsonEncoder)
    jsonEncoder.flush()
    baos.flush()
    new String(baos.toByteArray)
  } catch {
    case ex: IOException =>
      throw new IllegalStateException(ex)
  } finally if (baos != null) baos.close()
}

// Parse a JSON string
val parseJsonStream = (inStream: String) => {
  try {
    val parsed = Json.parse(inStream)
    Option(parsed)
  } catch {
    case e: Exception =>
      System.err.println("Exception while parsing JSON: " + inStream)
      e.printStackTrace()
      None
  }
}
I'm reading the records one by one, and in the debugger I can see the deserialized JSON strings, so everything looks fine, but for some reason I can't save them to Elasticsearch, because I guess I need an RDD to call the saveToEs method on. This is how I read the Avro records from Kafka:
val kafkaStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
  KafkaUtils.createDirectStream[String, GenericRecord](
    ssc,
    PreferBrokers,
    Subscribe[String, GenericRecord](KAFKA_AVRO_TOPICS, kafkaParams))

val kafkaStreamParsed = kafkaStream.foreachRDD(rdd => {
  rdd.foreach(x => {
    val jsonString: String = avroToJsonString(x.value())
    parseJsonStream(jsonString)
  })
})
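(For reference, kafkaParams is not shown above; the setup roughly looks like the sketch below, assuming Confluent's KafkaAvroDeserializer talking to the Schema Registry, with hypothetical hosts and group id.)

// Rough sketch of the assumed kafkaParams (hypothetical hosts and group id); the value
// deserializer is Confluent's KafkaAvroDeserializer, so record values arrive as GenericRecord.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers"   -> "kafka-broker:9092",
  "schema.registry.url" -> "http://schema-registry:8081",
  "key.deserializer"    -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
  "value.deserializer"  -> classOf[io.confluent.kafka.serializers.KafkaAvroDeserializer],
  "group.id"            -> "avro-to-es-job",
  "auto.offset.reset"   -> "latest",
  "enable.auto.commit"  -> (false: java.lang.Boolean)
)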
If I were reading JSON records (not Avro), I could save them with:
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
But here I get an error on the saveToEs method saying:
Cannot resolve overloaded method 'saveToEs'
I tried to build an RDD with sc.makeRDD(), but had no luck with that either. How should I get all these records from the batch into an RDD and then into Elasticsearch, or am I doing this wrong?
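(Side note: foreachRDD returns Unit, so the kafkaStreamParsed above is not actually a DStream, which is presumably why no saveToEs overload matches. Below is a minimal sketch of an alternative, not the code I ended up with, that keeps the stream as JSON strings and writes them with the connector's saveJsonToEs.)

// Sketch: keep the pipeline as a DStream[String] of JSON documents.
import org.elasticsearch.spark.streaming.EsSparkStreaming

val jsonStream: DStream[String] = kafkaStream.map(record => avroToJsonString(record.value()))

// saveJsonToEs indexes JSON strings directly, so no extra parsing step is needed just to save them.
EsSparkStreaming.saveJsonToEs(jsonStream, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)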
UPDATE

Tried this solution:
val messages: DStream[Unit] = kafkaStream
  .map(record => record.value)
  .flatMap(record => {
    val record1 = avroToJsonString(record)
    JSON.parseFull(record1).map(rawMap => {
      val map = rawMap.asInstanceOf[Map[String, String]]
    })
  })
and again I get the same error (cannot resolve overloaded method).
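(If the element type is the problem, then the snippet above probably fails because the inner block ends with a val assignment and therefore evaluates to Unit, making the whole thing a DStream[Unit]. Returning the map itself gives a DStream[Map[String, String]], which saveToEs can index; a sketch of that variant:)

// Sketch: return the parsed map so the stream is DStream[Map[String, String]] instead of DStream[Unit].
val messages: DStream[Map[String, String]] = kafkaStream
  .map(record => record.value)
  .flatMap(record => {
    val json = avroToJsonString(record)
    JSON.parseFull(json).map(rawMap => rawMap.asInstanceOf[Map[String, String]])
  })

EsSparkStreaming.saveToEs(messages, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)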
UPDATE 2
val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
  val eventJSON = avroToJsonString(rdd.value())
  parseJsonStream(eventJSON)
})

try {
  EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
} catch {
  case e: Exception =>
    EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
    e.printStackTrace()
}
Now I'm getting the records into ES.
Using Spark 2.3.0 and Scala 2.11.8, I did it like this:
val kafkaStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
  KafkaUtils.createDirectStream[String, GenericRecord](
    ssc,
    PreferBrokers,
    Subscribe[String, GenericRecord](KAFKA_AVRO_EVENT_TOPICS, kafkaParams))

val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
  val eventJSON = avroToJsonString(rdd.value())
  parseJsonStream(eventJSON)
})

try {
  EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
} catch {
  case e: Exception =>
    EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
    e.printStackTrace()
}
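(For completeness, the elasticsearch-spark connector also has to know where the cluster is; that is typically configured on the SparkConf that the StreamingContext is built from. A sketch with hypothetical values; host, port, and batch interval are placeholders:)

// Sketch of the Elasticsearch connector settings this job assumes (hypothetical host and interval).
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-avro-to-es")
  .set("es.nodes", "elasticsearch-host")
  .set("es.port", "9200")
  .set("es.index.auto.create", "true")

val ssc = new StreamingContext(conf, Seconds(10))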