Dataframes Pyspark 中时间戳列的分区
Partition of Timestamp column in Dataframes Pyspark
我在 PSspark 中有一个 DataFrame
格式如下
Date Id Name Hours Dno Dname
12/11/2013 1 sam 8 102 It
12/10/2013 2 Ram 7 102 It
11/10/2013 3 Jack 8 103 Accounts
12/11/2013 4 Jim 9 101 Marketing
我想根据 dno
进行分区并使用 Parquet 格式在 Hive 中另存为 table。
df.write.saveAsTable(
'default.testing', mode='overwrite', partitionBy='Dno', format='parquet')
查询运行良好,并在 Hive 中使用 Parquet 输入创建了 table。
现在我想根据日期列的年份和月份进行分区。时间戳是Unix时间戳
我们如何在 PySpark 中实现这一目标。我已经在配置单元中完成但无法在 PySpark
中完成
Spark >= 3.1
而不是 cast
使用 timestamp_seconds
from pyspark.sql.functions import timestamp_seconds
year(timestamp_seconds(col("timestamp")))
Spark < 3.1
只需提取您要使用的字段并提供列列表作为作者 partitionBy
的参数。如果timestamp
是以秒表示的UNIX时间戳:
df = sc.parallelize([
(1484810378, 1, "sam", 8, 102, "It"),
(1484815300, 2, "ram", 7, 103, "Accounts")
]).toDF(["timestamp", "id", "name", "hours", "dno", "dname"])
添加列:
from pyspark.sql.functions import year, month, col
df_with_year_and_month = (df
.withColumn("year", year(col("timestamp").cast("timestamp")))
.withColumn("month", month(col("timestamp").cast("timestamp"))))
并写:
(df_with_year_and_month
.write
.partitionBy("year", "month")
.mode("overwrite")
.format("parquet")
.saveAsTable("default.testing"))
我在 PSspark 中有一个 DataFrame
格式如下
Date Id Name Hours Dno Dname
12/11/2013 1 sam 8 102 It
12/10/2013 2 Ram 7 102 It
11/10/2013 3 Jack 8 103 Accounts
12/11/2013 4 Jim 9 101 Marketing
我想根据 dno
进行分区并使用 Parquet 格式在 Hive 中另存为 table。
df.write.saveAsTable(
'default.testing', mode='overwrite', partitionBy='Dno', format='parquet')
查询运行良好,并在 Hive 中使用 Parquet 输入创建了 table。
现在我想根据日期列的年份和月份进行分区。时间戳是Unix时间戳
我们如何在 PySpark 中实现这一目标。我已经在配置单元中完成但无法在 PySpark
中完成Spark >= 3.1
而不是 cast
使用 timestamp_seconds
from pyspark.sql.functions import timestamp_seconds
year(timestamp_seconds(col("timestamp")))
Spark < 3.1
只需提取您要使用的字段并提供列列表作为作者 partitionBy
的参数。如果timestamp
是以秒表示的UNIX时间戳:
df = sc.parallelize([
(1484810378, 1, "sam", 8, 102, "It"),
(1484815300, 2, "ram", 7, 103, "Accounts")
]).toDF(["timestamp", "id", "name", "hours", "dno", "dname"])
添加列:
from pyspark.sql.functions import year, month, col
df_with_year_and_month = (df
.withColumn("year", year(col("timestamp").cast("timestamp")))
.withColumn("month", month(col("timestamp").cast("timestamp"))))
并写:
(df_with_year_and_month
.write
.partitionBy("year", "month")
.mode("overwrite")
.format("parquet")
.saveAsTable("default.testing"))