将输入值拆分为 Pyspark 数据框中的不同字段

Spliting an input value to different fields in Pyspark dataframe

我正在使用 Pyspark 数据框来处理日志文件。下面是我使用 spark.read.text

从文件中读取的示例日志
[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test

[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test

我想将此日志拆分为 5 个字段,即时间戳、信息、URL、消息、路径。

怎么拆分成这样? 请帮我解决这个问题。非常感谢!

所以你有几个选择。

分隔符

所以您的日志文件有一些明显的分隔符,您可以根据这些分隔符进行拆分。为此,请使用 split 函数

from pyspark.sql import functions as F

df = df.withColumn(
    "split_column", 
    F.split(F.col("log_column"), delimiter)
)

# "split_column" is now an array, so we need to pull items out the array
df = df.withColumn(
    "timestamp",
    F.col("split_column").getItem(0)
)

由于您有几个不同的字段,因此您需要将多个拆分与 trim 相结合。

有点像,

# Split on square brackets
df = df.withColumn(
    "split_bracket_column", 
    F.split(F.col("log_column"), "]")
)

df = df.withColumn(
    "timestamp",
    F.col("split_bracket_column").getItem(0).trim()
)
df = df.withColumn(
    "info",
    F.col("split_bracket_column").getItem(1).trim()
)
df = df.withColumn(
    "url",
    F.col("split_bracket_column").getItem(2).trim()
)
df = df.withColumn(
    "remaining",
    F.col("split_bracket_column").getItem(3).trim()
)

# Split the remaining on the colon ":"
df = df.withColumn(
    "split_colon_column", 
    F.split(F.col("remaining"), ":")
)
df = df.withColumn(
    "messsage",
    F.col("split_colon_column").getItem(0).trim()
)
df = df.withColumn(
    "path",
    F.col("split_colon_column").getItem(1).trim()
)

# Clean up temp columns
df.drop("split_bracket_column")
df.drop("remaining")
df.drop("split_colon_column")

正则表达式

由于格式非常固定,您可以使用正则表达式做同样的事情。

import re

regex_pattern = r"\[()\]\[()\]\[()\]()\: ()"
match_groups = ["timestamp", "info", "URL", "message", "path"]

for i in range(re.compile(regex_pattern).groups):
    df = df.withColumn(
        match_groups[i], 
        F.regexp_extract(F.col(log_column), regex_pattern, i + 1),
    )

其中,log_column 是您的 Spark 数据框中包含日志消息的列。

注意:检查 regex_pattern 因为我面前没有编辑器。

 val sUDF = udf((s: String) => {
    val s2 = s.split("]").map(_.replace("[", "").trim).reverse
    s2.tail.reverse ++ s2.head.split(": ")
  })

  val sDF = Seq("[Wed Oct 11 14:32:52 2000] [error] [client 127.0.0.1] client denied by server configuration: /export/home/htdocs/test")
    .toDF("log")
    .withColumn("arr", sUDF('log))
    .select(
      'arr(0).alias("Timestamp"),
      'arr(1).alias("Info"),
      'arr(2).alias("URL"),
      'arr(3).alias("Message"),
      'arr(4).alias("Path")
    )

  sDF.show(false)
//  +------------------------+-----+----------------+-------------------------------------+------------------------+
//  |Timestamp               |Info |URL             |Message                              |Path                    |
//  +------------------------+-----+----------------+-------------------------------------+------------------------+
//  |Wed Oct 11 14:32:52 2000|error|client 127.0.0.1|client denied by server configuration|/export/home/htdocs/test|
//  +------------------------+-----+----------------+-------------------------------------+------------------------+