如何在特定线程池上分派 C# 异步函数。 (提供了 Kotlin 示例)

How to dispatch C# async function on specific ThreadPool. (Kotlin example provided)

我需要使用 async/await 在 C# 上翻译这段 Kotlin 代码。 我特别感兴趣如何创建线程池并在其上执行异步方法。

val myThreadPool1 = newFixedThreadPoolContext(1, "myThreadPool1")
val myThreadPool2 = newFixedThreadPoolContext(1, "myThreadPool2")

fun main() = runBlocking {
    f1()
    f2()
}

suspend fun f1() {
    withContext(myThreadPool1) {
        delay(2000)
        println(Thread.currentThread().name)
    }
}

suspend fun f2() {
    withContext(myThreadPool2) {
        delay(1000)
        println(Thread.currentThread().name)
    }
}

我已经为 you/anyone 制作了一个关于如何使用 SynchronizationContext 在特定线程上继续等待方法的示例。我不知道你的 C# 经验,但这不是初学者水平。

在此示例中,您可以看到两个异步方法 "interrupting" 彼此。

我为此创建了一个要点(有点不同)ASyncThread.cs

您可以将此作为参考..

class Program
{
    static async void First()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine($"{DateTime.Now}| First on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(1000);
        }
    }

    static async void Second()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine($"{DateTime.Now}| Second on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(500);
        }
    }

    static void Main(string[] args)
    {
        Console.WriteLine($"Hello World! on Thread: {Thread.CurrentThread.ManagedThreadId}");

        using (var myThread = new AwaitEnabledThread())
        {
            myThread.Post(First);
            myThread.Post(Second);

            Console.ReadLine();
        }
    }
}

这将导致:

Hello World! on thread: 1                           
18-10-2019 12:14:34| First on Thread: 4             
18-10-2019 12:14:34| Second on Thread: 4            
18-10-2019 12:14:35| Second on Thread: 4            
18-10-2019 12:14:35| First on Thread: 4             
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:37| First on Thread: 4             
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:38| First on Thread: 4             
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:39| Second on Thread: 4            
18-10-2019 12:14:39| First on Thread: 4             

public class AwaitEnabledThread : SynchronizationContext, IDisposable
{
    // By JvanLangen.
    private class ActionWithState
    {
        public SendOrPostCallback Action { get; set; }
        public object State { get; set; }
    }

    private Task _mainTask;
    private int _mainTaskThreadId;
    private readonly ManualResetEvent _terminate = new ManualResetEvent(false);
    private readonly AutoResetEvent _actionAdded = new AutoResetEvent(false);
    private readonly ConcurrentQueue<ActionWithState> _actions = new ConcurrentQueue<ActionWithState>();

    private void TaskMethod()
    {
        // because this class derives from SynchronizationContext
        SynchronizationContext.SetSynchronizationContext(this); // <-------

        _mainTaskThreadId = Thread.CurrentThread.ManagedThreadId;

        var waitHandles = new WaitHandle[] { _terminate, _actionAdded };

        while (WaitHandle.WaitAny(waitHandles) != 0)
            while (_actions.TryDequeue(out var actionWithState))
                actionWithState.Action(actionWithState.State);
    }


    public AwaitEnabledThread()
    {
        _mainTask = Task.Factory.StartNew(TaskMethod, TaskCreationOptions.LongRunning);
    }

    public override void Post(SendOrPostCallback d, object state = null)
    {
        _actions.Enqueue(new ActionWithState { Action = d, State = state });
        _actionAdded.Set();
    }

    public void Post(Action action)
    {
        Post(s => action(), null);
    }

    public override void Send(SendOrPostCallback d, object state = null)
    {
        if (Thread.CurrentThread.ManagedThreadId != _mainTaskThreadId)
        {
            _actions.Enqueue(new ActionWithState { Action = d, State = state });
            _actionAdded.Set();
        }
        else
            d(state);
    }

    public void Send(Action action)
    {
        Send(s => action(), null);
    }

    public void Dispose()
    {
        _terminate.Set();
        _mainTask.Wait();
    }

    public bool Terminated => _terminate.WaitOne(0);
}

更新:您甚至可以等待排队到线程的操作:

public static Task SendASync(this AwaitEnabledThread thread, SendOrPostCallback d, object state = null)
{
    var tcs = new TaskCompletionSource<object>();

    thread.Post(s =>
    {
        try
        {
            // execute the delegate
            d(state);
            // return to the previous SynchronizationContext
            tcs.SetResult(null);
        }
        catch (Exception exception)
        {
            // return to the previous SynchronizationContext
            tcs.SetException(exception);
        }
    }, tcs);

    return tcs.Task;
}

用法:

using (var myThread = new AwaitEnabledThread())
{
    await myThread.SendASync(First);

    // even awaitable actions can be awaited
    await myThread.SendASync(async s =>
    {
        await DoSomethingIO();
        await DoSomethingIOAgain();
    });


    Console.ReadLine();
}

不同的是,第一个 await 影响当前线程。方法中的await是在目标线程上完成的。