有没有一种惯用的方法来路由在 TPL 数据流图中的 TransformBlock 中失败的元素?
is there an idiomatic way to route elements which fail in a TransformBlock within a TPL dataflow graph?
我正在使用 TPL 数据流创建输入元素的 bufferBlock,这些元素由输出到输出 bufferBlock 的 TransformBlock 处理
inputQueue = new BufferBlock<InputPacket>;
processQueue = new TransformBlock <InputPacket, OutputPacket>;
outputQueue = new BufferBlock<OutputPacket>;
inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true });
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true });
是否有惯用的方法来路由失败的元素?
当InputPacket
元素处理完成时,关联的动作
processQueue
将 return 一个 OutputPacket
路由到 outputQueue
如果与 processQueue
关联的操作调用不可靠的网络服务,
然后处理一些 InputPacket
元素会超时,
我想重试这些元素 x 次。但是我不想立即尝试它们,我想将它们放回输入队列中。
我希望能够路由 InputPacket
元素
超时回到 inputQueue
直到他们失败 x
次然后到 failureQueue
:
BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>;
使用 LinkTo
谓词因涉及两种不同类型而变得复杂:
InputPacket OutputPacket
我看起来可以通过更改来处理这个问题:
processQueue = new TransformBlock <InputPacket, ParentPacketType>;
然后根据数据包的类型编写谓词。
或
通过将输出作为 InputPacket
、
的成员存储在 inputElement 中
但这两种方法似乎都很好。
首先,我认为您的术语令人困惑,您应该坚持使用 TPL Dataflow 使用的术语。数据流图不是由队列组成的,它是由块组成的。这些块处理 数据元素 而不是块。
现在,当 TPL Dataflow 没有为您提供满足您要求的功能块时,一种解决方案是您自己从提供的功能块中构建该功能块。一个简单的版本可能如下所示:
public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform, int retryCount,
ITargetBlock<(TInput, Exception)> failureBlock)
{
var failedInputs = new Dictionary<TInput, int>();
TransformManyBlock<TInput, TOutput> resultBlock = null;
resultBlock = new TransformManyBlock<TInput, TOutput>(
async input =>
{
try
{
return new[] { transform(input) };
}
catch (Exception exception)
{
failedInputs.TryGetValue(input, out int count);
if (count < retryCount)
{
failedInputs[input] = count + 1;
// ignoring the returned Task, to avoid deadlock
_ = resultBlock.SendAsync(input);
}
else
{
failedInputs.Remove(input);
await failureBlock.SendAsync((input, exception));
}
return Array.Empty<TOutput>();
}
});
return resultBlock;
}
我做出的假设:
- 您可以使用 C# 7.0。如果没有,我用的功能很容易替换。
- 可以忽略除最后一个异常之外的所有异常。否则,
Dictionary
将不得不存储所有先前的异常,然后将它们发送到 failureBlock
.
- 可以将失败的数据元素发送回同一块。否则,该方法将不得不再接受一个参数并将其用于该目的。
- 该块不需要支持并行性。如果是这样,您必须使代码线程安全(您可能首先使用
ConcurrentDictionary
而不是 Dictionary
)。
- 输入数据元素可以存储在字典中(阅读:它们的
GetHashCode
行为正确)并且不会有重复输入。否则,您将不得不设计一些其他机制来计算重试次数。
我正在使用 TPL 数据流创建输入元素的 bufferBlock,这些元素由输出到输出 bufferBlock 的 TransformBlock 处理
inputQueue = new BufferBlock<InputPacket>;
processQueue = new TransformBlock <InputPacket, OutputPacket>;
outputQueue = new BufferBlock<OutputPacket>;
inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true });
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true });
是否有惯用的方法来路由失败的元素?
当InputPacket
元素处理完成时,关联的动作
processQueue
将 return 一个 OutputPacket
路由到 outputQueue
如果与 processQueue
关联的操作调用不可靠的网络服务,
然后处理一些 InputPacket
元素会超时,
我想重试这些元素 x 次。但是我不想立即尝试它们,我想将它们放回输入队列中。
我希望能够路由 InputPacket
元素
超时回到 inputQueue
直到他们失败 x
次然后到 failureQueue
:
BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>;
使用 LinkTo
谓词因涉及两种不同类型而变得复杂:
InputPacket OutputPacket
我看起来可以通过更改来处理这个问题:
processQueue = new TransformBlock <InputPacket, ParentPacketType>;
然后根据数据包的类型编写谓词。
或
通过将输出作为 InputPacket
、
但这两种方法似乎都很好。
首先,我认为您的术语令人困惑,您应该坚持使用 TPL Dataflow 使用的术语。数据流图不是由队列组成的,它是由块组成的。这些块处理 数据元素 而不是块。
现在,当 TPL Dataflow 没有为您提供满足您要求的功能块时,一种解决方案是您自己从提供的功能块中构建该功能块。一个简单的版本可能如下所示:
public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform, int retryCount,
ITargetBlock<(TInput, Exception)> failureBlock)
{
var failedInputs = new Dictionary<TInput, int>();
TransformManyBlock<TInput, TOutput> resultBlock = null;
resultBlock = new TransformManyBlock<TInput, TOutput>(
async input =>
{
try
{
return new[] { transform(input) };
}
catch (Exception exception)
{
failedInputs.TryGetValue(input, out int count);
if (count < retryCount)
{
failedInputs[input] = count + 1;
// ignoring the returned Task, to avoid deadlock
_ = resultBlock.SendAsync(input);
}
else
{
failedInputs.Remove(input);
await failureBlock.SendAsync((input, exception));
}
return Array.Empty<TOutput>();
}
});
return resultBlock;
}
我做出的假设:
- 您可以使用 C# 7.0。如果没有,我用的功能很容易替换。
- 可以忽略除最后一个异常之外的所有异常。否则,
Dictionary
将不得不存储所有先前的异常,然后将它们发送到failureBlock
. - 可以将失败的数据元素发送回同一块。否则,该方法将不得不再接受一个参数并将其用于该目的。
- 该块不需要支持并行性。如果是这样,您必须使代码线程安全(您可能首先使用
ConcurrentDictionary
而不是Dictionary
)。 - 输入数据元素可以存储在字典中(阅读:它们的
GetHashCode
行为正确)并且不会有重复输入。否则,您将不得不设计一些其他机制来计算重试次数。