使用前导时间戳读取 json

Reading json with leading timestamp

我在 S3 中有一个数据集 (~100GB),它有一个时间戳,后跟一个 JSON 字符串,而不仅仅是一个纯 JSON 字符串。此数据是 gzip 压缩的。有没有办法在不重新格式化数据以删除时间戳的情况下将这些数据读入数据框?我根本不需要时间戳,可以忽略。以下是数据示例:

2019-06-28T00:00:00.000Z { "a": 123, "b": "456", "c": 789 }

我通常使用粘合库 read_from_options 来读取它的数据,但我没有看到任何忽略时间戳的选项,只读入 JSON 字符串。我不确定 spark 是否有任何功能可以做到这一点。

让我们假设,这就是数据的样子 -

country|city|json_content
america|chicago|2019-06-28T00:00:00.000Z { "a": 123, "b": "456", "c": 789 }
india|mumbai|2019-06-28T00:00:00.000Z { "a": 123, "b": "456", "c": 789 }

将其读入 spark 数据帧 -

    val df = spark
      .read
      .option("header", "true") // Use first line of all files as header
      .option("delimiter", "|")
      .csv("csv_file_path")

因为你有 gzip 数据,首先将它读入 RDD,如下所示,然后将它转换成 DF(如果你需要我的帮助,请告诉我 RDD-to-DF转换。) -

val rdd = sc.textFile("myFile.gz")

导入重要功能-

import org.apache.spark.sql.functions._

编写并注册一个 UDF,它仅从您的时间戳+json 列

中提取 json 内容
    val getJsonContent = udf{input: String => input.substring(input.indexOf("{"))}

应用此 UDF 并创建最终的 Dataframe-

    val finalDf = df.withColumn("json_content",getJsonContent(col("json_content")))

A​​namdeo 的回答很好,但我要强调的是,出于性能考虑,您应该尽可能避免使用 UDF。在这种情况下,您可以轻松地使用 regexp_extract 将时间戳与您感兴趣的 JSON 内容分开:

scala> val regex =  "([0-9\-TZ\.:]+) (\{.*)"
regex: String = ([0-9\-TZ\.:]+) (\{.*)

scala> val dff = df.withColumn("tstamp", regexp_extract('json_content, regex, 1)).withColumn("json", regexp_extract('json_content, regex, 2)).drop("json_content")
dff: org.apache.spark.sql.DataFrame = [country: string, city: string ... 2 more fields]

scala> dff.show(false)
+-------+-------+------------------------+----------------------------------+
|country|city   |tstamp                  |json                              |
+-------+-------+------------------------+----------------------------------+
|america|chicago|2019-06-28T00:00:00.000Z|{ "a": 123, "b": "456", "c": 789 }|
|india  |mumbai |2019-06-28T00:00:00.000Z|{ "a": 123, "b": "456", "c": 789 }|
+-------+-------+------------------------+----------------------------------+

从现在开始,如果需要,您可以使用 Spark 的 built-in 函数,如 from_jsonget_json_object 直接处理您的 JSON 数据。