How to convert a text log that partially contains JSON strings into a structured DataFrame in PySpark?

I am trying to create a DataFrame from an unstructured log in which part of each line is JSON:

2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}

This is what I have tried:

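from pyspark.sql import Row

# Raw string, so that "\r" in the Windows path is not read as a carriage return
rdd = session.sparkContext.textFile(r"F:\mypath\rdd_test_log.txt")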

dataFrame = rdd.map(lambda data: Row(time= data.split(" ")[0],
                                     ip= data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()


The result is:


+------------------------------+---------+------------------------+
|EventTime                     |ip       |time                    |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+

Expected:

time                     |ip        |eventtime          |sourcename|Keys        |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local     |-9serverkey |status     

So how can I parse this JSON string in the RDD? Or what approach should I take instead?

Any help is appreciated.

Thanks

You can call find('{') on the string to get the index where the JSON text starts, take the substring from that index, and then parse that JSON.

from pyspark.sql import Row, functions as f

dataFrame = (
    rdd.map(lambda l: (l.split(" "), l))
    .map(
        lambda data: Row(
            time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{") :]
        )
    )
    .toDF()
    .select(
        "time",
        "ip",
        # Quote the bare Keys value; $1 refers to the text captured by (.*)
        f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
            "EventTime"
        ),
    )
)

dataFrame.show(1, False)

which shows:

+------------------------+---------+---------------------------------------------------------------------------------------------+
|time                    |ip       |EventTime                                                                                    |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
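
To see what the find('{') slicing does on a single record, here is a minimal plain-Python sketch that runs without Spark. The helper name split_log_line is hypothetical and the sample line is copied from the question; it is an illustration of the idea, not part of the Spark job above.

```python
# Sample line copied from the question's log.
line = (
    '2020-09-24T08:03:01.633Z 10.1.20.1 '
    '{"EventTime":"2020-09-24 13:33:01","sourcename":"local",'
    '"Keys":-9serverkey,"Type":"status"}'
)


def split_log_line(text):
    """Return (timestamp, ip, json_text) for one log line.

    Hypothetical helper: the first two space-separated tokens are the
    timestamp and the IP; everything from the first '{' is the JSON text.
    """
    parts = text.split(" ")
    start = text.find("{")
    # find() returns -1 when there is no '{'; slicing with -1 would
    # silently return the last character, so handle that case explicitly.
    json_text = text[start:] if start != -1 else None
    return parts[0], parts[1], json_text


ts, ip, json_text = split_log_line(line)
print(ts)
print(ip)
print(json_text)
```

The explicit check for -1 is a small design choice: a line without any JSON part then yields None rather than a one-character string.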

You can then parse EventTime into a struct, which can be further expanded into separate columns:

from pyspark.sql.types import StringType, StructField, StructType

parsed = dataFrame.select(
    "time",
    "ip",
    f.from_json(
        "EventTime",
        StructType(
            [
                StructField("EventTime", StringType()),
                StructField("sourcename", StringType()),
                StructField("Keys", StringType()),
                StructField("Type", StringType()),
            ]
        ),
    ).alias("eventdetails"),
)

Now create the separate columns from the struct:

parsed = (
    parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
    .withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
    .withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
    .withColumn("Type", parsed["eventdetails"].getItem("Type"))
    .drop("eventdetails")
)

parsed.show()

which gives:

+--------------------+---------+-------------------+----------+-----------+------+
|                time|       ip|          eventtime|sourcename|       Keys|  Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+

Please note that I assumed your JSON is meant to be valid. "Keys":-9serverkey is not a valid key/value pair, so I edited your data to "Keys":"-9serverkey".
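
As a quick check of the point about the unquoted Keys value, the following plain-Python sketch (no Spark needed) shows that the raw text is rejected by a JSON parser and becomes parseable once the value is quoted. The pattern used here is an assumption that the bare value contains no comma or closing brace and is not already quoted. Note that Python's re module writes the back-reference as \1, whereas Spark's regexp_replace uses $1.

```python
import json
import re

# JSON part of the first log line, copied from the question.
raw = (
    '{"EventTime":"2020-09-24 13:33:01","sourcename":"local",'
    '"Keys":-9serverkey,"Type":"status"}'
)

# The bare value -9serverkey makes the text invalid JSON.
try:
    json.loads(raw)
    raw_is_valid = True
except json.JSONDecodeError:
    raw_is_valid = False

# Wrap whatever follows "Keys": (up to the next comma or brace) in quotes.
fixed = re.sub(r'"Keys":([^,}]+)', r'"Keys":"\1"', raw)
event = json.loads(fixed)

print(raw_is_valid)
print(event["Keys"])
```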

Replace your RDD with a DataFrame and use text to read your file:

# Raw string, so that "\r" in the Windows path is not read as a carriage return
df = spark.read.text(r"F:\mypath\rdd_test_log.txt")

df.show()
+--------------------+
|               value|
+--------------------+
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
+--------------------+

Then split each line and keep the JSON part as text.

# Version Spark >= 3
from pyspark.sql import functions as F

df = df.withColumn("values", F.split(F.col("value"), " ", limit=3)).select(
    F.col("values").getItem(0).alias("time"),
    F.col("values").getItem(1).alias("IP"),
    F.col("values").getItem(2).alias("JSON"),
)

# OR
# Version spark <= 2.4

from pyspark.sql import functions as F, types as T

udf_split = F.udf(lambda x : x.split(" ", 2), T.ArrayType(T.StringType()))

df = df.withColumn("values", udf_split(F.col("value"))).select(
    F.col("values").getItem(0).alias("time"),
    F.col("values").getItem(1).alias("IP"),
    F.col("values").getItem(2).alias("JSON"),
)

df.show()
+--------------------+---------+--------------------+
|                time|       IP|                JSON|
+--------------------+---------+--------------------+
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
+--------------------+---------+--------------------+
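
The limit on the split matters because the EventTime value inside the JSON itself contains a space. A small plain-Python sketch of the difference, using a line copied from the question (Python's maxsplit=2 plays the role of Spark's limit=3, since both produce at most three pieces):

```python
# Sample line copied from the question's log.
line = (
    '2020-09-24T08:03:01.633Z 10.1.20.1 '
    '{"EventTime":"2020-09-24 13:33:01","sourcename":"local",'
    '"Keys":-9serverkey,"Type":"status"}'
)

# Without a limit, the space inside "2020-09-24 13:33:01" also splits,
# cutting the JSON text into two pieces.
unlimited = line.split(" ")

# With at most two splits, the third piece is the whole JSON text.
limited = line.split(" ", 2)

print(len(unlimited))
print(len(limited))
print(limited[2])
```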

Then fix the JSON:

# Quote the bare Keys value; $1 refers to the text captured by ([^,]+)
df = df.withColumn(
    "JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([^,]+)', '"Keys":"$1"')
)
df.show(truncate=False)
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time                    |IP       |JSON                                                                                         |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+

Then you just need to convert the JSON string into a struct type.

json_struct = T.StructType(
    [
        T.StructField("EventTime", T.StringType()),
        T.StructField("sourcename", T.StringType()),
        T.StructField("Keys", T.StringType()),
        T.StructField("Type", T.StringType()),
    ]
)

df = df.withColumn("JSON", F.from_json("JSON", json_struct))
df.select("time", "IP", "JSON.*").show()

+--------------------+---------+-------------------+----------+-----------+------+
|                time|       IP|          EventTime|sourcename|       Keys|  Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+