Google Colaboratory 上的 Tensorflow 2.x 自定义损失函数

Tensorflow 2.x custom loss function on Google Colaboratory

问题

我正在尝试为我的 Tensorflow 2 模型编写自定义损失函数。我编写了以下函数来计算我手动传入输入和输出张量时所寻求的损失。

def on_off_balance_loss(y_true: EagerTensor, y_pred: EagerTensor) -> float:
    y_true_array: ndarray = np.asarray(y_true).flatten()
    y_predict_array: ndarray = np.asarray(y_pred).flatten()

    on_delta: float = 0.999

    on_loss: float = 0
    off_loss: float = 0
    on_count: int = 0
    off_count: int = 0

    for i in range(len(y_true_array)):
        loss: float = cell_loss(y_true_array[i], y_predict_array[i])
        if y_true_array[i] > on_delta:
            on_count += 1
            on_loss = on_loss * ((on_count - 1) / on_count) + (loss / on_count)
        else:
            off_count += 1
            off_loss = off_loss * ((off_count - 1) / off_count) + (loss / off_count)

    on_factor: int = 4
    return (on_factor * on_loss + off_loss) / (on_factor + 1)

对于上下文,y_true 由 1 和 0 作为浮点数的二维矩阵组成,其中 0 更为常见。因此,即使 1 所在的位置是更重要的指标,我的模型通过仅让大部分 0 正确而获得了良好的损失值。此自定义损失更按比例强调 1 的位置。

我将 model.compile(loss="binary_crossentropy") 更改为 model.compile(loss=on_off_balance_loss) 以尝试使用新的损失函数。这似乎不起作用,因为损失函数应该包含 an entire batch of data。所以,我用 model.compile(loss=on_off_balance_batch_loss):

尝试了这样的事情
def on_off_balance_batch_loss(y_true, y_pred) -> float:
    y_trues: list = tf.unstack(y_true)
    y_preds: list = tf.unstack(y_pred)
    loss: float = 0

    for i in range(0, len(y_trues)):
        loss = loss * (i / (i + 1)) + (on_off_balance_loss(y_trues[i], y_preds[i]) / (i + 1))

    return loss

这行不通。 y_true的形状是(None, None, None)y_pred的形状是(None, X, Y),其中XY是二维数组的维度1 和 0。

我在 Google Colaboratory 工作。但是,在本地,np.asarray() 似乎以在 Colaboratory 上引发错误的方式工作。所以,我不太确定错误是出在我的损失函数上还是出在 Colaboratory 中的某些设置上。我已确保我在本地和 Colaboratory 上都使用 Tensorflow 2.3.0。

编辑:

我尝试将 run_eagerly=True 添加到 model.compile() 并在 on_off_balance_loss() 中使用 .numpy() 而不是 np.asarray()。这将 on_off_balance_batch_loss 中的输入类型从 Tensor 更改为 EagerTensor。这会导致错误 ValueError: No gradients provided for any variable: ['lstm_3/lstm_cell_3/kernel:0', 'lstm_3/lstm_cell_3/recurrent_kernel:0', 'lstm_3/lstm_cell_3/bias:0', 'dense_2/kernel:0', 'dense_2/bias:0', 'lstm_4/lstm_cell_4/kernel:0', 'lstm_4/lstm_cell_4/recurrent_kernel:0', 'lstm_4/lstm_cell_4/bias:0', 'dense_3/kernel:0', 'dense_3/bias:0', 'lstm_5/lstm_cell_5/kernel:0', 'lstm_5/lstm_cell_5/recurrent_kernel:0', 'lstm_5/lstm_cell_5/bias:0'].。如果我使用

也会出现同样的错误
def on_off_balance_batch_loss(y_true: EagerTensor, y_pred: EagerTensor) -> float:
    y_trues = tf.TensorArray(tf.float32, 1, dynamic_size=True, infer_shape=False).unstack(y_true)
    y_preds = tf.TensorArray(tf.float32, 1, dynamic_size=True, infer_shape=False).unstack(y_pred)

    loss: float = 0.0
    i: int = 0

    for tensor in range(y_trues.size()):
        elem_loss: float = on_off_balance_loss(y_trues.read(i), y_preds.read(i))
        loss = loss * (i / (i + 1)) + (elem_loss / (i + 1))
        i += 1

    return loss

并省略 run_eagerly=True。甚至在出现错误之前,整个程序似乎比我使用默认损失函数时 运行 慢。

好吧,事实证明解决方案比我在上面尝试的要简单得多。下面是实现这种功能的风格。我将它与我的原始函数 on_off_balance_loss 的输出进行了比较,结果匹配。

def on_off_equal_loss(y_true: Tensor, y_pred: Tensor) -> Tensor:
    on_delta: float = 0.99

    on_mask: Tensor = tf.greater_equal(y_true, on_delta)
    off_mask: Tensor = tf.less(y_true, on_delta)

    on_loss: Tensor = tf.divide(tf.reduce_sum(tf.abs(tf.subtract(
        y_true[on_mask], y_pred[on_mask]
    ))), tf.cast(tf.math.count_nonzero(on_mask), tf.float32))

    off_loss: Tensor = tf.divide(tf.reduce_sum(tf.abs(tf.subtract(
        y_true[off_mask], y_pred[off_mask]
    ))), tf.cast(tf.math.count_nonzero(off_mask), tf.float32))

    on_factor: float = 4.0
    return tf.divide(tf.add(tf.multiply(on_factor, on_loss), off_loss), on_factor + 1.0)