以非字符串化格式向 kafka 发送 json 个事件

Sending json events to kafka in non-stringified format

我创建了一个如下所示的数据框,其中我使用 to_json() 方法创建了 JSON 数组值。

+---------------------------------------------------------------------------------------------------- 

|json_data                                                                                                  |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+

我正在使用以下方法将数据帧发送到 kafka 主题。 但是当我使用发送到 kafka 主题的数据时,我可以看到 json 数据被字符串化了。

推送数据到kafka的代码:

outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("topic", "topic_test")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("checkpointLocation", checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("truncate", false)
        .save()

kafka 正在接收字符串化数据:

{
    "name": "sensor1",
    "value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}

我们如何将数据发送到 kafka 主题,这样我们就不会看到字符串化的 jsons 作为输出?

json_data 的类型是 string 并且您再次将 json_data 传递给 to_json(struct("*")) 函数。

勾选 value 列,该列将转至 kafka。

df.withColumn("value",to_json(struct($"*"))).show(false)
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                  |value                                                                                                                                      |
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+

试试下面的代码。

 df
 .withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
 .selectExpr("CAST(Key as STRING) as key",to_json(struct($"name",$"value-array")).as("value"))
 .write
 .format("kafka")
 .option("topic", "topic_test")
 .option("kafka.bootstrap.servers", "localhost:9093")
 .option("checkpointLocation", checkpointPath)
 .option("kafka.sasl.mechanism", "PLAIN")
 .option("kafka.security.protocol", "SASL_SSL")
 .option("truncate", false)
 .save()