有没有一种惯用的方法来路由在 TPL 数据流图中的 TransformBlock 中失败的元素?

is there an idiomatic way to route elements which fail in a TransformBlock within a TPL dataflow graph?

我正在使用 TPL 数据流创建输入元素的 bufferBlock,这些元素由输出到输出 bufferBlock 的 TransformBlock 处理

inputQueue = new BufferBlock<InputPacket>;
processQueue = new TransformBlock <InputPacket, OutputPacket>;
outputQueue = new BufferBlock<OutputPacket>;

inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true });
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true });

是否有惯用的方法来路由失败的元素?

InputPacket元素处理完成时,关联的动作 processQueue 将 return 一个 OutputPacket 路由到 outputQueue

如果与 processQueue 关联的操作调用不可靠的网络服务, 然后处理一些 InputPacket 元素会超时, 我想重试这些元素 x 次。但是我不想立即尝试它们,我想将它们放回输入队列中。

我希望能够路由 InputPacket 元素 超时回到 inputQueue 直到他们失败 x 次然后到 failureQueue :

BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>;

使用 LinkTo 谓词因涉及两种不同类型而变得复杂:

InputPacket OutputPacket

我看起来可以通过更改来处理这个问题:

processQueue = new TransformBlock <InputPacket, ParentPacketType>;

然后根据数据包的类型编写谓词。

通过将输出作为 InputPacket

的成员存储在 inputElement 中

但这两种方法似乎都很好

首先,我认为您的术语令人困惑,您应该坚持使用 TPL Dataflow 使用的术语。数据流图不是由队列组成的,它是由组成的。这些块处理 数据元素 而不是块。

现在,当 TPL Dataflow 没有为您提供满足您要求的功能块时,一种解决方案是您自己从提供的功能块中构建该功能块。一个简单的版本可能如下所示:

public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform, int retryCount,
    ITargetBlock<(TInput, Exception)> failureBlock)
{
    var failedInputs = new Dictionary<TInput, int>();

    TransformManyBlock<TInput, TOutput> resultBlock = null;

    resultBlock = new TransformManyBlock<TInput, TOutput>(
        async input =>
        {
            try
            {
                return new[] { transform(input) };
            }
            catch (Exception exception)
            {
                failedInputs.TryGetValue(input, out int count);

                if (count < retryCount)
                {
                    failedInputs[input] = count + 1;
                    // ignoring the returned Task, to avoid deadlock
                    _ = resultBlock.SendAsync(input);
                }
                else
                {
                    failedInputs.Remove(input);
                    await failureBlock.SendAsync((input, exception));
                }

                return Array.Empty<TOutput>();
            }
        });

    return resultBlock;
}

我做出的假设:

  • 您可以使用 C# 7.0。如果没有,我用的功能很容易替换。
  • 可以忽略除最后一个异常之外的所有异常。否则,Dictionary 将不得不存储所有先前的异常,然后将它们发送到 failureBlock.
  • 可以将失败的数据元素发送回同一块。否则,该方法将不得不再接受一个参数并将其用于该目的。
  • 该块不需要支持并行性。如果是这样,您必须使代码线程安全(您可能首先使用 ConcurrentDictionary 而不是 Dictionary)。
  • 输入数据元素可以存储在字典中(阅读:它们的 GetHashCode 行为正确)并且不会有重复输入。否则,您将不得不设计一些其他机制来计算重试次数。