How to build a Custom Data Generator for Keras/tf.Keras where X images are being augmented and corresponding Y labels are also images
I am using a UNet for image binarization and have a dataset of 150 images together with their binarized versions. My idea is to augment the images randomly so that they look different, so I wrote a function that injects any of 4-5 types of noise, skewness, shearing, and so on into an image. I could easily have used
ImageDataGenerator(preprocessing_function=my_aug_function)
to augment the images, but the problem is that my y targets are also images. Alternatively, I could use something like:
train_dataset = (
    train_dataset.map(
        encode_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)
But it has two problems:
- For larger datasets it blows up the memory, since the data needs to be in memory already
- This is the crucial part: I need to augment the images on the fly so that it looks as if I have a huge dataset.

Another solution could be to save augmented images to a directory until there are 30-40K of them and then load those, but that would be wasteful.

The idea now is that I can use Sequence as the parent class, but how do I keep augmenting and generating new images on the fly together with the corresponding Y binarized images?

I have an idea, shown in the code below. Can somebody help me with augmenting and generating the y images as well? I have X_DIR and Y_DIR, where the binarized images have the same file names as the originals but are stored in different directories.
import numpy as np
import tensorflow

class DataGenerator(tensorflow.keras.utils.Sequence):
    def __init__(self, files_path, labels_path, batch_size=32, shuffle=True, random_state=42):
        'Initialization'
        self.files = files_path
        self.labels = labels_path
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.random_state = random_state
        self.on_epoch_end()

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        # Shuffle the data here

    def __len__(self):
        return int(np.floor(len(self.files) / self.batch_size))

    def __getitem__(self, index):
        # What do I do here?
        pass

    def __data_generation(self, files):
        # I think this is responsible for the augmentation, but I have no idea how I should implement it or how it works.
        pass
You can use libraries like albumentations and imgaug; both are good, but I have heard there are issues with random seeds in albumentations.

Here is an imgaug example taken from the documentation here:
seq = iaa.Sequential([
    iaa.Dropout([0.05, 0.2]),      # drop 5% or 20% of all pixels
    iaa.Sharpen((0.0, 1.0)),       # sharpen the image
    iaa.Affine(rotate=(-45, 45)),  # rotate by -45 to 45 degrees (affects segmaps)
    iaa.ElasticTransformation(alpha=50, sigma=5)  # apply water effect (affects segmaps)
], random_order=True)

# Augment images and segmaps.
images_aug = []
segmaps_aug = []
for _ in range(len(input_data)):
    images_aug_i, segmaps_aug_i = seq(image=image, segmentation_maps=segmap)
    images_aug.append(images_aug_i)
    segmaps_aug.append(segmaps_aug_i)
Your approach with a custom generator is correct. In __getitem__, slice out the batch with batch_x = self.files[index*self.batch_size:(index+1)*self.batch_size] (and the same for batch_y), then augment the pairs with X, y = self.__data_generation(batch_x, batch_y), which should load the images (with whatever library you like; I prefer opencv) and return the augmented pairs (plus whatever other operations you need).

Your __getitem__ then returns the tuple (X, y).
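Putting the pieces together, here is a minimal sketch of such a generator, assuming the X_DIR/Y_DIR layout from the question, opencv for loading, and a class name PairedDataGenerator that I am making up for illustration:

import os
import cv2
import numpy as np
import tensorflow as tf
import imgaug.augmenters as iaa
from imgaug.augmentables.segmaps import SegmentationMapsOnImage

class PairedDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, file_names, x_dir, y_dir, batch_size=32, shuffle=True):
        self.files = list(file_names)   # names shared by X_DIR and Y_DIR
        self.x_dir = x_dir
        self.y_dir = y_dir
        self.batch_size = batch_size
        self.shuffle = shuffle
        # geometric augmenters; imgaug applies them to the segmap as well
        self.seq = iaa.Sequential([
            iaa.Affine(rotate=(-45, 45)),
            iaa.ElasticTransformation(alpha=50, sigma=5),
        ], random_order=True)
        self.on_epoch_end()

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.files)

    def __len__(self):
        return int(np.floor(len(self.files) / self.batch_size))

    def __getitem__(self, index):
        batch = self.files[index * self.batch_size:(index + 1) * self.batch_size]
        return self.__data_generation(batch)

    def __data_generation(self, batch):
        X, y = [], []
        for name in batch:
            img = cv2.imread(os.path.join(self.x_dir, name))
            mask = cv2.imread(os.path.join(self.y_dir, name), cv2.IMREAD_GRAYSCALE)
            # wrap the binarized target so imgaug applies the same geometry to it
            segmap = SegmentationMapsOnImage((mask > 127).astype(np.int32), shape=img.shape)
            img_aug, segmap_aug = self.seq(image=img, segmentation_maps=segmap)
            X.append(img_aug.astype(np.float32) / 255.0)
            y.append(segmap_aug.get_arr()[..., np.newaxis].astype(np.float32))
        return np.stack(X), np.stack(y)

Because a fresh random augmentation is sampled on every __getitem__ call, every epoch sees a differently augmented version of the 150 pairs, which is exactly the "huge dataset" effect you are after.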
You can use ImageDataGenerator even if your labels are images.

Here is a simple example of how you can do that:

Code:
# Specifying your data augmentation here for both image and label
image_datagen = tf.keras.preprocessing.image.ImageDataGenerator()
mask_datagen = tf.keras.preprocessing.image.ImageDataGenerator()

# Provide the same seed and keyword arguments to the flow methods
seed = 1

image_generator = image_datagen.flow_from_directory(
    data_dir,
    class_mode=None,
    seed=seed)

mask_generator = mask_datagen.flow_from_directory(
    data_dir,
    class_mode=None,
    seed=seed)

# Combine the image and label generator.
train_generator = zip(image_generator, mask_generator)
Now, if you iterate over it, you get:
for image, label in train_generator:
    print(image.shape, label.shape)
    break
Output:
(32, 256, 256, 3) (32, 256, 256, 3)
You can use this train_generator with the fit() command (fit_generator works too but is deprecated in recent TF/Keras versions).

Code:
model.fit(
    train_generator,
    steps_per_epoch=2000,
    epochs=50)
With flow_from_directory your memory won't be clogged, and ImageDataGenerator will take care of the augmentation part.
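One caveat worth knowing: a plain zip object is exhausted after a single pass, while fit() pulls batches for every epoch. Below is a minimal sketch of an endless pairing wrapper, assuming separate image_dir and mask_dir paths and illustrative augmentation arguments:

# identical augmentation arguments for both generators keep X and y aligned
data_gen_args = dict(rotation_range=20, horizontal_flip=True)
image_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**data_gen_args)
mask_datagen = tf.keras.preprocessing.image.ImageDataGenerator(**data_gen_args)

seed = 1
image_generator = image_datagen.flow_from_directory(
    image_dir, class_mode=None, seed=seed)
mask_generator = mask_datagen.flow_from_directory(
    mask_dir, class_mode=None, seed=seed)

def paired_generator(img_gen, msk_gen):
    # Keras directory iterators loop forever, so this never runs out
    while True:
        yield next(img_gen), next(msk_gen)

model.fit(paired_generator(image_generator, mask_generator),
          steps_per_epoch=2000, epochs=50)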
Custom image data generator

Load the directory data into a dataframe for the CustomDataGenerator
import os
import pandas as pd

# name_to_idx is assumed to be a dict mapping subdirectory names to class indices
def data_to_df(data_dir, subset=None, validation_split=None):
    df = pd.DataFrame()
    filenames = []
    labels = []

    for dataset in os.listdir(data_dir):
        img_list = os.listdir(os.path.join(data_dir, dataset))
        label = name_to_idx[dataset]

        for image in img_list:
            filenames.append(os.path.join(data_dir, dataset, image))
            labels.append(label)

    df["filenames"] = filenames
    df["labels"] = labels

    if subset == "train":
        split_indexes = int(len(df) * validation_split)
        train_df = df[split_indexes:]
        val_df = df[:split_indexes]
        return train_df, val_df

    return df

train_df, val_df = data_to_df(train_dir, subset="train", validation_split=0.2)
Custom data generator
import math
import numpy as np
import tensorflow as tf
from PIL import Image
from sklearn.utils import shuffle
# preprocess_input is assumed to come from whichever application module you use, e.g.:
from tensorflow.keras.applications.imagenet_utils import preprocess_input

class CustomDataGenerator(tf.keras.utils.Sequence):
    ''' Custom DataGenerator to load img

    Arguments:
        data_frame = pandas data frame in filenames and labels format
        batch_size = divide data in batches
        shuffle = shuffle data before loading
        img_shape = image shape in (h, w, d) format
        augmentation = data augmentation to make model robust to overfitting

    Output:
        Img: numpy array of image
        label : output label for image
    '''

    def __init__(self, data_frame, batch_size=10, img_shape=None, augmentation=True, num_classes=None):
        self.data_frame = data_frame
        self.train_len = len(data_frame)
        self.batch_size = batch_size
        self.img_shape = img_shape
        self.num_classes = num_classes
        print(f"Found {self.data_frame.shape[0]} images belonging to {self.num_classes} classes")

    def __len__(self):
        ''' return total number of batches '''
        self.data_frame = shuffle(self.data_frame)
        return math.ceil(self.train_len / self.batch_size)

    def on_epoch_end(self):
        ''' shuffle data after every epoch '''
        # fix on_epoch_end, it's not working; adding shuffle in __len__ as an alternative
        pass

    def __data_augmentation(self, img):
        ''' function to apply some data augmentation '''
        img = tf.keras.preprocessing.image.random_shift(img, 0.2, 0.3)
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_flip_up_down(img)
        return img

    def __get_image(self, file_id):
        """ open image at the file_id path and apply data augmentation """
        img = np.asarray(Image.open(file_id))
        img = np.resize(img, self.img_shape)
        img = self.__data_augmentation(img)
        img = preprocess_input(img)
        return img

    def __get_label(self, label_id):
        """ uncomment the line below to convert the label into categorical format """
        #label_id = tf.keras.utils.to_categorical(label_id, num_classes)
        return label_id

    def __getitem__(self, idx):
        batch_x = self.data_frame["filenames"][idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.data_frame["labels"][idx * self.batch_size:(idx + 1) * self.batch_size]

        # read your data here using the batch lists, batch_x and batch_y
        x = [self.__get_image(file_id) for file_id in batch_x]
        y = [self.__get_label(label_id) for label_id in batch_y]

        return tf.convert_to_tensor(x), tf.convert_to_tensor(y)
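A quick usage sketch; the model, image shape, and class count are placeholders for illustration:

train_gen = CustomDataGenerator(train_df, batch_size=10,
                                img_shape=(224, 224, 3), num_classes=2)
val_gen = CustomDataGenerator(val_df, batch_size=10,
                              img_shape=(224, 224, 3), num_classes=2)

# Sequence defines __len__, so fit() infers steps_per_epoch automatically
model.fit(train_gen, validation_data=val_gen, epochs=50)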