在时间序列分析中批处理 tf.data.dataset

Batching in tf.data.dataset in time-series analysis

我正在考虑为时间序列 LSTM 模型创建管道。我有两个输入源,我们称它们为 series1series2.

我通过调用 from.tensor.slices:

来初始化 tf.data 对象
ds = tf.data.Dataset.from_tensor_slices((series1, series2))

我将它们进一步分批为 windows 组 windows 大小并在 windows:

之间移动 1
ds = ds.window(window_size + 1, shift=1, drop_remainder=True)

在这一点上,我想尝试一下如何将它们一起批处理。我想生成如下所示的特定输入作为示例:

series1 = [1, 2, 3, 4, 5]
series2 = [100, 200, 300, 400, 500]

batch 1: [1, 2, 100, 200]
batch 2: [2, 3, 200, 300]
batch 3: [3, 4, 300, 400]

因此每批将 return series1 的两个元素,然后是 series2 的两个元素。此代码片段 对它们分别进行批处理:

ds = ds.map(lambda s1, s2: (s1.batch(window_size + 1), s2.batch(window_size + 1))

因为它return是数据集对象的两个映射。因为它们是对象,所以它们是不可订阅的,所以这也不起作用:

ds = ds.map(lambda s1, s2: (s1[:2], s2[:2]))

我确定解决方案是通过自定义 lambda 函数使用 .apply。非常感谢任何帮助。

编辑

我也在考虑制作代表该系列下一个元素的标签。因此,例如,批次将产生以下内容:

batch 1: (tf.tensor([1, 2, 100, 200]), tf.tensor([3]))
batch 2: (tf.tensor([2, 3, 200, 300]), tf.tensor([4]))
batch 3: (tf.tensor([3, 4, 300, 400]), tf.tensor([5]))

其中 [3][4][5] 表示要预测的 series1 的下一个元素。

我认为这是你遗漏的行:

ds = ds.batch(2).map(lambda x, y: (tf.concat([x, y], axis=0)))

完整示例:

import tensorflow as tf

series1 = tf.range(1, 16)
series2 = tf.range(100, 1600, 100)

ds = tf.data.Dataset.from_tensor_slices((series1, series2))

ds = ds.batch(2).map(lambda x, y: (tf.concat([x, y], axis=0)))

for row in ds:
    print(row)
tf.Tensor([  1   2 100 200], shape=(4,), dtype=int32)
tf.Tensor([  3   4 300 400], shape=(4,), dtype=int32)
tf.Tensor([  5   6 500 600], shape=(4,), dtype=int32)
tf.Tensor([  7   8 700 800], shape=(4,), dtype=int32)
tf.Tensor([   9   10  900 1000], shape=(4,), dtype=int32)
tf.Tensor([  11   12 1100 1200], shape=(4,), dtype=int32)
tf.Tensor([  13   14 1300 1400], shape=(4,), dtype=int32)

解决方案是 window 两个数据集分别 .zip() 在一起,然后 .concat() 包含标签的元素。

ds = tf.data.Dataset.from_tensor_slices(series1)
ds = ds.window(window_size + 1, shift=1, drop_remainder=True)
ds = ds.flat_map(lambda window: window.batch(window_size + 1))
ds = ds.map(lambda window: (window[:-1], window[-1]))

ds2 = tf.data.Dataset.from_tensor_slices(series2)
ds2 = ds2.window(window_size, shift=1, drop_remainder=True)
ds2 = ds2.flat_map(lambda window: window.batch(window_size))

ds = tf.data.Dataset.zip((ds, ds2))
ds = ds.map(lambda i, j: (tf.concat([i[0], j], axis=0), i[-1]))

Returns:

(<tf.Tensor: shape=(7,), dtype=int32, numpy=array([  1,   2,   3, 100, 200, 300])>, <tf.Tensor: shape=(), dtype=int32, numpy=4>)
(<tf.Tensor: shape=(7,), dtype=int32, numpy=array([  2,   3,   4, 200, 300, 400])>, <tf.Tensor: shape=(), dtype=int32, numpy=5>)
(<tf.Tensor: shape=(7,), dtype=int32, numpy=array([  3,   4,   5, 300, 400, 500])>, <tf.Tensor: shape=(), dtype=int32, numpy=6>)

这是我在处理时间序列数据时的解决方案。

dataset = tf.data.Dataset.from_tensor_slices(series)
dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
dataset = dataset.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1]))
dataset = dataset.batch(batch_size).prefetch(1)

下一行对于将 window 拆分为 xs 和 ys 很重要。

dataset.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1]))

虽然用shuffle不重要,但是只能用map函数把window拆分成xs和ys