项目留在队列中的替代方案(线程消费者-生产者)

Alternative to items being left in queue (threaded consumer-producer)

作为使用 this code 的库的一部分,有一个 SimpleQueue class 将生产者与消费者分离:

private class SimpleQueue
{
    private readonly Func<ResolvedEvent, CancellationToken, Task> _onResolvedEvent;
    private readonly CancellationToken _token;
    private readonly ConcurrentQueue<ResolvedEvent> _events;
    private readonly InterlockedBoolean _isPushing;
    private static readonly ILog s_logger;

    static SimpleQueue()
    {
        s_logger = LogProvider.For<SimpleQueue>();
    }

    public SimpleQueue(Func<ResolvedEvent, CancellationToken, Task> onResolvedEvent, CancellationToken token)
    {
        _onResolvedEvent = onResolvedEvent;
        _token = token;
        _events = new ConcurrentQueue<ResolvedEvent>();
        _isPushing = new InterlockedBoolean();
    }

    public void Enqueue(ResolvedEvent resolvedEvent)
    {
        _events.Enqueue(resolvedEvent);
        Push();
    }

    private void Push()
    {
        if(_isPushing.CompareExchange(true, false))
        {
            return;
        }
        Task.Run(async () =>
        {
            ResolvedEvent resolvedEvent;
            while (!_token.IsCancellationRequested && _events.TryDequeue(out resolvedEvent))
            {
                try
                {
                    await _onResolvedEvent(resolvedEvent, _token);
                }
                catch(Exception ex)
                {
                    s_logger.ErrorException(ex.Message, ex);
                }
            }
            _isPushing.Set(false);
        }, _token);
    }
}

我想我可以在这里看到一个问题,如果:

  1. 在任务线程中,调用events.TryDequeue(out resolvedEvent))returnsfalse
  2. 然后上下文切换到另一个线程
  3. 在另一个线程事件排队时,Push() 被调用,但 returns 立即被调用,因为 _isPushingtrue
  4. 上下文切换回任务线程,_isPushing设置为false任务退出

在这种情况下,队列中会有 on 事件直到下一次入队并在 Push() 中循环以出队。如果是这样,我不认为我喜欢这个。

所以我改写为使用 TPL BlockingQueue:

public class SimpleQueue<T>
{
    readonly BufferBlock<T> _queue = new BufferBlock<T>();

    public SimpleQueue(Func<T, CancellationToken, Task> onItemQueued, CancellationToken token)
    {
        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    var item = await _queue.ReceiveAsync(token);
                    if (token.IsCancellationRequested)
                        return;
                    await onItemQueued(item, token);
                }
                catch (Exception ex)
                {
                    // log
                }
            }
        }, token);
    }

    public void Enqueue(T item)
    {
        _queue.Post(item);
    }
}

class Program
{
    private readonly static SimpleQueue<string> Queue;
    private readonly static CancellationToken CancellationToken = new CancellationToken();

    static async Task OnEvent(string item, CancellationToken cancellationToken)
    {
        await Task.Run(() =>
        {
            Console.WriteLine("Rx from remote {0}", item);

        }, cancellationToken);
    }

    static Program()
    {
        Queue = new SimpleQueue<string>(OnEvent, CancellationToken);
    }

    static void Main(string[] args)
    {
        // wire up code to call ExternalReceive from 3rd party lib
        DevLinkImports.DLRegisterType2CallDeltas(0,CallEvent);

        Console.ReadLine();
    }

    // this is called by 3rd party dll on demand
    static void CallEvent(uint pbxh, string info)
    {
        // we must dispatch and return within 50ms or 3rd party lib will go ape
        Queue.Enqueue(info);  
    }

问题:

  1. 出于学习目的,我是否正确地看到了原始 SimpleQueue 的问题,并且可以根据时间保留项目?

  2. 如果没有 "premature optimization",我觉得只有问一下,每次调用 static async Task OnEvent(string item, CancellationToken cancellationToken) 时启动一个新线程的开销是多少?

  3. 通过重写,我不会在休眠时让线程保持打开状态,但实际上使用这个异步调用有什么好处,或者只是启动一个线程并使用 BlockingCollection并阻止出队?我不想为了牺牲启动新线程所需的时间而节省一个线程。

  1. For learning purposes am I correct in seeing the issue with the original SimpleQueue and items could be left depending on timing?

无法确定,因为此处未提供 InterlockedBoolean 的实现。您的担忧似乎有道理,但我想在尝试做出明确声明之前先查看实际代码。

  1. Without "premature optimization" i feel it only sensible to ask, what is the overhead of spinning up a new thread for each call to static async Task OnEvent(string item, CancellationToken cancellationToken)?

创建 new 线程的开销很大。但是您的 OnEvent() 方法可能会或可能不会真正做到这一点。您正在创建一个新任务,然后调度程序将决定如何处理它。如果线程池包含用于执行它的可用线程and/or,调度程序决定它可以等待现有但繁忙的线程可用,然后不会创建新线程。

  1. With the rewrite I am not holding threads open when sleeping, but in reality is there any benefit of using this async call or just instead spin up a single thread and use a BlockingCollection and blocking on dequeue? I don't want to save one thread for sacrificing time taken to spin up new threads.

在您的程序中添加一个线程来为队列服务并不是那么糟糕。您只需创建一次,因此开销无关紧要。它确实为堆栈占用了 1 兆字节(默认情况下),但这通常也不会成为问题。

另一方面,由于使用了线程池,同样地调用 Task.Run() 也不太可能导致显着的开销。

所以对我来说,这归结为美观和可维护性。

我要指出,使用 BlockingCollection<T>BufferBlock<T> 的问题与您实施 OnEvent() 的问题有些不同。前者关注底层队列的实现,而后者关注事件实际出队时发生的情况。即使您使用 BlockingCollection<T>,如果您不更改 OnEvent(),您仍然会为每个事件启动一个新任务。相反,没有理由不能使 OnEvent() 运行 事件同步处理,即使使用 BufferBlock<T>.

队列代码显然期望事件被异步处理,但事件不一定是。这取决于队列的客户端。