Spark Dataframe 以 avro 格式写入 kafka 主题?
Spark Dataframe write to kafka topic in avro format?
我在 Spark 中有一个 Dataframe,看起来像
eventDF
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
.......
10000|DPS123|OTH
我需要以Avro格式将其写入Kafka主题
目前我可以使用以下代码
在 Kafka 中编写 JSON
val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
.option("kafka.bootstrap.servers", "Host:port")
.option("topic", "eventdf")
.save()
现在我想以 Avro 格式将其写入 Kafka 主题
Spark >= 2.4:
您可以使用 to_avro
function from spark-avro
库。
import org.apache.spark.sql.avro._
eventDF.select(
to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)
Spark < 2.4
你必须以同样的方式做:
创建一个函数,将序列化的 Avro 记录写入 ByteArrayOutputStream
和 return 结果。天真的实现(这仅支持平面对象)可能类似于(采用自 Kafka Avro Scala Example by Sushil Kumar Singh)
import org.apache.spark.sql.Row
def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
val gr: GenericRecord = new GenericData.Record(schema)
row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(gr, encoder)
encoder.flush()
out.close()
out.toByteArray()
}
转换为udf
:
import org.apache.spark.sql.functions.udf
val schema: org.apache.avro.Schema
val encodeUDF = udf(encode(schema) _)
用它来替代 to_json
eventDF.select(
encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
)
我在 Spark 中有一个 Dataframe,看起来像
eventDF
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
.......
10000|DPS123|OTH
我需要以Avro格式将其写入Kafka主题 目前我可以使用以下代码
在 Kafka 中编写 JSONval kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
.option("kafka.bootstrap.servers", "Host:port")
.option("topic", "eventdf")
.save()
现在我想以 Avro 格式将其写入 Kafka 主题
Spark >= 2.4:
您可以使用 to_avro
function from spark-avro
库。
import org.apache.spark.sql.avro._
eventDF.select(
to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)
Spark < 2.4
你必须以同样的方式做:
创建一个函数,将序列化的 Avro 记录写入
ByteArrayOutputStream
和 return 结果。天真的实现(这仅支持平面对象)可能类似于(采用自 Kafka Avro Scala Example by Sushil Kumar Singh)import org.apache.spark.sql.Row def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = { val gr: GenericRecord = new GenericData.Record(schema) row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name))) val writer = new SpecificDatumWriter[GenericRecord](schema) val out = new ByteArrayOutputStream() val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null) writer.write(gr, encoder) encoder.flush() out.close() out.toByteArray() }
转换为
udf
:import org.apache.spark.sql.functions.udf val schema: org.apache.avro.Schema val encodeUDF = udf(encode(schema) _)
用它来替代
to_json
eventDF.select( encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value") )