将输入值拆分为 Pyspark 数据框中的不同字段
Spliting an input value to different fields in Pyspark dataframe
我正在使用 Pyspark 数据框来处理日志文件。下面是我使用 spark.read.text
从文件中读取的示例日志
[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test
[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test
我想将此日志拆分为 5 个字段,即时间戳、信息、URL、消息、路径。
怎么拆分成这样?
请帮我解决这个问题。非常感谢!
所以你有几个选择。
分隔符
所以您的日志文件有一些明显的分隔符,您可以根据这些分隔符进行拆分。为此,请使用 split
函数
from pyspark.sql import functions as F
df = df.withColumn(
"split_column",
F.split(F.col("log_column"), delimiter)
)
# "split_column" is now an array, so we need to pull items out the array
df = df.withColumn(
"timestamp",
F.col("split_column").getItem(0)
)
由于您有几个不同的字段,因此您需要将多个拆分与 trim
相结合。
有点像,
# Split on square brackets
df = df.withColumn(
"split_bracket_column",
F.split(F.col("log_column"), "]")
)
df = df.withColumn(
"timestamp",
F.col("split_bracket_column").getItem(0).trim()
)
df = df.withColumn(
"info",
F.col("split_bracket_column").getItem(1).trim()
)
df = df.withColumn(
"url",
F.col("split_bracket_column").getItem(2).trim()
)
df = df.withColumn(
"remaining",
F.col("split_bracket_column").getItem(3).trim()
)
# Split the remaining on the colon ":"
df = df.withColumn(
"split_colon_column",
F.split(F.col("remaining"), ":")
)
df = df.withColumn(
"messsage",
F.col("split_colon_column").getItem(0).trim()
)
df = df.withColumn(
"path",
F.col("split_colon_column").getItem(1).trim()
)
# Clean up temp columns
df.drop("split_bracket_column")
df.drop("remaining")
df.drop("split_colon_column")
正则表达式
由于格式非常固定,您可以使用正则表达式做同样的事情。
import re
regex_pattern = r"\[()\]\[()\]\[()\]()\: ()"
match_groups = ["timestamp", "info", "URL", "message", "path"]
for i in range(re.compile(regex_pattern).groups):
df = df.withColumn(
match_groups[i],
F.regexp_extract(F.col(log_column), regex_pattern, i + 1),
)
其中,log_column
是您的 Spark 数据框中包含日志消息的列。
注意:检查 regex_pattern
因为我面前没有编辑器。
val sUDF = udf((s: String) => {
val s2 = s.split("]").map(_.replace("[", "").trim).reverse
s2.tail.reverse ++ s2.head.split(": ")
})
val sDF = Seq("[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test")
.toDF("log")
.withColumn("arr", sUDF('log))
.select(
'arr(0).alias("Timestamp"),
'arr(1).alias("Info"),
'arr(2).alias("URL"),
'arr(3).alias("Message"),
'arr(4).alias("Path")
)
sDF.show(false)
// +------------------------+-----+----------------+-------------------------------------+------------------------+
// |Timestamp |Info |URL |Message |Path |
// +------------------------+-----+----------------+-------------------------------------+------------------------+
// |Wed Oct 11 14:32:52 2000|error|client 127.0.0.1|client denied by server configuration|/export/home/htdocs/test|
// +------------------------+-----+----------------+-------------------------------------+------------------------+
我正在使用 Pyspark 数据框来处理日志文件。下面是我使用 spark.read.text
从文件中读取的示例日志[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test
[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test
我想将此日志拆分为 5 个字段,即时间戳、信息、URL、消息、路径。
怎么拆分成这样? 请帮我解决这个问题。非常感谢!
所以你有几个选择。
分隔符
所以您的日志文件有一些明显的分隔符,您可以根据这些分隔符进行拆分。为此,请使用 split
函数
from pyspark.sql import functions as F
df = df.withColumn(
"split_column",
F.split(F.col("log_column"), delimiter)
)
# "split_column" is now an array, so we need to pull items out the array
df = df.withColumn(
"timestamp",
F.col("split_column").getItem(0)
)
由于您有几个不同的字段,因此您需要将多个拆分与 trim
相结合。
有点像,
# Split on square brackets
df = df.withColumn(
"split_bracket_column",
F.split(F.col("log_column"), "]")
)
df = df.withColumn(
"timestamp",
F.col("split_bracket_column").getItem(0).trim()
)
df = df.withColumn(
"info",
F.col("split_bracket_column").getItem(1).trim()
)
df = df.withColumn(
"url",
F.col("split_bracket_column").getItem(2).trim()
)
df = df.withColumn(
"remaining",
F.col("split_bracket_column").getItem(3).trim()
)
# Split the remaining on the colon ":"
df = df.withColumn(
"split_colon_column",
F.split(F.col("remaining"), ":")
)
df = df.withColumn(
"messsage",
F.col("split_colon_column").getItem(0).trim()
)
df = df.withColumn(
"path",
F.col("split_colon_column").getItem(1).trim()
)
# Clean up temp columns
df.drop("split_bracket_column")
df.drop("remaining")
df.drop("split_colon_column")
正则表达式
由于格式非常固定,您可以使用正则表达式做同样的事情。
import re
regex_pattern = r"\[()\]\[()\]\[()\]()\: ()"
match_groups = ["timestamp", "info", "URL", "message", "path"]
for i in range(re.compile(regex_pattern).groups):
df = df.withColumn(
match_groups[i],
F.regexp_extract(F.col(log_column), regex_pattern, i + 1),
)
其中,log_column
是您的 Spark 数据框中包含日志消息的列。
注意:检查 regex_pattern
因为我面前没有编辑器。
val sUDF = udf((s: String) => {
val s2 = s.split("]").map(_.replace("[", "").trim).reverse
s2.tail.reverse ++ s2.head.split(": ")
})
val sDF = Seq("[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test")
.toDF("log")
.withColumn("arr", sUDF('log))
.select(
'arr(0).alias("Timestamp"),
'arr(1).alias("Info"),
'arr(2).alias("URL"),
'arr(3).alias("Message"),
'arr(4).alias("Path")
)
sDF.show(false)
// +------------------------+-----+----------------+-------------------------------------+------------------------+
// |Timestamp |Info |URL |Message |Path |
// +------------------------+-----+----------------+-------------------------------------+------------------------+
// |Wed Oct 11 14:32:52 2000|error|client 127.0.0.1|client denied by server configuration|/export/home/htdocs/test|
// +------------------------+-----+----------------+-------------------------------------+------------------------+