以非字符串化格式向 kafka 发送 json 个事件
Sending json events to kafka in non-stringified format
我创建了一个如下所示的数据框,其中我使用 to_json() 方法创建了 JSON 数组值。
+----------------------------------------------------------------------------------------------------
|json_data |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+
我正在使用以下方法将数据帧发送到 kafka 主题。
但是当我使用发送到 kafka 主题的数据时,我可以看到 json 数据被字符串化了。
推送数据到kafka的代码:
outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()
kafka 正在接收字符串化数据:
{
"name": "sensor1",
"value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}
我们如何将数据发送到 kafka 主题,这样我们就不会看到字符串化的 jsons 作为输出?
json_data
的类型是 string
并且您再次将 json_data
传递给
to_json(struct("*"))
函数。
勾选 value
列,该列将转至 kafka。
df.withColumn("value",to_json(struct($"*"))).show(false)
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|json_data |value |
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
试试下面的代码。
df
.withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
.selectExpr("CAST(Key as STRING) as key",to_json(struct($"name",$"value-array")).as("value"))
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()
我创建了一个如下所示的数据框,其中我使用 to_json() 方法创建了 JSON 数组值。
+----------------------------------------------------------------------------------------------------
|json_data |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+
我正在使用以下方法将数据帧发送到 kafka 主题。 但是当我使用发送到 kafka 主题的数据时,我可以看到 json 数据被字符串化了。
推送数据到kafka的代码:
outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()
kafka 正在接收字符串化数据:
{
"name": "sensor1",
"value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}
我们如何将数据发送到 kafka 主题,这样我们就不会看到字符串化的 jsons 作为输出?
json_data
的类型是 string
并且您再次将 json_data
传递给
to_json(struct("*"))
函数。
勾选 value
列,该列将转至 kafka。
df.withColumn("value",to_json(struct($"*"))).show(false)
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|json_data |value |
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{\"name\":\"sensor1\",\"value-array\":[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]}"}|
+-----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
试试下面的代码。
df
.withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
.selectExpr("CAST(Key as STRING) as key",to_json(struct($"name",$"value-array")).as("value"))
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()