将流 JSON 转换为 DataFrame
Convert streaming JSON to DataFrame
问题:如何将 JSON 字符串转换为 DataFrame 并仅选择我想要的键?
我上周才开始使用 Spark,我还在学习中,所以请多多包涵。
我正在使用 Spark(2.4) 结构化流。 spark 应用程序从推特流中获取数据(通过套接字),发送的数据是完整的推文 JSON 字符串。下面是其中一个 DataFrame。每行都是完整的 JSON 推文。
+--------------------+
| value|
+--------------------+
|{"created_at":"Tu...|
|{"created_at":"Tu...|
|{"created_at":"Tu...|
+--------------------+
按照 Venkata 的建议,我这样做了,翻译成 python(完整代码如下)
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
这是return值
+------------------------------+-------------------+
|created_at |id_str |
+------------------------------+-------------------+
|Wed Feb 20 04:51:18 +0000 2019|1098082646511443968|
|Wed Feb 20 04:51:18 +0000 2019|1098082646285082630|
|Wed Feb 20 04:51:18 +0000 2019|1098082646444441600|
|Wed Feb 20 04:51:18 +0000 2019|1098082646557642752|
|Wed Feb 20 04:51:18 +0000 2019|1098082646494797824|
|Wed Feb 20 04:51:19 +0000 2019|1098082646817681408|
+------------------------------+-------------------+
可以看出,只有我想要的 2 个键包含在 DataFrame 中。
希望这对新手有所帮助。
完整代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
sc = spark.sparkContext
lines = spark.readStream.format('socket').option('host', '127.0.0.1').option('port', 9999).load()
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
query = df.writeStream.format('console').option('truncate', 'False').start()
# this part is only used to print out the query when running as an app. Not needed if using jupyter
import time
time.sleep(10)
lines.stop()
这是一个示例代码片段,您可以使用它从 json 转换为 DataFrame。
val schema = new StructType().add("id", StringType).add("pin",StringType)
val dataFrame= data.
selectExpr("CAST(value AS STRING)").as[String].
select(from_json($"value",schema).
alias("tmp")).
select("tmp.*")
问题:如何将 JSON 字符串转换为 DataFrame 并仅选择我想要的键?
我上周才开始使用 Spark,我还在学习中,所以请多多包涵。
我正在使用 Spark(2.4) 结构化流。 spark 应用程序从推特流中获取数据(通过套接字),发送的数据是完整的推文 JSON 字符串。下面是其中一个 DataFrame。每行都是完整的 JSON 推文。
+--------------------+
| value|
+--------------------+
|{"created_at":"Tu...|
|{"created_at":"Tu...|
|{"created_at":"Tu...|
+--------------------+
按照 Venkata 的建议,我这样做了,翻译成 python(完整代码如下)
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
这是return值
+------------------------------+-------------------+
|created_at |id_str |
+------------------------------+-------------------+
|Wed Feb 20 04:51:18 +0000 2019|1098082646511443968|
|Wed Feb 20 04:51:18 +0000 2019|1098082646285082630|
|Wed Feb 20 04:51:18 +0000 2019|1098082646444441600|
|Wed Feb 20 04:51:18 +0000 2019|1098082646557642752|
|Wed Feb 20 04:51:18 +0000 2019|1098082646494797824|
|Wed Feb 20 04:51:19 +0000 2019|1098082646817681408|
+------------------------------+-------------------+
可以看出,只有我想要的 2 个键包含在 DataFrame 中。
希望这对新手有所帮助。
完整代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
sc = spark.sparkContext
lines = spark.readStream.format('socket').option('host', '127.0.0.1').option('port', 9999).load()
schema = StructType().add('created_at', StringType(), False).add('id_str', StringType(), False)
df = lines.selectExpr('CAST(value AS STRING)').select(from_json('value', schema).alias('temp')).select('temp.*')
query = df.writeStream.format('console').option('truncate', 'False').start()
# this part is only used to print out the query when running as an app. Not needed if using jupyter
import time
time.sleep(10)
lines.stop()
这是一个示例代码片段,您可以使用它从 json 转换为 DataFrame。
val schema = new StructType().add("id", StringType).add("pin",StringType)
val dataFrame= data.
selectExpr("CAST(value AS STRING)").as[String].
select(from_json($"value",schema).
alias("tmp")).
select("tmp.*")