项目留在队列中的替代方案(线程消费者-生产者)
Alternative to items being left in queue (threaded consumer-producer)
作为使用 this code 的库的一部分,有一个 SimpleQueue
class 将生产者与消费者分离:
private class SimpleQueue
{
private readonly Func<ResolvedEvent, CancellationToken, Task> _onResolvedEvent;
private readonly CancellationToken _token;
private readonly ConcurrentQueue<ResolvedEvent> _events;
private readonly InterlockedBoolean _isPushing;
private static readonly ILog s_logger;
static SimpleQueue()
{
s_logger = LogProvider.For<SimpleQueue>();
}
public SimpleQueue(Func<ResolvedEvent, CancellationToken, Task> onResolvedEvent, CancellationToken token)
{
_onResolvedEvent = onResolvedEvent;
_token = token;
_events = new ConcurrentQueue<ResolvedEvent>();
_isPushing = new InterlockedBoolean();
}
public void Enqueue(ResolvedEvent resolvedEvent)
{
_events.Enqueue(resolvedEvent);
Push();
}
private void Push()
{
if(_isPushing.CompareExchange(true, false))
{
return;
}
Task.Run(async () =>
{
ResolvedEvent resolvedEvent;
while (!_token.IsCancellationRequested && _events.TryDequeue(out resolvedEvent))
{
try
{
await _onResolvedEvent(resolvedEvent, _token);
}
catch(Exception ex)
{
s_logger.ErrorException(ex.Message, ex);
}
}
_isPushing.Set(false);
}, _token);
}
}
我想我可以在这里看到一个问题,如果:
- 在任务线程中,调用
events.TryDequeue(out resolvedEvent))
returnsfalse
- 然后上下文切换到另一个线程
- 在另一个线程事件排队时,
Push()
被调用,但 returns 立即被调用,因为 _isPushing
是 true
- 上下文切换回任务线程,
_isPushing
设置为false
任务退出
在这种情况下,队列中会有 on 事件直到下一次入队并在 Push()
中循环以出队。如果是这样,我不认为我喜欢这个。
所以我改写为使用 TPL BlockingQueue:
public class SimpleQueue<T>
{
readonly BufferBlock<T> _queue = new BufferBlock<T>();
public SimpleQueue(Func<T, CancellationToken, Task> onItemQueued, CancellationToken token)
{
Task.Run(async () =>
{
while (true)
{
try
{
var item = await _queue.ReceiveAsync(token);
if (token.IsCancellationRequested)
return;
await onItemQueued(item, token);
}
catch (Exception ex)
{
// log
}
}
}, token);
}
public void Enqueue(T item)
{
_queue.Post(item);
}
}
class Program
{
private readonly static SimpleQueue<string> Queue;
private readonly static CancellationToken CancellationToken = new CancellationToken();
static async Task OnEvent(string item, CancellationToken cancellationToken)
{
await Task.Run(() =>
{
Console.WriteLine("Rx from remote {0}", item);
}, cancellationToken);
}
static Program()
{
Queue = new SimpleQueue<string>(OnEvent, CancellationToken);
}
static void Main(string[] args)
{
// wire up code to call ExternalReceive from 3rd party lib
DevLinkImports.DLRegisterType2CallDeltas(0,CallEvent);
Console.ReadLine();
}
// this is called by 3rd party dll on demand
static void CallEvent(uint pbxh, string info)
{
// we must dispatch and return within 50ms or 3rd party lib will go ape
Queue.Enqueue(info);
}
问题:
出于学习目的,我是否正确地看到了原始 SimpleQueue 的问题,并且可以根据时间保留项目?
如果没有 "premature optimization",我觉得只有问一下,每次调用 static async Task OnEvent(string item, CancellationToken cancellationToken)
时启动一个新线程的开销是多少?
通过重写,我不会在休眠时让线程保持打开状态,但实际上使用这个异步调用有什么好处,或者只是启动一个线程并使用 BlockingCollection
并阻止出队?我不想为了牺牲启动新线程所需的时间而节省一个线程。
- For learning purposes am I correct in seeing the issue with the original SimpleQueue and items could be left depending on timing?
无法确定,因为此处未提供 InterlockedBoolean
的实现。您的担忧似乎有道理,但我想在尝试做出明确声明之前先查看实际代码。
- Without "premature optimization" i feel it only sensible to ask, what is the overhead of spinning up a new thread for each call to static async Task OnEvent(string item, CancellationToken cancellationToken)?
创建 new 线程的开销很大。但是您的 OnEvent()
方法可能会或可能不会真正做到这一点。您正在创建一个新任务,然后调度程序将决定如何处理它。如果线程池包含用于执行它的可用线程and/or,调度程序决定它可以等待现有但繁忙的线程可用,然后不会创建新线程。
- With the rewrite I am not holding threads open when sleeping, but in reality is there any benefit of using this async call or just instead spin up a single thread and use a BlockingCollection and blocking on dequeue? I don't want to save one thread for sacrificing time taken to spin up new threads.
在您的程序中添加一个单线程来为队列服务并不是那么糟糕。您只需创建一次,因此开销无关紧要。它确实为堆栈占用了 1 兆字节(默认情况下),但这通常也不会成为问题。
另一方面,由于使用了线程池,同样地调用 Task.Run()
也不太可能导致显着的开销。
所以对我来说,这归结为美观和可维护性。
我要指出,使用 BlockingCollection<T>
与 BufferBlock<T>
的问题与您实施 OnEvent()
的问题有些不同。前者关注底层队列的实现,而后者关注事件实际出队时发生的情况。即使您使用 BlockingCollection<T>
,如果您不更改 OnEvent()
,您仍然会为每个事件启动一个新任务。相反,没有理由不能使 OnEvent()
运行 事件同步处理,即使使用 BufferBlock<T>
.
队列代码显然期望事件被异步处理,但事件不一定是。这取决于队列的客户端。
作为使用 this code 的库的一部分,有一个 SimpleQueue
class 将生产者与消费者分离:
private class SimpleQueue
{
private readonly Func<ResolvedEvent, CancellationToken, Task> _onResolvedEvent;
private readonly CancellationToken _token;
private readonly ConcurrentQueue<ResolvedEvent> _events;
private readonly InterlockedBoolean _isPushing;
private static readonly ILog s_logger;
static SimpleQueue()
{
s_logger = LogProvider.For<SimpleQueue>();
}
public SimpleQueue(Func<ResolvedEvent, CancellationToken, Task> onResolvedEvent, CancellationToken token)
{
_onResolvedEvent = onResolvedEvent;
_token = token;
_events = new ConcurrentQueue<ResolvedEvent>();
_isPushing = new InterlockedBoolean();
}
public void Enqueue(ResolvedEvent resolvedEvent)
{
_events.Enqueue(resolvedEvent);
Push();
}
private void Push()
{
if(_isPushing.CompareExchange(true, false))
{
return;
}
Task.Run(async () =>
{
ResolvedEvent resolvedEvent;
while (!_token.IsCancellationRequested && _events.TryDequeue(out resolvedEvent))
{
try
{
await _onResolvedEvent(resolvedEvent, _token);
}
catch(Exception ex)
{
s_logger.ErrorException(ex.Message, ex);
}
}
_isPushing.Set(false);
}, _token);
}
}
我想我可以在这里看到一个问题,如果:
- 在任务线程中,调用
events.TryDequeue(out resolvedEvent))
returnsfalse
- 然后上下文切换到另一个线程
- 在另一个线程事件排队时,
Push()
被调用,但 returns 立即被调用,因为_isPushing
是true
- 上下文切换回任务线程,
_isPushing
设置为false
任务退出
在这种情况下,队列中会有 on 事件直到下一次入队并在 Push()
中循环以出队。如果是这样,我不认为我喜欢这个。
所以我改写为使用 TPL BlockingQueue:
public class SimpleQueue<T>
{
readonly BufferBlock<T> _queue = new BufferBlock<T>();
public SimpleQueue(Func<T, CancellationToken, Task> onItemQueued, CancellationToken token)
{
Task.Run(async () =>
{
while (true)
{
try
{
var item = await _queue.ReceiveAsync(token);
if (token.IsCancellationRequested)
return;
await onItemQueued(item, token);
}
catch (Exception ex)
{
// log
}
}
}, token);
}
public void Enqueue(T item)
{
_queue.Post(item);
}
}
class Program
{
private readonly static SimpleQueue<string> Queue;
private readonly static CancellationToken CancellationToken = new CancellationToken();
static async Task OnEvent(string item, CancellationToken cancellationToken)
{
await Task.Run(() =>
{
Console.WriteLine("Rx from remote {0}", item);
}, cancellationToken);
}
static Program()
{
Queue = new SimpleQueue<string>(OnEvent, CancellationToken);
}
static void Main(string[] args)
{
// wire up code to call ExternalReceive from 3rd party lib
DevLinkImports.DLRegisterType2CallDeltas(0,CallEvent);
Console.ReadLine();
}
// this is called by 3rd party dll on demand
static void CallEvent(uint pbxh, string info)
{
// we must dispatch and return within 50ms or 3rd party lib will go ape
Queue.Enqueue(info);
}
问题:
出于学习目的,我是否正确地看到了原始 SimpleQueue 的问题,并且可以根据时间保留项目?
如果没有 "premature optimization",我觉得只有问一下,每次调用
static async Task OnEvent(string item, CancellationToken cancellationToken)
时启动一个新线程的开销是多少?通过重写,我不会在休眠时让线程保持打开状态,但实际上使用这个异步调用有什么好处,或者只是启动一个线程并使用
BlockingCollection
并阻止出队?我不想为了牺牲启动新线程所需的时间而节省一个线程。
- For learning purposes am I correct in seeing the issue with the original SimpleQueue and items could be left depending on timing?
无法确定,因为此处未提供 InterlockedBoolean
的实现。您的担忧似乎有道理,但我想在尝试做出明确声明之前先查看实际代码。
- Without "premature optimization" i feel it only sensible to ask, what is the overhead of spinning up a new thread for each call to static async Task OnEvent(string item, CancellationToken cancellationToken)?
创建 new 线程的开销很大。但是您的 OnEvent()
方法可能会或可能不会真正做到这一点。您正在创建一个新任务,然后调度程序将决定如何处理它。如果线程池包含用于执行它的可用线程and/or,调度程序决定它可以等待现有但繁忙的线程可用,然后不会创建新线程。
- With the rewrite I am not holding threads open when sleeping, but in reality is there any benefit of using this async call or just instead spin up a single thread and use a BlockingCollection and blocking on dequeue? I don't want to save one thread for sacrificing time taken to spin up new threads.
在您的程序中添加一个单线程来为队列服务并不是那么糟糕。您只需创建一次,因此开销无关紧要。它确实为堆栈占用了 1 兆字节(默认情况下),但这通常也不会成为问题。
另一方面,由于使用了线程池,同样地调用 Task.Run()
也不太可能导致显着的开销。
所以对我来说,这归结为美观和可维护性。
我要指出,使用 BlockingCollection<T>
与 BufferBlock<T>
的问题与您实施 OnEvent()
的问题有些不同。前者关注底层队列的实现,而后者关注事件实际出队时发生的情况。即使您使用 BlockingCollection<T>
,如果您不更改 OnEvent()
,您仍然会为每个事件启动一个新任务。相反,没有理由不能使 OnEvent()
运行 事件同步处理,即使使用 BufferBlock<T>
.
队列代码显然期望事件被异步处理,但事件不一定是。这取决于队列的客户端。