How to convert a text log that partially contains a JSON string into structured data in PySpark?
I am trying to create a DataFrame from an unstructured log whose lines partially contain JSON:
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
This is what I have tried:
rdd = session.sparkContext.textFile(r"F:\mypath\rdd_test_log.txt")
dataFrame = rdd.map(lambda data: Row(time=data.split(" ")[0],
                                     ip=data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()
The result is:
+------------------------------+---------+------------------------+
|EventTime |ip |time |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+
Expected:
time |ip |eventtime |sourcename|Keys |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local |-9serverkey |status
So how can I parse this JSON string in the RDD? Or what approach should I take?
Any help is appreciated.
Thanks
You can use find('{') on the string to get the index from which to take the substring containing the JSON text, and then parse that JSON.
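For a single log line, the idea is simply slicing from the first brace (an illustrative sketch only; the variable names are mine):

line = '2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}'
json_part = line[line.find("{") :]  # '{"EventTime":"2020-09-24 13:33:01",...}'

Applied to the whole RDD: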
from pyspark.sql import Row, functions as f

dataFrame = (
    rdd.map(lambda l: (l.split(" "), l))
    .map(
        lambda data: Row(
            time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{") :]
        )
    )
    .toDF()
    .select(
        "time",
        "ip",
        # Quote the bare Keys value so the string becomes valid JSON
        # ($1 refers to the captured group).
        f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
            "EventTime"
        ),
    )
)
dataFrame.show(1, False)
which shows:
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time |ip |EventTime |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
You can then parse EventTime into a map, which can be expanded further into many columns:
from pyspark.sql.types import StructType, StructField, StringType

parsed = dataFrame.select(
    "time",
    "ip",
    f.from_json(
        "EventTime",
        StructType(
            [
                StructField("EventTime", StringType()),
                StructField("sourcename", StringType()),
                StructField("Keys", StringType()),
                StructField("Type", StringType()),
            ]
        ),
    ).alias("eventdetails"),
)
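As an aside, since from_json yields a struct column, the same expansion can also be done in one step; this sketch should be equivalent to the explicit per-column version below:

# One-step alternative: expand every field of the struct at once.
parsed.select("time", "ip", "eventdetails.*").show()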
Now create the individual columns from the map:
parsed = (
parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
.withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
.withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
.withColumn("Type", parsed["eventdetails"].getItem("Type"))
.drop("eventdetails")
)
parsed.show()
giving:
+--------------------+---------+-------------------+----------+-----------+------+
| time| ip| eventtime|sourcename| Keys| Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01| local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+
Please note: I assumed your JSON is supposed to be valid. "Keys":-9serverkey is not a valid key/value pair, so I edited your data to "Keys":"-9serverkey".
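You can verify this with Python's standard json module (a quick illustration, not part of the Spark job):

import json

json.loads('{"Keys":-9serverkey}')    # raises json.decoder.JSONDecodeError
json.loads('{"Keys":"-9serverkey"}')  # parses fine -> {'Keys': '-9serverkey'}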
Replace your RDD with a DataFrame and use text to read your file:
df = spark.read.text(r"F:\mypath\rdd_test_log.txt")
df.show()
+--------------------+
| value|
+--------------------+
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
|2020-09-24T08:03:...|
+--------------------+
Then split it, keeping the JSON as text.
# Version Spark >= 3
from pyspark.sql import functions as F
df = df.withColumn("values", F.split(F.col("value"), " ", limit=3)).select(
F.col("values").getItem(0).alias("time"),
F.col("values").getItem(1).alias("IP"),
F.col("values").getItem(2).alias("JSON"),
)
# OR
# Version Spark <= 2.4 (split's optional limit argument only arrived in Spark 3.0, hence the UDF)
from pyspark.sql import functions as F, types as T
udf_split = F.udf(lambda x: x.split(" ", 2), T.ArrayType(T.StringType()))
df = df.withColumn("values", udf_split(F.col("value"))).select(
F.col("values").getItem(0).alias("time"),
F.col("values").getItem(1).alias("IP"),
F.col("values").getItem(2).alias("JSON"),
)
df.show()
+--------------------+---------+--------------------+
| time| IP| JSON|
+--------------------+---------+--------------------+
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
|2020-09-24T08:03:...|10.1.20.1|{"EventTime":"202...|
+--------------------+---------+--------------------+
A step to fix the JSON:
# Quote the bare Keys value ($1 refers to the captured group) so it becomes valid JSON.
df = df.withColumn(
    "JSON", F.regexp_replace(F.col("JSON"), r'"Keys":([^,]+)', '"Keys":"$1"')
)
df.show(truncate=False)
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time |IP |JSON |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
Then you just have to convert the JSON string to a struct type:
json_struct = T.StructType(
[
T.StructField("EventTime", T.StringType()),
T.StructField("sourcename", T.StringType()),
T.StructField("Keys", T.StringType()),
T.StructField("Type", T.StringType()),
]
)
df = df.withColumn("JSON", F.from_json("JSON", json_struct))
df.select("time", "IP", "JSON.*").show()
+--------------------+---------+-------------------+----------+-----------+------+
| time| IP| EventTime|sourcename| Keys| Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01| local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+
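If you also want real timestamp types instead of strings, a possible follow-up (a sketch only, not required by the question; the name structured is mine, and it assumes Spark 3.x datetime parsing):

structured = df.select("time", "IP", "JSON.*")
structured = (
    structured
    # ISO-8601 strings such as 2020-09-24T08:03:01.633Z cast directly.
    .withColumn("time", F.to_timestamp("time"))
    .withColumn("EventTime", F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss"))
)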