Airflow error with pandas: AttributeError: 'Pendulum' object has no attribute 'nanosecond'

Airflow error with pandas: AttributeError: 'Pendulum' object has no attribute 'nanosecond'

我有一个 pandas.DataFrame dfdf.index ,它产生这样的东西:

DatetimeIndex(['2014-10-06 00:55:11.357899904',
               '2014-10-06 00:56:39.046799898',
               '2014-10-06 00:56:39.057499886',
               '2014-10-06 00:56:40.684299946',
               '2014-10-06 00:56:41.115299940',
               '2014-10-06 01:03:52.764300108',
               '2014-10-06 01:21:18.448499918',
               '2014-10-06 01:21:18.457200050',
               '2014-10-06 01:21:18.584199905',
               '2014-10-06 01:21:18.594700098',
               ...
               '2014-11-05 00:25:47.996000051',
               '2014-11-05 00:56:45.081799984',
               '2014-11-05 00:56:45.096899986',
               '2014-11-05 05:50:57.639699936',
               '2014-11-05 06:08:56.365000010',
               '2014-11-05 06:11:20.519099950',
               '2014-11-05 06:15:03.470400095',
               '2014-11-05 06:15:03.981600046',
               '2014-11-05 06:25:31.514300108',
               '2014-11-05 06:25:59.310400009'],
              dtype='datetime64[ns]', name='time', length=1000, freq=None)

我是 运行 airflow 上的 DAG,停在以下行 df.loc[start_date:end_date],说:

AttributeError: 'Pendulum' object has no attribute 'nanosecond'

如果没有 运行 Airflow 中的代码,我无法重现错误。相同的代码在没有 Airflow 的情况下运行得很好。

start_dateAirflow macro execution_dateend_datenext_execution_date.

我想问题是 df 的日期时间 dtypestart_dateend_date 的不兼容,但是我不知道如何解决它。

我尝试删除时区,更改 dtype 但没有任何效果。

经过一番查找,我找到了问题的根源和解决方案。

问题

问题是由 Airflow 传下来的两个宏引起的:

  • start_date,也就是execution_date

  • end_date,也就是next_execution_date

它们的类型是 pendulum.datetime,而不是 Airflow 文档所说的 datetime.datetime。这会导致与 pandas.DataFrame.

的冲突

pandaspendulum 目前不能很好地协同工作,问题在 Whosebug asnwer 中有详细描述。

解决方法

解决方案似乎将 start_dateend_datependulum.datetime 转换为 datetime.datetime

为此,我创建了这个简单的函数,它在转换为 datetime.datetime 之前将 from 转换为字符串。我确信它们是更好的方法,但这非常简单和安全,因此我使用它的原因。

这是函数本身:

def pendulum_to_datetime(pendulum_date):
    """
    Convert pendulum to datetime format.

    The conversion is done from pendulum -> string -> dateime.

    Args:
        pendulum_date (pendulum): The date you wish to convert.

    Returns:
        (datetime) The converted date.
    """
    fmt = '%Y-%m-%dT%H:%M:%S%z'
    string_date = pendulum_date.strftime(fmt)
    return datetime.strptime(string_date, fmt)