如何使用 Tensorflow make_csv_dataset 为 Keras 模型配置数据集管道
How to configure dataset pipelines with Tensorflow make_csv_dataset for Keras Model
我有一个大约 200 GB 的结构化数据集(csv 特征文件)。我正在使用 make_csv_dataset 来制作输入管道。这是我的代码
def pack_features_vector(features, labels):
    """Stack all per-column feature tensors into one tensor along axis 1.

    `features` is a mapping; its values are stacked in iteration order.
    `labels` is returned unchanged alongside the stacked features.
    """
    column_tensors = list(features.values())
    packed = tf.stack(column_tensors, axis=1)
    return packed, labels
# Builds a CSV input pipeline with make_csv_dataset, carves validation and
# training subsets out of it, and fits a dense binary classifier.
# `tf`, `layers`, `all_columns` and `selected_columns` are not defined in this
# snippet and must be provided by the surrounding module.
def main():
# One float() (0.0) default per selected column.
defaults=[float()]*len(selected_columns)
data_set=tf.data.experimental.make_csv_dataset(
file_pattern = "./../path-to-dataset/Train_DS/*/*.csv",
column_names=all_columns, # all_columns=["col1,col2,..."]
select_columns=selected_columns, # selected_columns= a subset of all_columns
column_defaults=defaults,
label_name="Target",
batch_size=1000,
num_epochs=20,
num_parallel_reads=50,
# shuffle_buffer_size=10000,
ignore_errors=True)
# Collapse the per-column feature mapping into a single stacked tensor.
data_set = data_set.map(pack_features_vector)
N_VALIDATION = int(1e3)
N_TRAIN= int(1e4)
# BUFFER_SIZE and STEPS_PER_EPOCH are assigned here but never used below.
BUFFER_SIZE = int(1e4)
BATCH_SIZE = 1000
STEPS_PER_EPOCH = N_TRAIN//BATCH_SIZE
# NOTE(review): take()/skip() count dataset elements. make_csv_dataset is
# documented to emit batches (batch_size=1000 above), so these presumably
# select 1e3 / 1e4 batches rather than rows -- confirm the intended sizes.
validate_ds = data_set.take(N_VALIDATION).cache().repeat()
train_ds = data_set.skip(N_VALIDATION).take(N_TRAIN).cache().repeat()
# validate_ds = validate_ds.batch(BATCH_SIZE)
# train_ds = train_ds.batch(BATCH_SIZE)
model = tf.keras.Sequential([
layers.Flatten(),
layers.Dense(256, activation='elu'),
layers.Dense(256, activation='elu'),
layers.Dense(128, activation='elu'),
layers.Dense(64, activation='elu'),
layers.Dense(32, activation='elu'),
layers.Dense(1,activation='sigmoid')
])
# NOTE(review): from_logits=True, yet the last layer above already applies a
# sigmoid, so the loss is fed probabilities instead of logits; this likely
# should be from_logits=False.
model.compile(optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy'])
# NOTE(review): one training step and one validation step per epoch, i.e. 20
# training batches in total; STEPS_PER_EPOCH computed above is not passed.
# Presumably this is also why the cached subsets are never read to the end
# (the "did not fully read the dataset being cached" warning) -- confirm.
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=1,
steps_per_epoch= 1,
epochs=20,
verbose=1
)
# Script entry point.
if __name__ == "__main__":
main()
print('Training completed!')
现在,当我执行此代码时,它会在几分钟内完成(我认为不会遍历整个训练数据)并出现以下警告:
W tensorflow/core/kernels/data/cache_dataset_ops.cc:798] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to dataset.cache().take(k).repeat()
. You should use dataset.take(k).cache().repeat()
instead.
根据此警告,由于训练在几分钟内完成,这意味着...输入管道配置不正确...任何人都可以指导我如何解决此问题。
我系统的 GPU 是 NVIDIA Quadro RTX 6000(计算能力 7.5)。
基于其他功能的解决方案,如 experimental.CsvDataset
也可以。
编辑
通过更改代码以避免任何缓存,警告消失了
validate_ds = data_set.take(N_VALIDATION).repeat()
train_ds = data_set.skip(N_VALIDATION).take(N_TRAIN).repeat()
但现在的问题是,即使在训练数据上,我的准确率也为零。我认为这是输入管道的问题。这是输出。
编辑2
经过一些努力,我设法使用较低级别但功能类似的 API(CsvDataset)解决了上述问题。但是现在,我得到的是 accuracy=1.00,我认为这是不对的。在第一个 epoch,它是 0.95,然后在接下来的 19 个 epoch,它都是 1.00。这是我的最终代码。
def preprocess(*fields):
    """Split one parsed CSV record into a feature tensor and a label tensor.

    Every field except the last is stacked as the features; the last field
    (the Target column) is converted with int() and stacked as the label.
    """
    feature_fields = fields[:-1]
    target_fields = fields[-1:]
    features = tf.stack(feature_fields)
    # Integer targets are used so the label works for binary classification.
    int_targets = [int(value) for value in target_fields]
    labels = tf.stack(int_targets)
    return features, labels  # x, y
# Reads the training and testing CSV files with CsvDataset, maps each record
# through preprocess(), fits a dense binary classifier and evaluates it.
# `tf`, `layers`, `pathlib`, `all_columns` and `selected_columns` are not
# defined in this snippet and must be provided by the surrounding module.
def main():
# selected_columns=["col1,col2,..."]
# Translate the selected column names into their positions in all_columns.
selected_indices=[]
for selected_column in selected_columns:
index=all_columns.index(selected_column)
selected_indices.append(index)
print("All_columns length"+str(len(all_columns)))
print("selected_columns length"+str(len(selected_columns)))
print("selected_indices length"+str(len(selected_indices)))
print(selected_indices)
# One float() (0.0) default per selected column.
defaults=[float()]*(len(selected_columns))
#defaults.append(int())
print("defaults"+str(defaults))
print("defaults length"+str(len(defaults)))
# FEATURES is assigned here but never used below.
FEATURES = len(selected_columns) - 1
training_csvs = sorted(str(p) for p in pathlib.Path('.').glob("path-to-data/Train_DS/*/*.csv"))
testing_csvs = sorted(str(p) for p in pathlib.Path('.').glob("path-to-data/Test_DS/*/*.csv"))
# Bare expressions: no effect when run as a script (presumably notebook
# display leftovers).
training_csvs
testing_csvs
training_dataset=tf.data.experimental.CsvDataset(
training_csvs,
record_defaults=defaults,
compression_type=None,
buffer_size=None,
header=True,
field_delim=',',
# use_quote_delim=True,
# na_value="",
select_cols=selected_indices
)
print(type(training_dataset))
for features in training_dataset.take(1):
print("Training samples before mapping")
print(features)
# Validation = first 10 records, training = the following 90 records; each
# becomes a single batch (batch size 100 exceeds both) repeated indefinitely.
# NOTE(review): only 100 records of the training files are ever used, so the
# 20 steps per epoch below replay the same 90-row batch; an accuracy of 1.00
# on such a small slice is plausible -- confirm the intended split sizes.
validate_ds = training_dataset.map(preprocess).take(10).batch(100).repeat()
train_ds = training_dataset.map(preprocess).skip(10).take(90).batch(100).repeat()
# Bare expressions: no effect when run as a script.
validate_ds
train_ds
for features,labels in train_ds.take(1):
print("Training samples")
print(features)
print(labels)
# Same reader configuration as the training dataset, except that
# use_quote_delim and na_value are passed explicitly here.
testing_dataset=tf.data.experimental.CsvDataset(
testing_csvs,
record_defaults=defaults,
compression_type=None,
buffer_size=None,
header=True,
field_delim=',',
use_quote_delim=True,
na_value="",
select_cols=selected_indices
)
print(type(testing_dataset))
# The test dataset is repeated indefinitely as well.
test_ds = testing_dataset.map(preprocess).batch(100).repeat()
# Bare expression: no effect when run as a script.
test_ds
for features,labels in test_ds.take(1):
print("Testing samples")
print(features)
print(labels)
model = tf.keras.Sequential([
layers.Dense(256,activation='elu'),
layers.Dense(128,activation='elu'),
layers.Dense(64,activation='elu'),
layers.Dense(1,activation='sigmoid')
])
# NOTE(review): `history` is bound to the result of compile(), not fit();
# the return value of fit() below is discarded. Presumably the assignment
# was meant for fit() -- confirm.
history = model.compile(optimizer='adam', loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
metrics=['accuracy'])
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=20,
steps_per_epoch= 20,
epochs=20,
verbose=1
)
# NOTE(review): test_ds ends with repeat() and no `steps` is passed, so
# evaluate() presumably never reaches the end of the dataset -- confirm and
# either drop repeat() for the test set or pass steps=.
loss, accuracy = model.evaluate(test_ds)
print("Test Accuracy", accuracy)
# Script entry point.
if __name__ == "__main__":
main()
print('Training completed!')
我试图只向模型提供一些无用的特征,但它仍然给出准确度=1.00 或 100%。现在哪里出了问题?过度拟合等?
在代码片段中,您写了
# Excerpt of the fit() call quoted from the question's first snippet.
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=1,
steps_per_epoch= 1,
epochs=20,
verbose=1)
steps_per_epoch= 1
是笔误吗?如果不是,那就意味着每个 epoch 只用一个批次进行训练,这解释了训练速度快而准确率低的原因。 validation_steps=1
也存在同样的问题。
我有一个大约 200 GB 的结构化数据集(csv 特征文件)。我正在使用 make_csv_dataset 来制作输入管道。这是我的代码
def pack_features_vector(features, labels):
    """Stack all per-column feature tensors into one tensor along axis 1.

    `features` is a mapping; its values are stacked in iteration order.
    `labels` is returned unchanged alongside the stacked features.
    """
    column_tensors = list(features.values())
    packed = tf.stack(column_tensors, axis=1)
    return packed, labels
# Builds a CSV input pipeline with make_csv_dataset, carves validation and
# training subsets out of it, and fits a dense binary classifier.
# `tf`, `layers`, `all_columns` and `selected_columns` are not defined in this
# snippet and must be provided by the surrounding module.
def main():
# One float() (0.0) default per selected column.
defaults=[float()]*len(selected_columns)
data_set=tf.data.experimental.make_csv_dataset(
file_pattern = "./../path-to-dataset/Train_DS/*/*.csv",
column_names=all_columns, # all_columns=["col1,col2,..."]
select_columns=selected_columns, # selected_columns= a subset of all_columns
column_defaults=defaults,
label_name="Target",
batch_size=1000,
num_epochs=20,
num_parallel_reads=50,
# shuffle_buffer_size=10000,
ignore_errors=True)
# Collapse the per-column feature mapping into a single stacked tensor.
data_set = data_set.map(pack_features_vector)
N_VALIDATION = int(1e3)
N_TRAIN= int(1e4)
# BUFFER_SIZE and STEPS_PER_EPOCH are assigned here but never used below.
BUFFER_SIZE = int(1e4)
BATCH_SIZE = 1000
STEPS_PER_EPOCH = N_TRAIN//BATCH_SIZE
# NOTE(review): take()/skip() count dataset elements. make_csv_dataset is
# documented to emit batches (batch_size=1000 above), so these presumably
# select 1e3 / 1e4 batches rather than rows -- confirm the intended sizes.
validate_ds = data_set.take(N_VALIDATION).cache().repeat()
train_ds = data_set.skip(N_VALIDATION).take(N_TRAIN).cache().repeat()
# validate_ds = validate_ds.batch(BATCH_SIZE)
# train_ds = train_ds.batch(BATCH_SIZE)
model = tf.keras.Sequential([
layers.Flatten(),
layers.Dense(256, activation='elu'),
layers.Dense(256, activation='elu'),
layers.Dense(128, activation='elu'),
layers.Dense(64, activation='elu'),
layers.Dense(32, activation='elu'),
layers.Dense(1,activation='sigmoid')
])
# NOTE(review): from_logits=True, yet the last layer above already applies a
# sigmoid, so the loss is fed probabilities instead of logits; this likely
# should be from_logits=False.
model.compile(optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy'])
# NOTE(review): one training step and one validation step per epoch, i.e. 20
# training batches in total; STEPS_PER_EPOCH computed above is not passed.
# Presumably this is also why the cached subsets are never read to the end
# (the "did not fully read the dataset being cached" warning) -- confirm.
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=1,
steps_per_epoch= 1,
epochs=20,
verbose=1
)
# Script entry point.
if __name__ == "__main__":
main()
print('Training completed!')
现在,当我执行此代码时,它会在几分钟内完成(我认为不会遍历整个训练数据)并出现以下警告:
W tensorflow/core/kernels/data/cache_dataset_ops.cc:798] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset will be discarded. This can happen if you have an input pipeline similar to
dataset.cache().take(k).repeat()
. You should use dataset.take(k).cache().repeat()
instead.
根据此警告,由于训练在几分钟内完成,这意味着...输入管道配置不正确...任何人都可以指导我如何解决此问题。
我系统的 GPU 是 NVIDIA Quadro RTX 6000(计算能力 7.5)。
基于其他功能的解决方案,如 experimental.CsvDataset
也可以。
编辑
通过更改代码以避免任何缓存,警告消失了
validate_ds = data_set.take(N_VALIDATION).repeat()
train_ds = data_set.skip(N_VALIDATION).take(N_TRAIN).repeat()
但现在的问题是,即使在训练数据上,我的准确率也为零。我认为这是输入管道的问题。这是输出。
编辑2
经过一些努力,我设法使用较低级别但功能类似的 API(CsvDataset)解决了上述问题。但是现在,我得到的是 accuracy=1.00,我认为这是不对的。在第一个 epoch,它是 0.95,然后在接下来的 19 个 epoch,它都是 1.00。这是我的最终代码。
def preprocess(*fields):
    """Split one parsed CSV record into a feature tensor and a label tensor.

    Every field except the last is stacked as the features; the last field
    (the Target column) is converted with int() and stacked as the label.
    """
    feature_fields = fields[:-1]
    target_fields = fields[-1:]
    features = tf.stack(feature_fields)
    # Integer targets are used so the label works for binary classification.
    int_targets = [int(value) for value in target_fields]
    labels = tf.stack(int_targets)
    return features, labels  # x, y
# Reads the training and testing CSV files with CsvDataset, maps each record
# through preprocess(), fits a dense binary classifier and evaluates it.
# `tf`, `layers`, `pathlib`, `all_columns` and `selected_columns` are not
# defined in this snippet and must be provided by the surrounding module.
def main():
# selected_columns=["col1,col2,..."]
# Translate the selected column names into their positions in all_columns.
selected_indices=[]
for selected_column in selected_columns:
index=all_columns.index(selected_column)
selected_indices.append(index)
print("All_columns length"+str(len(all_columns)))
print("selected_columns length"+str(len(selected_columns)))
print("selected_indices length"+str(len(selected_indices)))
print(selected_indices)
# One float() (0.0) default per selected column.
defaults=[float()]*(len(selected_columns))
#defaults.append(int())
print("defaults"+str(defaults))
print("defaults length"+str(len(defaults)))
# FEATURES is assigned here but never used below.
FEATURES = len(selected_columns) - 1
training_csvs = sorted(str(p) for p in pathlib.Path('.').glob("path-to-data/Train_DS/*/*.csv"))
testing_csvs = sorted(str(p) for p in pathlib.Path('.').glob("path-to-data/Test_DS/*/*.csv"))
# Bare expressions: no effect when run as a script (presumably notebook
# display leftovers).
training_csvs
testing_csvs
training_dataset=tf.data.experimental.CsvDataset(
training_csvs,
record_defaults=defaults,
compression_type=None,
buffer_size=None,
header=True,
field_delim=',',
# use_quote_delim=True,
# na_value="",
select_cols=selected_indices
)
print(type(training_dataset))
for features in training_dataset.take(1):
print("Training samples before mapping")
print(features)
# Validation = first 10 records, training = the following 90 records; each
# becomes a single batch (batch size 100 exceeds both) repeated indefinitely.
# NOTE(review): only 100 records of the training files are ever used, so the
# 20 steps per epoch below replay the same 90-row batch; an accuracy of 1.00
# on such a small slice is plausible -- confirm the intended split sizes.
validate_ds = training_dataset.map(preprocess).take(10).batch(100).repeat()
train_ds = training_dataset.map(preprocess).skip(10).take(90).batch(100).repeat()
# Bare expressions: no effect when run as a script.
validate_ds
train_ds
for features,labels in train_ds.take(1):
print("Training samples")
print(features)
print(labels)
# Same reader configuration as the training dataset, except that
# use_quote_delim and na_value are passed explicitly here.
testing_dataset=tf.data.experimental.CsvDataset(
testing_csvs,
record_defaults=defaults,
compression_type=None,
buffer_size=None,
header=True,
field_delim=',',
use_quote_delim=True,
na_value="",
select_cols=selected_indices
)
print(type(testing_dataset))
# The test dataset is repeated indefinitely as well.
test_ds = testing_dataset.map(preprocess).batch(100).repeat()
# Bare expression: no effect when run as a script.
test_ds
for features,labels in test_ds.take(1):
print("Testing samples")
print(features)
print(labels)
model = tf.keras.Sequential([
layers.Dense(256,activation='elu'),
layers.Dense(128,activation='elu'),
layers.Dense(64,activation='elu'),
layers.Dense(1,activation='sigmoid')
])
# NOTE(review): `history` is bound to the result of compile(), not fit();
# the return value of fit() below is discarded. Presumably the assignment
# was meant for fit() -- confirm.
history = model.compile(optimizer='adam', loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
metrics=['accuracy'])
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=20,
steps_per_epoch= 20,
epochs=20,
verbose=1
)
# NOTE(review): test_ds ends with repeat() and no `steps` is passed, so
# evaluate() presumably never reaches the end of the dataset -- confirm and
# either drop repeat() for the test set or pass steps=.
loss, accuracy = model.evaluate(test_ds)
print("Test Accuracy", accuracy)
# Script entry point.
if __name__ == "__main__":
main()
print('Training completed!')
我试图只向模型提供一些无用的特征,但它仍然给出准确度=1.00 或 100%。现在哪里出了问题?过度拟合等?
在代码片段中,您写了
# Excerpt of the fit() call quoted from the question's first snippet.
model.fit(train_ds,
validation_data=validate_ds,
validation_steps=1,
steps_per_epoch= 1,
epochs=20,
verbose=1)
steps_per_epoch= 1
是笔误吗?如果不是,那就意味着每个 epoch 只用一个批次进行训练,这解释了训练速度快而准确率低的原因。 validation_steps=1
也存在同样的问题。