Get Last Monday in Spark
I am using Spark 2.0 with the Python API.
I have a dataframe with a column of type DateType(). I would like to add a column to the dataframe containing the most recent Monday.
I can do it like this:
import pyspark.sql.functions
import pyspark.sql.types

reg_schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField('AccountCreationDate', pyspark.sql.types.DateType(), True),
    pyspark.sql.types.StructField('UserId', pyspark.sql.types.LongType(), True)
])
reg = spark.read.schema(reg_schema).option('header', True).csv(path_to_file)

reg = reg.withColumn('monday',
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Mon',
        reg.AccountCreationDate).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Tue',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 1)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Wed',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 2)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Thu',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 3)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Fri',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 4)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sat',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 5)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sun',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 6))
    )))))))
However, this seems like a lot of code for something that ought to be fairly simple. Is there a more concise way of doing this?
You can find the following Monday with next_day and subtract a week to get the most recent one. The required functions can be imported as follows:
from pyspark.sql.functions import next_day, date_sub
and a helper defined as (passing the dayOfWeek argument through to next_day rather than hard-coding "monday"):
def previous_day(date, dayOfWeek):
    return date_sub(next_day(date, dayOfWeek), 7)
Finally, an example:
from pyspark.sql.functions import to_date

df = sc.parallelize([
    ("2016-10-26", )
]).toDF(["date"]).withColumn("date", to_date("date"))

df.withColumn("last_monday", previous_day("date", "monday")).show()
With the result:
+----------+-----------+
| date|last_monday|
+----------+-----------+
|2016-10-26| 2016-10-24|
+----------+-----------+
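For reference, the same arithmetic can be written as a single SQL expression via expr, with no helper function; a minimal sketch reusing the df above:

from pyspark.sql.functions import expr

# next_day returns the first Monday strictly after the given date, so
# stepping back 7 days lands on that week's Monday (a Monday maps to itself).
df.withColumn("last_monday", expr("date_sub(next_day(date, 'Mon'), 7)")).show()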
I found that pyspark's trunc function also works.
import datetime

import pyspark.sql.functions as f

df = spark.createDataFrame([
    (datetime.date(2020, 10, 27), ),
    (datetime.date(2020, 12, 21), ),
    (datetime.date(2020, 10, 13), ),
    (datetime.date(2020, 11, 11), ),
], ["date_col"])

df = df.withColumn("first_day_of_week", f.trunc("date_col", "week"))
Another option is to subtract the offset from Monday computed with dayofweek. Note that dayofweek numbers days 1=Sunday through 7=Saturday, so the naive dayofweek - 2 offset goes negative on Sundays and would return the following Monday; taking the offset modulo 7 fixes that:

import pyspark.sql.functions as f

# (dayofweek + 5) % 7 maps Mon(2) -> 0, Tue(3) -> 1, ..., Sun(1) -> 6
df = df.withColumn('days_from_monday', (f.dayofweek(f.col('transaction_timestamp')) + 5) % 7)
df = df.withColumn('transaction_week_start_date', f.expr("date_sub(transaction_timestamp, days_from_monday)"))
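A quick pure-Python sanity check of the offset arithmetic, assuming dayofweek's 1=Sunday through 7=Saturday numbering:

# (dow + 5) % 7 should give the number of days back to the most recent Monday
for dow, name in zip(range(1, 8), ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]):
    print(name, (dow + 5) % 7)
# prints: Sun 6, Mon 0, Tue 1, Wed 2, Thu 3, Fri 4, Sat 5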