我怎样才能从 kafka 主题接收数据到我的 Streaming Structured DataFrame?
How could i get receive the data from kafka topic to my Streaming Structured DataFrame?
我知道如何使用我的 Kafka 主题中的数据,但我无法在正确的列中获取正确的数据。
我收到了 value 列中的所有数据,格式如下:
{"timestamp":"2021-11-09T11:03:48.955+01:00","time":"1","duration":"0","SourceComputer":"C1707","SourcePort":"N1","DestinationComputer":"C925","start/end":" "}
一些字段是空的“”,而其他字段中有一些数据(例如:“C1707”)。
我在想我可以使用这个功能:
DataFrame=DataFrame.withColumn(ColumnName[i],split(DataFrame["value"],',').getItem(i))
但我无法在确切的列中获取确切的数据。
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
| value| timestamp|time|duration|SourceComputer|SourcePort|DestinationComputer|start/end|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
|{"timestamp":"202...|{"timestamp":"202...|null| null| null| null| null| null|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
知道如何以正确的方式接收数据吗?
谢谢!
使用from_json 方法,您可以将Spark DataFrame 列上的JSON 字符串转换为结构类型。
然后你可以将你的结构类型转换为所需的数据帧
import org.apache.spark.sql.functions.from_json
val schema = new StructType()
.add("col1", StringType, true)
.add("col2", StringType, true)
.add("col3", StringType, true)
val df4=df.withColumn("value",from_json(col("value"),schema))
val df5=df4.select(col("value.*"))
请参考this link,一切都在这里解释。
我知道如何使用我的 Kafka 主题中的数据,但我无法在正确的列中获取正确的数据。
我收到了 value 列中的所有数据,格式如下:
{"timestamp":"2021-11-09T11:03:48.955+01:00","time":"1","duration":"0","SourceComputer":"C1707","SourcePort":"N1","DestinationComputer":"C925","start/end":" "}
一些字段是空的“”,而其他字段中有一些数据(例如:“C1707”)。 我在想我可以使用这个功能:
DataFrame=DataFrame.withColumn(ColumnName[i],split(DataFrame["value"],',').getItem(i))
但我无法在确切的列中获取确切的数据。
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
| value| timestamp|time|duration|SourceComputer|SourcePort|DestinationComputer|start/end|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
|{"timestamp":"202...|{"timestamp":"202...|null| null| null| null| null| null|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
知道如何以正确的方式接收数据吗?
谢谢!
使用from_json 方法,您可以将Spark DataFrame 列上的JSON 字符串转换为结构类型。 然后你可以将你的结构类型转换为所需的数据帧
import org.apache.spark.sql.functions.from_json
val schema = new StructType()
.add("col1", StringType, true)
.add("col2", StringType, true)
.add("col3", StringType, true)
val df4=df.withColumn("value",from_json(col("value"),schema))
val df5=df4.select(col("value.*"))
请参考this link,一切都在这里解释。