如何将选定的列写入 Kafka 主题?
How to write selected columns to Kafka topic?
我正在使用带有 java 1.8 的 spark-sql-2.4.1v。
和卡夫卡版本 spark-sql-kafka-0-10_2.11_2.4.3 和 kafka-clients_0.10.0.0
StreamingQuery queryComapanyRecords =
comapanyRecords
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers",KAFKA_BROKER)
.option("topic", "in_topic")
.option("auto.create.topics.enable", "false")
.option("key.serializer","org.apache.kafka.common.serialization.StringDeserializer")
.option("value.serializer", "com.spgmi.ca.prescore.serde.MessageRecordSerDe")
.option("checkpointLocation", "/app/chkpnt/" )
.outputMode("append")
.start();
queryLinkingMessageRecords.awaitTermination();
给出错误:
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun.apply(KafkaWriter.scala:71)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun.apply(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:121)
我尝试按以下方式修复,但无法发送值,即在我的情况下这是一个 java bean。
StreamingQuery queryComapanyRecords =
comapanyRecords.selectExpr("CAST(company_id AS STRING) AS key", "to_json(struct(\"company_id\",\"fiscal_year\",\"fiscal_quarter\")) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers",KAFKA_BROKER)
.option("topic", "in_topic")
.start();
So is there anyway in java how to handle/send this value( i.e. Java
bean as record) ??.
Kafka data source 需要特定的架构来读取(加载)和写入(保存)数据集。
引用 official documentation(突出显示最重要的字段/列):
Each row in the source has the following schema:
...
value binary
...
换句话说,当您从 Kafka 主题中读取时,您在 value
列中有 Kafka 记录,并且您必须将数据保存到 value
列中可用的 Kafka 主题中,如下所示嗯。
换句话说,Kafka 中的任何内容都在 value
列中。 value
列是您“存储”业务记录(数据)的地方。
关于你的问题:
How to write selected columns to Kafka topic?
您应该将选定的列“打包”在一起,这样它们就可以一起成为 value
列的一部分。 to_json
标准函数非常适合,因此所选列将成为 JSON 消息。
例子
举个例子。
不要忘记使用 Kafka 数据源启动 Spark 应用程序或 spark-shell
。注意 Scala(2.11
或 2.12
)和 Spark(例如 2.4.4
)的版本。
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
让我们从创建示例数据集开始。任何多字段数据集都可以。
val ns = Seq((0, "zero")).toDF("id", "name")
scala> ns.show
+---+----+
| id|name|
+---+----+
| 0|zero|
+---+----+
如果我们尝试将数据集写入 Kafka 主题,它会由于缺少 value
列而出错。这就是你最初面对的。
scala> ns.write.format("kafka").option("topic", "in_topic").save
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:138)
...
您必须想出一种方法将多个字段(列)“打包”在一起,并使其作为 value
列可用。 struct
和 to_json
标准函数即可。
val vs = ns.withColumn("value", to_json(struct("id", "name")))
scala> vs.show(truncate = false)
+---+----+----------------------+
|id |name|value |
+---+----+----------------------+
|0 |zero|{"id":0,"name":"zero"}|
+---+----+----------------------+
保存到 Kafka 主题现在应该是一件轻而易举的事。
vs.write.format("kafka").option("topic", "in_topic").save
我正在使用带有 java 1.8 的 spark-sql-2.4.1v。 和卡夫卡版本 spark-sql-kafka-0-10_2.11_2.4.3 和 kafka-clients_0.10.0.0
StreamingQuery queryComapanyRecords =
comapanyRecords
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers",KAFKA_BROKER)
.option("topic", "in_topic")
.option("auto.create.topics.enable", "false")
.option("key.serializer","org.apache.kafka.common.serialization.StringDeserializer")
.option("value.serializer", "com.spgmi.ca.prescore.serde.MessageRecordSerDe")
.option("checkpointLocation", "/app/chkpnt/" )
.outputMode("append")
.start();
queryLinkingMessageRecords.awaitTermination();
给出错误:
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun.apply(KafkaWriter.scala:71)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun.apply(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:121)
我尝试按以下方式修复,但无法发送值,即在我的情况下这是一个 java bean。
StreamingQuery queryComapanyRecords =
comapanyRecords.selectExpr("CAST(company_id AS STRING) AS key", "to_json(struct(\"company_id\",\"fiscal_year\",\"fiscal_quarter\")) AS value")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers",KAFKA_BROKER)
.option("topic", "in_topic")
.start();
So is there anyway in java how to handle/send this value( i.e. Java bean as record) ??.
Kafka data source 需要特定的架构来读取(加载)和写入(保存)数据集。
引用 official documentation(突出显示最重要的字段/列):
Each row in the source has the following schema:
...
value binary
...
换句话说,当您从 Kafka 主题中读取时,您在 value
列中有 Kafka 记录,并且您必须将数据保存到 value
列中可用的 Kafka 主题中,如下所示嗯。
换句话说,Kafka 中的任何内容都在 value
列中。 value
列是您“存储”业务记录(数据)的地方。
关于你的问题:
How to write selected columns to Kafka topic?
您应该将选定的列“打包”在一起,这样它们就可以一起成为 value
列的一部分。 to_json
标准函数非常适合,因此所选列将成为 JSON 消息。
例子
举个例子。
不要忘记使用 Kafka 数据源启动 Spark 应用程序或 spark-shell
。注意 Scala(2.11
或 2.12
)和 Spark(例如 2.4.4
)的版本。
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
让我们从创建示例数据集开始。任何多字段数据集都可以。
val ns = Seq((0, "zero")).toDF("id", "name")
scala> ns.show
+---+----+
| id|name|
+---+----+
| 0|zero|
+---+----+
如果我们尝试将数据集写入 Kafka 主题,它会由于缺少 value
列而出错。这就是你最初面对的。
scala> ns.write.format("kafka").option("topic", "in_topic").save
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery(KafkaWriter.scala:71)
at scala.Option.getOrElse(Option.scala:138)
...
您必须想出一种方法将多个字段(列)“打包”在一起,并使其作为 value
列可用。 struct
和 to_json
标准函数即可。
val vs = ns.withColumn("value", to_json(struct("id", "name")))
scala> vs.show(truncate = false)
+---+----+----------------------+
|id |name|value |
+---+----+----------------------+
|0 |zero|{"id":0,"name":"zero"}|
+---+----+----------------------+
保存到 Kafka 主题现在应该是一件轻而易举的事。
vs.write.format("kafka").option("topic", "in_topic").save