为什么 python UDF returns 意外的日期时间对象,其中应用在 RDD 上的相同函数给出了正确的日期时间对象
Why python UDF returns unexpected datetime objects where as the same function applied over RDD gives proper datetime object
我不确定我是否做错了什么,如果这看起来很幼稚,请原谅我,
我的问题可通过以下数据重现
from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 5 2013 12:00AM')]).toDF()
我创建了一个函数来将此日期字符串解析为日期时间对象以进一步处理
from datetime import datetime
def date_convert(date_str):
date_format = '%b %d %Y %I:%M%p'
try:
dt=datetime.strptime(date_str,date_format)
except ValueError,v:
if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
dt = dt[:-(len(v.args[0])-26)]
dt=datetime.strptime(dt,date_format)
else:
raise v
return dt
现在,如果我从中创建一个 UDF 并将其应用于我的数据框,我会得到意想不到的数据
from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
结果如下
Out[40]:
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
但是如果我在将数据帧设为 RDD 之后使用它,那么它 returns 一个 pythond 日期时间对象
df.rdd.map(lambda row:date_convert(row.C3)).collect()
(1) Spark Jobs
Out[42]:
[datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 5, 0, 0)]
我想用 dataframe 实现类似的功能。我该怎么做以及这种方法有什么问题(数据帧上的 UDF)
这是因为您必须设置 UDF
的 return 类型数据。显然你正在尝试获得timestamps
,如果是这种情况你必须这样写。
from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())
我不确定我是否做错了什么,如果这看起来很幼稚,请原谅我, 我的问题可通过以下数据重现
from pyspark.sql import Row
df = sc.parallelize([Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 1 2013 12:00AM'),
Row(C3=u'Dec 5 2013 12:00AM')]).toDF()
我创建了一个函数来将此日期字符串解析为日期时间对象以进一步处理
from datetime import datetime
def date_convert(date_str):
date_format = '%b %d %Y %I:%M%p'
try:
dt=datetime.strptime(date_str,date_format)
except ValueError,v:
if len(v.args) > 0 and v.args[0].startswith('unconverted data remains: '):
dt = dt[:-(len(v.args[0])-26)]
dt=datetime.strptime(dt,date_format)
else:
raise v
return dt
现在,如果我从中创建一个 UDF 并将其应用于我的数据框,我会得到意想不到的数据
from pyspark.sql.functions import udf
date_convert_udf = udf(date_convert)
df.select(date_convert_udf(df.C3).alias("datetime")).take(2)
结果如下
Out[40]:
[Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]'),
Row(datetime=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2013,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=1,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]')]
但是如果我在将数据帧设为 RDD 之后使用它,那么它 returns 一个 pythond 日期时间对象
df.rdd.map(lambda row:date_convert(row.C3)).collect()
(1) Spark Jobs
Out[42]:
[datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 1, 0, 0),
datetime.datetime(2013, 12, 5, 0, 0)]
我想用 dataframe 实现类似的功能。我该怎么做以及这种方法有什么问题(数据帧上的 UDF)
这是因为您必须设置 UDF
的 return 类型数据。显然你正在尝试获得timestamps
,如果是这种情况你必须这样写。
from pyspark.sql.types import TimestampType
date_convert_udf = udf(date_convert, TimestampType())