迭代 Tensorflow 数据集 returns 总是一个不同排序的数组
Iterating on Tensorfow Dataset returns always a differently sorted array
假设您有一个包含值和标签的张量流数据集。在我的例子中,我从一个时间序列创建它:
f = pd.read_csv('MY.csv', index_col=0, parse_dates=True)
#extract the column we are interested in
single_col = df[['Close']]
#Convert to TFDataset
WINDOW_SIZE = 10
dataset = tf.data.Dataset.from_tensor_slices((single_col_df.values))
d = dataset.window(WINDOW_SIZE, shift=1, drop_remainder=True)
d2 = d.flat_map(lambda window: window.batch(WINDOW_SIZE+1))
#create data and ground truth
d3 = d2.map(lambda window: (window[:-1], window[-1:]))
#get the total data and shuffle
len_ds = 0
for item in d2:
len_ds +=1
d_shuffled = d3.shuffle(buffer_size=len_ds)
# split train/test
train_size = int(0.7 * len_ds)
val_size = int(0.15 * len_ds)
test_size = int(0.15 * len_ds)
train_dataset = d_shuffled.take(train_size)
test_dataset = d_shuffled.skip(train_size)
val_dataset = test_dataset.skip(test_size)
test_dataset = test_dataset.take(test_size)
train_dataset = train_dataset.batch(32).prefetch(2)
val_dataset = val_dataset.batch(32)
现在出于评估目的,我想获得测试的基本真实值,所以我 运行
y = np.concatenate([y for x, y in test_dataset], axis=0)
但是每次返回的都是不同排序的数组,因此无法与模型预测的模型进行比较。例如,当 运行 jupyter notebook 中的上述行并将 y
的前 5 个值打印为 `y[:5] 时,有一次我得到
array([[26.04000092],
[16.39999962],
[18.98999977],
[42.31000137],
[19.82999992]])
另一个我得到
array([[15.86999989],
[43.27999878],
[19.32999992],
[48.38000107],
[17.12000084]])
但 y
的长度保持不变,所以我假设元素只是随机排列。无论如何,我无法将这些值与预测值进行比较,因为它们的顺序不同:
y_hat = model.predict(test_dataset)
此外,我也得到了不同的评估结果。例如,
x = []
y = []
for _x,_y in test_dataset:
x.append(_x)
y.append(_y)
x = np.array(x)
y = np.array(y)
model.evaluate(x=x, y=y)
每次重新执行定义数组 x
和 y
的循环时,我都会得到不同的 x
和 y
数组,从而导致不同的评估结果.
您的问题:
通过在拆分之前对 整个 数据集调用 shuffle
,实际上是在每次耗尽数据集后重新洗牌数据集。这是正在发生的事情:
- 第一次调用
y = np.concatenate([y for x, y in test_dataset], axis=0)
会耗尽测试数据集
- 第二次调用
y = np.concatenate([y for x, y in test_dataset], axis=0)
会看到test_dataset耗尽,会触发:
- 整个数据集的重组
- 调用 skip 以获得正确大小的数据集
你最终在第二轮的测试数据集中得到了第一次用尽的训练数据集的潜在样本。
解决方案
如果我们查看 tf.data.Dataset.suffle
的文档:
reshuffle_each_iteration (Optional.) A boolean, which if true indicates that the dataset should be pseudorandomly reshuffled each time it is iterated over. (Defaults to True.)
将其设置为 false 以进行确定性随机播放。如果您仍想在每个 epoch 对训练集进行洗牌,则需要对训练集调用 shuffle。
一个虚拟的例子:
import tensorflow as tf
tf.random.set_seed(0) # reproducibility
a = tf.range(10)
ds = tf.data.Dataset.from_tensor_slices(a)
ds_shuffled = ds.shuffle(10,reshuffle_each_iteration=False)
ds_train = ds_shuffled.take(7)
ds_train = ds_train.shuffle(7)
ds_test = ds_shuffled.skip(7)
运行 :
>>> [x.numpy() for x in ds_test]
[5, 8, 4]
>>> [x.numpy() for x in ds_test]
[5, 8, 4]
>>> [x.numpy() for x in ds_train]
[1, 3, 7, 2, 6, 9, 0]
>>> [x.numpy() for x in ds_train]
[3, 9, 6, 7, 2, 1, 0]
用 reshuffle_each_iteration=True
尝试 运行 看看在你自己的代码中发生了什么
除了 Lescurel 的回答之外,另一个可行的解决方案似乎是来自 Kaggle 的这段代码,它使用 sklearn
:
from sklearn.model_selection import train_test_split
# Extract target values from the vanilla training dataset.
# Indices are generated along with the target values, which are used to filter dataset.
y_targets = np.array([ target.numpy() for _, target in iter(d_shuffled) ])
X_indices = np.arange(len(y_targets))
y_targets = y_targets.reshape((-1,))
y_targets.shape
#stratify array-like, default=None If not None, data is split in a stratified fashion, using this as the class labels.
X_train_indices, X_val_indices, y_train_targets, y_val_targets = train_test_split(
X_indices, y_targets, test_size=0.15, stratify=None, random_state=53)
X_test_indices, X_val_indices, y_test_targets, y_val_targets = train_test_split(
X_val_indices, y_val_targets, test_size=0.5, stratify=None, random_state=53)
def get_selected_dataset(ds, X_indices_np):
# Make a tensor of type tf.int64 to match the one by Dataset.enumerate().
X_indices_ts = tf.constant(X_indices_np, dtype=tf.int64)
def is_index_in(index, rest):
# Returns True if the specified index value is included in X_indices_ts.
#
# '==' compares the specified index value with each values in X_indices_ts.
# The result is a boolean tensor, looks like [ False, True, ..., False ].
# reduce_any() returns Ture if True is included in the specified tensor.
return tf.math.reduce_any(index == X_indices_ts)
def drop_index(index, rest):
return rest
# Dataset.enumerate() is similter to Python's enumerate().
# The method adds indices to each elements. Then, the elements are filtered
# by using the specified indices. Finally unnecessary indices are dropped.
selected_ds = ds \
.enumerate() \
.filter(is_index_in) \
.map(drop_index)
return selected_ds
假设您有一个包含值和标签的张量流数据集。在我的例子中,我从一个时间序列创建它:
f = pd.read_csv('MY.csv', index_col=0, parse_dates=True)
#extract the column we are interested in
single_col = df[['Close']]
#Convert to TFDataset
WINDOW_SIZE = 10
dataset = tf.data.Dataset.from_tensor_slices((single_col_df.values))
d = dataset.window(WINDOW_SIZE, shift=1, drop_remainder=True)
d2 = d.flat_map(lambda window: window.batch(WINDOW_SIZE+1))
#create data and ground truth
d3 = d2.map(lambda window: (window[:-1], window[-1:]))
#get the total data and shuffle
len_ds = 0
for item in d2:
len_ds +=1
d_shuffled = d3.shuffle(buffer_size=len_ds)
# split train/test
train_size = int(0.7 * len_ds)
val_size = int(0.15 * len_ds)
test_size = int(0.15 * len_ds)
train_dataset = d_shuffled.take(train_size)
test_dataset = d_shuffled.skip(train_size)
val_dataset = test_dataset.skip(test_size)
test_dataset = test_dataset.take(test_size)
train_dataset = train_dataset.batch(32).prefetch(2)
val_dataset = val_dataset.batch(32)
现在出于评估目的,我想获得测试的基本真实值,所以我 运行
y = np.concatenate([y for x, y in test_dataset], axis=0)
但是每次返回的都是不同排序的数组,因此无法与模型预测的模型进行比较。例如,当 运行 jupyter notebook 中的上述行并将 y
的前 5 个值打印为 `y[:5] 时,有一次我得到
array([[26.04000092],
[16.39999962],
[18.98999977],
[42.31000137],
[19.82999992]])
另一个我得到
array([[15.86999989],
[43.27999878],
[19.32999992],
[48.38000107],
[17.12000084]])
但 y
的长度保持不变,所以我假设元素只是随机排列。无论如何,我无法将这些值与预测值进行比较,因为它们的顺序不同:
y_hat = model.predict(test_dataset)
此外,我也得到了不同的评估结果。例如,
x = []
y = []
for _x,_y in test_dataset:
x.append(_x)
y.append(_y)
x = np.array(x)
y = np.array(y)
model.evaluate(x=x, y=y)
每次重新执行定义数组 x
和 y
的循环时,我都会得到不同的 x
和 y
数组,从而导致不同的评估结果.
您的问题:
通过在拆分之前对 整个 数据集调用 shuffle
,实际上是在每次耗尽数据集后重新洗牌数据集。这是正在发生的事情:
- 第一次调用
y = np.concatenate([y for x, y in test_dataset], axis=0)
会耗尽测试数据集 - 第二次调用
y = np.concatenate([y for x, y in test_dataset], axis=0)
会看到test_dataset耗尽,会触发:- 整个数据集的重组
- 调用 skip 以获得正确大小的数据集
你最终在第二轮的测试数据集中得到了第一次用尽的训练数据集的潜在样本。
解决方案
如果我们查看 tf.data.Dataset.suffle
的文档:
reshuffle_each_iteration (Optional.) A boolean, which if true indicates that the dataset should be pseudorandomly reshuffled each time it is iterated over. (Defaults to True.)
将其设置为 false 以进行确定性随机播放。如果您仍想在每个 epoch 对训练集进行洗牌,则需要对训练集调用 shuffle。
一个虚拟的例子:
import tensorflow as tf
tf.random.set_seed(0) # reproducibility
a = tf.range(10)
ds = tf.data.Dataset.from_tensor_slices(a)
ds_shuffled = ds.shuffle(10,reshuffle_each_iteration=False)
ds_train = ds_shuffled.take(7)
ds_train = ds_train.shuffle(7)
ds_test = ds_shuffled.skip(7)
运行 :
>>> [x.numpy() for x in ds_test]
[5, 8, 4]
>>> [x.numpy() for x in ds_test]
[5, 8, 4]
>>> [x.numpy() for x in ds_train]
[1, 3, 7, 2, 6, 9, 0]
>>> [x.numpy() for x in ds_train]
[3, 9, 6, 7, 2, 1, 0]
用 reshuffle_each_iteration=True
尝试 运行 看看在你自己的代码中发生了什么
除了 Lescurel 的回答之外,另一个可行的解决方案似乎是来自 Kaggle 的这段代码,它使用 sklearn
:
from sklearn.model_selection import train_test_split
# Extract target values from the vanilla training dataset.
# Indices are generated along with the target values, which are used to filter dataset.
y_targets = np.array([ target.numpy() for _, target in iter(d_shuffled) ])
X_indices = np.arange(len(y_targets))
y_targets = y_targets.reshape((-1,))
y_targets.shape
#stratify array-like, default=None If not None, data is split in a stratified fashion, using this as the class labels.
X_train_indices, X_val_indices, y_train_targets, y_val_targets = train_test_split(
X_indices, y_targets, test_size=0.15, stratify=None, random_state=53)
X_test_indices, X_val_indices, y_test_targets, y_val_targets = train_test_split(
X_val_indices, y_val_targets, test_size=0.5, stratify=None, random_state=53)
def get_selected_dataset(ds, X_indices_np):
# Make a tensor of type tf.int64 to match the one by Dataset.enumerate().
X_indices_ts = tf.constant(X_indices_np, dtype=tf.int64)
def is_index_in(index, rest):
# Returns True if the specified index value is included in X_indices_ts.
#
# '==' compares the specified index value with each values in X_indices_ts.
# The result is a boolean tensor, looks like [ False, True, ..., False ].
# reduce_any() returns Ture if True is included in the specified tensor.
return tf.math.reduce_any(index == X_indices_ts)
def drop_index(index, rest):
return rest
# Dataset.enumerate() is similter to Python's enumerate().
# The method adds indices to each elements. Then, the elements are filtered
# by using the specified indices. Finally unnecessary indices are dropped.
selected_ds = ds \
.enumerate() \
.filter(is_index_in) \
.map(drop_index)
return selected_ds