用于 TPL 数据流的 BroadcastCopyBlock 保证交付
BroadcastCopyBlock for TPL Dataflow with guaranteed delivery
我很高兴就 TPL 数据流中 BroadcastCopyBlock
的以下实现提供一些意见,它将收到的消息复制给所有注册到 BroadcastCopyBlock
的消费者,并保证交付给所有消费者消费者,在收到消息时链接到块。 (与不保证消息传递的 BroadcastBlock
不同,如果下一个消息进来,则在前一个消息已传递给所有消费者之前)。
我主要关心的是消息的保留和释放保留。如果接收块决定不处理消息,会发生什么?我的理解是,这会造成内存泄漏,因为消息将无限期保留。我在想,我应该以某种方式将消息标记为未使用,但我不确定如何标记。我在考虑一些人工消息接收器(没有任何操作的 ActionBlock
),或者我可以将消息标记为已丢弃吗?
我们也欢迎您提供有关实施的进一步意见。
这可能几乎是以下问题的重复,但我更愿意使用我自己的 class,而不是创建块的方法。或者这会被认为是糟糕的风格吗?
BroadcastBlock with Guaranteed Delivery in TPL Dataflow
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
private ITargetBlock<T> In { get; }
/// <summary>
/// Holds a TransformBlock for each target, that subscribed to this block
/// </summary>
private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();
public BrodcastCopyBlock()
{
In = new ActionBlock<T>(message => Process(message));
In.Completion.ContinueWith(task =>
{
if (task.Exception == null)
Complete();
else
Fault(task.Exception);
}
);
}
/// <summary>
/// Creates a transform source block for the passed target.
/// </summary>
/// <param name="target"></param>
private void CreateOutBlock(ITargetBlock<T> target)
{
if (_OutBlocks.ContainsKey(target))
return;
var outBlock = new TransformBlock<T, T>(e => e);
_OutBlocks[target] = outBlock;
}
private void Process(T message)
{
foreach (var outBlock in _OutBlocks.Values)
{
outBlock.Post(message);
}
}
/// <inheritdoc />
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <inheritdoc />
public void Complete()
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Complete();
}
}
/// <inheritdoc />
public void Fault(Exception exception)
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Fault(exception);
}
}
/// <inheritdoc />
public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));
/// <inheritdoc />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
CreateOutBlock(target);
return _OutBlocks[target].LinkTo(target, linkOptions);
}
/// <inheritdoc />
public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <inheritdoc />
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
}
/// <inheritdoc />
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
}
}
TL/DR
您的实现使用 Post
method inside the ActionBlock
, which still will lose the data if target rejects the message, switch to the SendAsync
one, and, probably, you don't need to implenment all these methods, you need only ITargetBlock<in TInput>
接口实现。
在回到您的主要问题之前,我想澄清一些事情。我认为您对 TPL Dataflow
library, and I want explain them a bit here. The behavior you're saying The first consumer, which receives the message, deletes it from the queue
is not about the BroadcastBlock
, it is about the multiple consumers linked for an ISourceBlock
, like BufferBlock
:
中的某些选项感到困惑
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this one will go only for one consumer, no console output present
buffer.Post(1);
BroadcastBlock
所做的正是您所说的,请考虑以下代码:
private static void UnboundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Unbounded Block: {i}");
});
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是,这只能在传入数据的速度低于处理数据的速度时才能完成,因为在其他情况下,正如您在问题中所述,由于缓冲区增长,您的内存将很快耗尽。让我们看看如果我们使用 ExecutionDataflowBlockOptions
来限制慢块的传入数据缓冲区会发生什么:
private static void BoundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Bounded Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1
如您所见,我们的慢块丢失了最后一条消息,这不是我们要找的。这是因为 BroadcastBlock
默认使用 Post
方法传递消息。根据官方介绍文件:
- Post
- An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
- SendAsync
- An extension method that asynchronously sends to target blocks while supporting buffering. A
Post
operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync
enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.
所以,这种方法可以帮助我们完成任务,让我们为我们的真实处理器引入一些包装器 ActionBlock
, which do exactly what we want - SendAsync
数据:
private static void BoundedWrapperInfiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是 这种等待永远不会结束 - 我们的基本包装器不会传播链接块的完成,并且 ActionBlock
不能链接到任何东西。我们可以尝试等待包装器完成:
private static void BoundedWrapperFiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW finite Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowActionWrapper.Completion.Wait();
}
输出将是
FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0
这绝对不是我们想要的 - ActionBlock
完成了所有工作,不会等待最后一条消息的发布。此外,我们甚至没有看到第二条消息,因为我们在 Sleep
方法结束之前就退出了该方法!所以你肯定需要你自己的实现。
现在,最后,关于您的代码的一些想法:
- 您不需要实现如此大量的方法 - 您的包装器将用作
ITargetBlock<in TInput>
,因此只需实现该接口。
- 您的实现在
ActionBlock
中使用了 Post
方法,如我们所见,如果出现某些问题,可能会导致数据丢失消费者方面。请考虑使用 SendAsync
方法。
- 在之前的更改之后,您应该衡量数据流的性能 - 如果您有很多异步等待数据传送,您可能会看到性能 and/or 内存问题。这应该通过 linked documentation.
中讨论的一些高级设置来解决
- 您对
Completion
task actually reverses the order of your dataflow - you are waiting for targets to complete, which, as I think, is not good practice - you probably should create an ending block for your dataflow (this could be even NullTarget
块的实现,它只是同步地丢弃传入的消息),并等待它完成。
我只想添加到 ,在 BoundedWrapperInfiniteCase
中,您可以手动传播完成。在调用 broadcast.SendAsync()
之前添加以下行,然后等待两个操作完成以使操作包装器完成内部操作:
slowActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
else fastAction.Complete();
});
例如
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
else fastAction.Complete();
});
for (var i = 0; i < 3; ++i)
broadcast.SendAsync(i);
broadcast.Complete();
// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);
输出将与 VMAtm 的回答相同,但所有任务都将正确完成。
我很高兴就 TPL 数据流中 BroadcastCopyBlock
的以下实现提供一些意见,它将收到的消息复制给所有注册到 BroadcastCopyBlock
的消费者,并保证交付给所有消费者消费者,在收到消息时链接到块。 (与不保证消息传递的 BroadcastBlock
不同,如果下一个消息进来,则在前一个消息已传递给所有消费者之前)。
我主要关心的是消息的保留和释放保留。如果接收块决定不处理消息,会发生什么?我的理解是,这会造成内存泄漏,因为消息将无限期保留。我在想,我应该以某种方式将消息标记为未使用,但我不确定如何标记。我在考虑一些人工消息接收器(没有任何操作的 ActionBlock
),或者我可以将消息标记为已丢弃吗?
我们也欢迎您提供有关实施的进一步意见。
这可能几乎是以下问题的重复,但我更愿意使用我自己的 class,而不是创建块的方法。或者这会被认为是糟糕的风格吗?
BroadcastBlock with Guaranteed Delivery in TPL Dataflow
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
private ITargetBlock<T> In { get; }
/// <summary>
/// Holds a TransformBlock for each target, that subscribed to this block
/// </summary>
private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();
public BrodcastCopyBlock()
{
In = new ActionBlock<T>(message => Process(message));
In.Completion.ContinueWith(task =>
{
if (task.Exception == null)
Complete();
else
Fault(task.Exception);
}
);
}
/// <summary>
/// Creates a transform source block for the passed target.
/// </summary>
/// <param name="target"></param>
private void CreateOutBlock(ITargetBlock<T> target)
{
if (_OutBlocks.ContainsKey(target))
return;
var outBlock = new TransformBlock<T, T>(e => e);
_OutBlocks[target] = outBlock;
}
private void Process(T message)
{
foreach (var outBlock in _OutBlocks.Values)
{
outBlock.Post(message);
}
}
/// <inheritdoc />
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <inheritdoc />
public void Complete()
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Complete();
}
}
/// <inheritdoc />
public void Fault(Exception exception)
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Fault(exception);
}
}
/// <inheritdoc />
public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));
/// <inheritdoc />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
CreateOutBlock(target);
return _OutBlocks[target].LinkTo(target, linkOptions);
}
/// <inheritdoc />
public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <inheritdoc />
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
}
/// <inheritdoc />
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
}
}
TL/DR
您的实现使用 Post
method inside the ActionBlock
, which still will lose the data if target rejects the message, switch to the SendAsync
one, and, probably, you don't need to implenment all these methods, you need only ITargetBlock<in TInput>
接口实现。
在回到您的主要问题之前,我想澄清一些事情。我认为您对 TPL Dataflow
library, and I want explain them a bit here. The behavior you're saying The first consumer, which receives the message, deletes it from the queue
is not about the BroadcastBlock
, it is about the multiple consumers linked for an ISourceBlock
, like BufferBlock
:
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this one will go only for one consumer, no console output present
buffer.Post(1);
BroadcastBlock
所做的正是您所说的,请考虑以下代码:
private static void UnboundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Unbounded Block: {i}");
});
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是,这只能在传入数据的速度低于处理数据的速度时才能完成,因为在其他情况下,正如您在问题中所述,由于缓冲区增长,您的内存将很快耗尽。让我们看看如果我们使用 ExecutionDataflowBlockOptions
来限制慢块的传入数据缓冲区会发生什么:
private static void BoundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Bounded Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1
如您所见,我们的慢块丢失了最后一条消息,这不是我们要找的。这是因为 BroadcastBlock
默认使用 Post
方法传递消息。根据官方介绍文件:
- Post
- An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
- SendAsync
- An extension method that asynchronously sends to target blocks while supporting buffering. A
Post
operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline.SendAsync
enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.
所以,这种方法可以帮助我们完成任务,让我们为我们的真实处理器引入一些包装器 ActionBlock
, which do exactly what we want - SendAsync
数据:
private static void BoundedWrapperInfiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是 这种等待永远不会结束 - 我们的基本包装器不会传播链接块的完成,并且 ActionBlock
不能链接到任何东西。我们可以尝试等待包装器完成:
private static void BoundedWrapperFiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW finite Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowActionWrapper.Completion.Wait();
}
输出将是
FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0
这绝对不是我们想要的 - ActionBlock
完成了所有工作,不会等待最后一条消息的发布。此外,我们甚至没有看到第二条消息,因为我们在 Sleep
方法结束之前就退出了该方法!所以你肯定需要你自己的实现。
现在,最后,关于您的代码的一些想法:
- 您不需要实现如此大量的方法 - 您的包装器将用作
ITargetBlock<in TInput>
,因此只需实现该接口。 - 您的实现在
ActionBlock
中使用了Post
方法,如我们所见,如果出现某些问题,可能会导致数据丢失消费者方面。请考虑使用SendAsync
方法。 - 在之前的更改之后,您应该衡量数据流的性能 - 如果您有很多异步等待数据传送,您可能会看到性能 and/or 内存问题。这应该通过 linked documentation. 中讨论的一些高级设置来解决
- 您对
Completion
task actually reverses the order of your dataflow - you are waiting for targets to complete, which, as I think, is not good practice - you probably should create an ending block for your dataflow (this could be evenNullTarget
块的实现,它只是同步地丢弃传入的消息),并等待它完成。
我只想添加到 BoundedWrapperInfiniteCase
中,您可以手动传播完成。在调用 broadcast.SendAsync()
之前添加以下行,然后等待两个操作完成以使操作包装器完成内部操作:
slowActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
else fastAction.Complete();
});
例如
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
else fastAction.Complete();
});
for (var i = 0; i < 3; ++i)
broadcast.SendAsync(i);
broadcast.Complete();
// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);
输出将与 VMAtm 的回答相同,但所有任务都将正确完成。