Pyspark 格式日期

Pyspark Format Dates

我通过 pandas 解析导入了一个 CSV 文件,然后我将 csv 文件转换为 parquet 格式。

数据读取时间为bigint,日期为String

我可以使用 to_date 函数更改日期,即 df.withColumn('ObservationDate', to_date('ObservationDate'))

但是,我很难将时间列更改为 HH 格式,而且我也很难将小时数添加到日期列。我查看了 spark 文档,但找不到我要找的内容。

要将 Time 列转换为 HH 格式,请使用 format_string() 函数,然后使用 to_timestamp()[=21 添加 Time to date 列=]函数。

Example:

df.show()
#+---------------+-------------------+
#|ObservationTime|    ObservationDate|
#+---------------+-------------------+
#|              1|2016-02-01T00:00:00|
#|             12|2016-02-01T00:00:00|
#+---------------+-------------------+

from pyspark.sql.functions import *

df.withColumn("HH",format_string("%02d",col("ObservationTime"))).\
withColumn("new_date",to_timestamp(concat(to_date(col("ObservationDate")),lit(' '),format_string("%02d",col("ObservationTime"))),"yyyy-MM-dd HH")).\
show()
#+---------------+-------------------+---+-------------------+
#|ObservationTime|    ObservationDate| HH|           new_date|
#+---------------+-------------------+---+-------------------+
#|              1|2016-02-01T00:00:00| 01|2016-02-01 01:00:00|
#|             12|2016-02-01T00:00:00| 12|2016-02-01 12:00:00|
#+---------------+-------------------+---+-------------------+

另一种选择-

lpad 以小时为单位转换为 **HH** 格式 & **unix_timestamp 通过将 UTC 设置为火花会话时区

 df.show(false)
    df.printSchema()

    /**
      * +---------------+-------------------+
      * |ObservationTime|ObservationDate    |
      * +---------------+-------------------+
      * |1              |2016-02-01T00:00:00|
      * |12             |2016-02-01T00:00:00|
      * +---------------+-------------------+
      *
      * root
      * |-- ObservationTime: integer (nullable = false)
      * |-- ObservationDate: string (nullable = true)
      */

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    df.withColumn("ObservationTime", lpad($"ObservationTime", 2, "0"))
      .withColumn("new_ObservationDate",
        (unix_timestamp($"ObservationDate".cast("timestamp")).cast("long") + unix_timestamp($"ObservationTime", "HH").cast("long"))
          .cast("timestamp")
      )
      .show(false)

    /**
      * +---------------+-------------------+-------------------+
      * |ObservationTime|ObservationDate    |new_ObservationDate|
      * +---------------+-------------------+-------------------+
      * |01             |2016-02-01T00:00:00|2016-02-01 01:00:00|
      * |12             |2016-02-01T00:00:00|2016-02-01 12:00:00|
      * +---------------+-------------------+-------------------+
      */

这是我的尝试。

import pyspark.sql.functions as f

df.show(10, False)

df.withColumn('ObservationTime', f.lpad('ObservationTime', 2, '0')) \
  .withColumn('ObservationDate', f.to_timestamp('ObservationDate')) \
  .withColumn('ObservationTimestamp', f.from_unixtime(f.unix_timestamp('ObservationDate') + f.unix_timestamp('ObservationTime', 'H'))) \
  .show(10, False)

+---------------+-------------------+
|ObservationTime|ObservationDate    |
+---------------+-------------------+
|1              |2016-02-01T00:00:00|
|14             |2016-02-01T00:00:00|
+---------------+-------------------+

+---------------+-------------------+--------------------+
|ObservationTime|ObservationDate    |ObservationTimestamp|
+---------------+-------------------+--------------------+
|01             |2016-02-01 00:00:00|2016-02-01 01:00:00 |
|14             |2016-02-01 00:00:00|2016-02-01 14:00:00 |
+---------------+-------------------+--------------------+

我已经将 unix_timestamp 函数用于带有 'H' 的 ObservationTime,但在 Spark 2.x.

中它将是 'HH'