如何将值从 Kafka 数据源转换为给定模式?
How to convert values from Kafka data source to a given schema?
我通过下面的代码从kafka服务器获取日志:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", mykey.Kafka_source)
.option("subscribe", mykey.Kafka_topic)
.load();
Dataset<String> dg = df
.selectExpr("CAST(value AS STRING)")
.as(STRING());
然而,dg的一个元素是这样"name : John Doe, age : 20",然而它只有一个键"value"。因此,当我将其保存在 HDFS 中时,它保存为 "value : "name : John Doe, age : 22""。但是,我想像这样更改架构:
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
这样元素就可以像"name : John Doe, age : 22"
一样保存
当前元素的架构是这样的:
root
|-- value: string (nullable = true)
我尝试编写代码将 dg 的每个元素转换为 Dataset 的新元素,但我认为 Java 中的结构化流不支持高级函数表达式。我怎样才能做到这一点..?我想要一些使用 StructType 的解决方案。
您只需将 value
转换为预期的架构。
如果值采用 JSON 格式,您将使用 from_json 标准函数之一:
from_json(e: Column, schema: Column): Column
对于其他格式,您必须应用转换(使用或不使用 UDF)来进行转换。
我通过下面的代码从kafka服务器获取日志:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", mykey.Kafka_source)
.option("subscribe", mykey.Kafka_topic)
.load();
Dataset<String> dg = df
.selectExpr("CAST(value AS STRING)")
.as(STRING());
然而,dg的一个元素是这样"name : John Doe, age : 20",然而它只有一个键"value"。因此,当我将其保存在 HDFS 中时,它保存为 "value : "name : John Doe, age : 22""。但是,我想像这样更改架构:
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
这样元素就可以像"name : John Doe, age : 22"
一样保存当前元素的架构是这样的:
root
|-- value: string (nullable = true)
我尝试编写代码将 dg 的每个元素转换为 Dataset 的新元素,但我认为 Java 中的结构化流不支持高级函数表达式。我怎样才能做到这一点..?我想要一些使用 StructType 的解决方案。
您只需将 value
转换为预期的架构。
如果值采用 JSON 格式,您将使用 from_json 标准函数之一:
from_json(e: Column, schema: Column): Column
对于其他格式,您必须应用转换(使用或不使用 UDF)来进行转换。