如何 Post 到 BufferBlock 并从 ActionBlock 获取结果?
How to Post to a BufferBlock and get a result from the ActionBlock?
有一个对象一次只能处理一个请求,处理它需要一点时间。任务完成后,它会引发一个事件 returning 结果。对象是以下代码中的 Computer
,假设我无法更改此 class.
的行为
现在,我想创建一个包装器 class 来给客户留下他们可以随时发送请求的印象。该请求现在是一个异步方法,因此客户端可以简单地等待直到结果 returned。当然,底层对象一次只能处理一个请求,因此包装器需要将请求排队,当“处理完成”事件到达时,它需要将结果 return 发送给相应的客户端。此包装器 class 在以下代码中是 SharedComputer
。
我想我需要 return 我从“Place1”获得的值在“Place2”。推荐的做法是什么? BufferBlock/ActionBlock 没有 return 值的机制吗?
static void Main(string[] args)
{
SharedComputer pc = new SharedComputer();
for(int i =0; i<10; i++)
{
Task.Factory.StartNew(async() =>
{
var r = new Random();
int randomDelay = r.Next(500, 5000);
Thread.Sleep(randomDelay);
int random1 = r.Next(0, 10);
int random2 = r.Next(0, 10);
int sum = await pc.Add(random1, random2);
if(random1 + random2 == sum)
{
Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
}
else
{
Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
}
});
}
System.Console.Read();
}
}
class SharedComputer
{
Computer Mainframe= Computer.GetInstance();
BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
TaskCompletionSource<int> TCS;
public SharedComputer()
{
Mainframe.Calculated += Mainframe_Calculated;
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
};
var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
{
Debug.WriteLine("Starting an adding");
TCS = new TaskCompletionSource<int>();
Mainframe.StartAdding(e.A, e.B);
var res = await TCS.Task; // Place1
Debug.WriteLine("Got the result.");
}, options);
JobQueue.LinkTo(jobProcessor);
}
private void Mainframe_Calculated(object sender, int e)
{
TCS.SetResult(e);
}
public async Task<int> Add(int a, int b)
{
var data = new TwoNumbers()
{
A = a,
B = b
};
Debug.WriteLine("Queuing a new adding.");
JobQueue.Post<TwoNumbers>(data);
return 1;//Place2: Return the value received at Place1.
}
struct TwoNumbers
{
public int A;
public int B;
}
}
class Computer
{
static Computer Instance;
bool IsWorking = false;
private Computer()
{
}
public static Computer GetInstance()
{
if (Instance == null)
Instance = new Computer();
return Instance;
}
public event EventHandler<int> Calculated;
public void StartAdding(int a, int b)
{
if (IsWorking)
{
throw new InvalidOperationException("Already working.");
}
IsWorking = true;
Task.Factory.StartNew(() =>
{
Thread.Sleep(3000);
IsWorking = false;
Calculated(this, a + b);
});
}
}
the underlying object can process one request at a time, so the wrapper needs to queue the request, and when the "processing done" event arrives, it needs to return the result to the appropriate client.
所以你需要的是互斥。虽然您 可以 从 TPL 数据流和 TaskCompletionSource<T>
构建互斥体,但使用内置互斥体 SemaphoreSlim
.
会容易得多
在我看来,先写一个异步抽象,然后再添加互斥会更干净。异步抽象 would look like:
public static class ComputerExtensions
{
public static Task<int> AddAsync(this Computer computer, int a, int b)
{
var tcs = new TaskCompletionSource<int>();
EventHandler<int> handler = null;
handler = (_, result) =>
{
computer.Calculated -= handler;
tcs.TrySetResult(result);
};
computer.Calculated += handler;
computer.StartAdding(a, b);
}
}
一旦你有了异步 API,你可以通过 SemaphoreSlim
:
轻松应用异步节流(或互斥)
class SharedComputer
{
Computer Mainframe = Computer.GetInstance();
readonly SemaphoreSlim _mutex = new();
public async Task<int> AddAsync(int a, int b)
{
await _mutex.WaitAsync();
try { return Mainframe.AddAsync(a, b); }
finally { _mutex.Release(); }
}
}
有一个对象一次只能处理一个请求,处理它需要一点时间。任务完成后,它会引发一个事件 returning 结果。对象是以下代码中的 Computer
,假设我无法更改此 class.
现在,我想创建一个包装器 class 来给客户留下他们可以随时发送请求的印象。该请求现在是一个异步方法,因此客户端可以简单地等待直到结果 returned。当然,底层对象一次只能处理一个请求,因此包装器需要将请求排队,当“处理完成”事件到达时,它需要将结果 return 发送给相应的客户端。此包装器 class 在以下代码中是 SharedComputer
。
我想我需要 return 我从“Place1”获得的值在“Place2”。推荐的做法是什么? BufferBlock/ActionBlock 没有 return 值的机制吗?
static void Main(string[] args)
{
SharedComputer pc = new SharedComputer();
for(int i =0; i<10; i++)
{
Task.Factory.StartNew(async() =>
{
var r = new Random();
int randomDelay = r.Next(500, 5000);
Thread.Sleep(randomDelay);
int random1 = r.Next(0, 10);
int random2 = r.Next(0, 10);
int sum = await pc.Add(random1, random2);
if(random1 + random2 == sum)
{
Debug.WriteLine($"Got correct answer: {random1} + {random2} = {sum}.");
}
else
{
Debug.WriteLine($"Got incorrect answer: {random1} + {random2} = {sum}.");
}
});
}
System.Console.Read();
}
}
class SharedComputer
{
Computer Mainframe= Computer.GetInstance();
BufferBlock<TwoNumbers> JobQueue = new BufferBlock<TwoNumbers>();
TaskCompletionSource<int> TCS;
public SharedComputer()
{
Mainframe.Calculated += Mainframe_Calculated;
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1
};
var jobProcessor = new ActionBlock<TwoNumbers>(async e =>
{
Debug.WriteLine("Starting an adding");
TCS = new TaskCompletionSource<int>();
Mainframe.StartAdding(e.A, e.B);
var res = await TCS.Task; // Place1
Debug.WriteLine("Got the result.");
}, options);
JobQueue.LinkTo(jobProcessor);
}
private void Mainframe_Calculated(object sender, int e)
{
TCS.SetResult(e);
}
public async Task<int> Add(int a, int b)
{
var data = new TwoNumbers()
{
A = a,
B = b
};
Debug.WriteLine("Queuing a new adding.");
JobQueue.Post<TwoNumbers>(data);
return 1;//Place2: Return the value received at Place1.
}
struct TwoNumbers
{
public int A;
public int B;
}
}
class Computer
{
static Computer Instance;
bool IsWorking = false;
private Computer()
{
}
public static Computer GetInstance()
{
if (Instance == null)
Instance = new Computer();
return Instance;
}
public event EventHandler<int> Calculated;
public void StartAdding(int a, int b)
{
if (IsWorking)
{
throw new InvalidOperationException("Already working.");
}
IsWorking = true;
Task.Factory.StartNew(() =>
{
Thread.Sleep(3000);
IsWorking = false;
Calculated(this, a + b);
});
}
}
the underlying object can process one request at a time, so the wrapper needs to queue the request, and when the "processing done" event arrives, it needs to return the result to the appropriate client.
所以你需要的是互斥。虽然您 可以 从 TPL 数据流和 TaskCompletionSource<T>
构建互斥体,但使用内置互斥体 SemaphoreSlim
.
在我看来,先写一个异步抽象,然后再添加互斥会更干净。异步抽象 would look like:
public static class ComputerExtensions
{
public static Task<int> AddAsync(this Computer computer, int a, int b)
{
var tcs = new TaskCompletionSource<int>();
EventHandler<int> handler = null;
handler = (_, result) =>
{
computer.Calculated -= handler;
tcs.TrySetResult(result);
};
computer.Calculated += handler;
computer.StartAdding(a, b);
}
}
一旦你有了异步 API,你可以通过 SemaphoreSlim
:
class SharedComputer
{
Computer Mainframe = Computer.GetInstance();
readonly SemaphoreSlim _mutex = new();
public async Task<int> AddAsync(int a, int b)
{
await _mutex.WaitAsync();
try { return Mainframe.AddAsync(a, b); }
finally { _mutex.Release(); }
}
}