如何从元组(键,字典)的生成器构建一个带有集群时间戳作为索引的数据框?

How to build a dataframe with clustered timestamps as index from a generator of tuple(key, dict)?

我是 Pandas 的新手,所以也许我在这里遗漏了一些非常简单的东西,但搜索其他问题并没有找到我需要的东西。

我有一个 Python 生成器生成 (timestamp, {k1: v1, k2: v2, ...}) # the timestamp is a float 的元组 我想构建这种形式的数据框:

datetime(timestamp) (<-- this should be the index) | k1 | k2 | k3 |...

第二个请求(实际上可能有助于提高效率)是将时间戳非常接近(<0.3)的行合并为一行(承诺列不会重叠 - 即在每行至少有 Nan 列)。

下面几行是为我做的,但只是作为一个时间序列,而不是作为数据帧的索引,我不知道如何将它“粘回”到数据帧中:

    times.loc[times.diff() < 0.3] = times[times.diff() > 0.3]
    times = times.pad().map(datetime.fromtimestamp)

数据的大小可以达到超过一百万列的数千个(簇)时间戳。

这个选项对我来说是最快的:

t = {}
for ts, d in file_content:
    for k, v in d.items():
        t.setdefault(ts, {})[k] = v
df1 = pd.DataFrame.from_dict(t, orient='index')

加载到字典中花费了 14 秒,将字典加载到 df 中花费了 30 秒(其中输出数据帧的大小约为 1GB),但这没有对时间戳聚类进行任何优化。

加载数据帧的最佳方式是什么?可以构建时间戳索引并将其“附加”到该数据帧的代码是什么?

编辑: 这是 file_content:

中第一个元组的示例
In [2]: next(file_content)
Out[2]:
(1628463575.9415462,
 {'E2_S0_ME_rbw': 0,
  'E2_S0_ME_rio': 0,
  'E2_S0_ME_rlat': 0,
  'E2_S0_ME_rmdi': 0,
  'E2_S0_ME_wbw': 0,
  'E2_S0_ME_wio': 0,
  'E2_S0_ME_wlat': 0,
  'E2_S0_ME_wmdi': 0})

编辑2: 第二个元组(注意时间戳与前一个非常接近,并且键完全不同):

In [12]: next(file_content)
Out[12]:
(1628463575.946525,
 {'E2_S1_ME_errors': 0,
  'E2_S1_ME_messages': 0})

我已经对你的数据做了一个例子:

file_content = [
(1628463575.9415462,
 {'E2_S0_ME_rbw': 0,
  'E2_S0_ME_rio': 0,
  'E2_S0_ME_rlat': 0,
  'E2_S0_ME_rmdi': 0,
  'E2_S0_ME_wbw': 0,
  'E2_S0_ME_wio': 0,
  'E2_S0_ME_wlat': 0,
  'E2_S0_ME_wmdi': 0}
),
(1628463576.7,
 {'E2_S0_ME_rbw': 0,
  'E2_S0_ME_rio': 0,
  'E2_S0_ME_rlat': 0,
  'E2_S0_ME_rmdi': 1,
  'E2_S0_ME_wbw': 0,
  'E2_S0_ME_wio': 0,
  'E2_S0_ME_wlat': 0,
  'E2_S0_ME_wmdi': 0}
),
(1628464579,
 {'E2_S0_ME_rbw': 0,
  'E2_S0_ME_rio': 1,
  'E2_S0_ME_rlat': 0,
  'E2_S0_ME_rmdi': 0,
  'E2_S0_ME_wbw': 0,
  'E2_S0_ME_wio': 0,
  'E2_S0_ME_wlat': 0,
  'E2_S0_ME_wmdi': 0}
),
(1628493589,
 {'E2_S0_ME_rbw': 0,
  'E2_S0_ME_rio': 0,
  'E2_S0_ME_rlat': 0,
  'E2_S0_ME_rmdi': 0,
  'E2_S0_ME_wbw': 0,
  'E2_S0_ME_wio': 0,
  'E2_S0_ME_wlat': 0,
  'E2_S0_ME_wmdi': 0}
)
]

这是在索引中生成带有日期的数据框的代码:

for i in range(0,len(file_content)): 
    file_content[i][1]['time'] =  file_content[i][0] 
    file_content[i] = file_content[i][1]
d = pd.DataFrame(file_content)
d['time'] = d['time'].apply(lambda x: pd.datetime.fromtimestamp(x))
d = d.set_index('time')

输出:

您可以使用重新采样来拆分列。如果数据中的时间戳在时间上没有太大差异,这将很有帮助。但如果时间非常不同,您可能会得到很多 NaN 列。在我的示例中,它看起来像这样:

代码:

d = d.resample('3s').mean()

输出:

当然你可以在那之后直接删除 NaN,但如果你的数据不频繁,它可能会生成太大的数据帧。您也可以使用其他函数来聚合值,例如最小值或最大值。

您发现可以使用字典来加载数据,可以写得稍微简单一点:

>>> pd.DataFrame.from_dict(dict(file_contents), orient='index')
              E2_S0_ME_rbw  E2_S0_ME_rio  E2_S0_ME_rlat  E2_S0_ME_rmdi  E2_S0_ME_wbw  E2_S0_ME_wio  E2_S0_ME_wlat  E2_S0_ME_wmdi
1.628464e+09             0             0              0              0             0             0              0              0

您也可以直接将可迭代对象加载到数据框中,然后从那里进行规范化:

>>> fc = pd.DataFrame(file_contents)
>>> fc
              0                                                  1
0  1.628464e+09  {'E2_S0_ME_rbw': 0, 'E2_S0_ME_rio': 0, 'E2_S0_...'
>>> df = pd.json_normalize(fc[1]).join(fc[0].rename('timestamp'))
>>> df
   E2_S0_ME_rbw  E2_S0_ME_rio  E2_S0_ME_rlat  E2_S0_ME_rmdi  E2_S0_ME_wbw  E2_S0_ME_wio  E2_S0_ME_wlat  E2_S0_ME_wmdi     timestamp
0             0             0              0              0             0             0              0              0  1.628464e+09

现在对于合并线,让我们从一个具有您描述的值的数据框开始,这里有 2 组,一组是第 0-3 行,另一组是第 4-5 行,每个组最多有一个非 NaN 值列和合并行:

>>> df
      timestamp  E2_S0_ME_rbw  E2_S0_ME_rio  E2_S0_ME_rlat  E2_S0_ME_rmdi  E2_S0_ME_wbw  E2_S0_ME_wio  E2_S0_ME_wlat  E2_S0_ME_wmdi
0  1.628464e+09           NaN           NaN            NaN       0.886793      0.525714           NaN            NaN            NaN
1  1.628464e+09           NaN      0.638154       0.319839            NaN           NaN      0.375288            NaN            NaN
2  1.628464e+09           NaN           NaN            NaN            NaN           NaN           NaN       0.660108            NaN
3  1.628464e+09      0.969127           NaN            NaN            NaN           NaN           NaN            NaN       0.362666
4  1.628464e+09           NaN           NaN            NaN       0.879372           NaN           NaN       0.851226            NaN
5  1.628464e+09      0.029188      0.757706       0.718359            NaN      0.491337      0.239511            NaN       0.503021
>>> df['timestamp'].astype('datetime64[s]')
0   2021-08-08 22:59:35
1   2021-08-08 22:59:36
2   2021-08-08 22:59:36
3   2021-08-08 22:59:36
4   2021-08-08 22:59:36
5   2021-08-08 22:59:37
Name: timestamp, dtype: datetime64[ns]
>>> df['timestamp'].diff()
0    NaN
1    0.2
2    0.2
3    0.2
4    0.4
5    0.2
Name: timestamp, dtype: float64

您想合并彼此相差 0.3 秒以内的所有行,我们可以使用 diff() 进行检查,这意味着每次差异大于 0.3 秒时我们都会开始一个新组。使用 .first() 获取行中的第一个非 NA 值:

>>> df.groupby((df['timestamp'].diff().rename(None) > .3).cumsum()).first()
      timestamp  E2_S0_ME_rbw  E2_S0_ME_rio  E2_S0_ME_rlat  E2_S0_ME_rmdi  E2_S0_ME_wbw  E2_S0_ME_wio  E2_S0_ME_wlat  E2_S0_ME_wmdi
0  1.628464e+09      0.969127      0.638154       0.319839       0.886793      0.525714      0.375288       0.660108       0.362666
1  1.628464e+09      0.029188      0.757706       0.718359       0.879372      0.491337      0.239511       0.851226       0.503021

请注意,对于 .resample(),如果您的值接近但位于边界值的错误一侧,例如0.299s 和 0.301s,它们会聚合到不同的行。