gRPC 推送和扇出
gRPC Push and Fan-Out
是否可以将 gRPC 用作具有扇出功能的推送服务?
在 Google 给出的示例中,服务器端 (C#) 有以下代码:
public override async Task ListFeatures(Rectangle request, IServerStreamWriter<Feature> responseStream, ServerCallContext context)
{
var responses = features.FindAll( (feature) => feature.Exists() && request.Contains(feature.Location) );
foreach (var response in responses)
{
await responseStream.WriteAsync(response);
}
}
问题在于:
- 只有在客户端明确要求时才会生成和写入数据。
- 只有提出请求的客户才能获得新数据。
我觉得我需要的是:
- 为每个请求(订阅)的客户端保留所有 IServerStreamWriters。
- 当有新数据可用时,通过外部事件触发写入。
- 写入所有 streamWriters
编辑:
根据 Carl 的建议,我现在有以下几点:
原型:
service PubSub {
rpc Subscribe(Subscription) returns (stream Event) {}
rpc Unsubscribe(Subscription) returns (Unsubscription) {}
}
message Event
{
string Value = 1;
}
message Subscription
{
string Id = 1;
}
message Unsubscription
{
string Id = 1;
}
PubSubImpl:
public class PubSubImpl : PubSub.PubSubBase
{
private readonly BufferBlock<Event> _buffer = new BufferBlock<Event>();
private Dictionary<string, IServerStreamWriter<Event>> _subscriberWritersMap =
new Dictionary<string, IServerStreamWriter<Event>>();
public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
//Dict to hold a streamWriter for each subscriber.
_subscriberWritersMap[subscription.Id] = responseStream;
while (_subscriberWritersMap.ContainsKey(subscription.Id))
{
//Wait on BufferBlock from MS Dataflow package.
var @event = await _buffer.ReceiveAsync();
foreach (var serverStreamWriter in _subscriberWritersMap.Values)
{
await serverStreamWriter.WriteAsync(@event);
}
}
}
public override Task<Unsubscription> Unsubscribe(Subscription request, ServerCallContext context)
{
_subscriberWritersMap.Remove(request.Id);
return Task.FromResult(new Unsubscription() { Id = request.Id });
}
public void Publish(string input)
{
_buffer.Post(new Event() { Value = input });
}
}
"Push"现在可以这样发送:
while ((input = Console.ReadLine()) != "q")
{
pubsubImp.Publish(input);
}
在客户端我有:
public async Task Subscribe()
{
_subscription = new Subscription() { Id = Guid.NewGuid().ToString() };
using (var call = _pubSubClient.Subscribe(_subscription))
{
//Receive
var responseReaderTask = Task.Run(async () =>
{
while (await call.ResponseStream.MoveNext())
{
Console.WriteLine("Event received: " + call.ResponseStream.Current);
}
});
await responseReaderTask;
}
}
public void Unsubscribe()
{
_pubSubClient.Unsubscribe(_subscription);
}
Client-Main 是这样工作的:
static void Main(string[] args)
{
var channel = new Channel("127.0.0.1:50052",
ChannelCredentials.Insecure);
var subscriber = new Subsriber(new PubSub.PubSubClient(channel));
Task.Run(async () =>
{
await subscriber.Subscribe();
}).GetAwaiter();
Console.WriteLine("Hit key to unsubscribe");
Console.ReadLine();
subscriber.Unsubscribe();
Console.WriteLine("Unsubscribed...");
Console.WriteLine("Hit key to exit...");
Console.ReadLine();
}
目前它看起来有效。 should/could 是这样做的吗?
测试解决方案可在以下位置找到:
https://github.com/KingKnecht/gRPC-PubSub
这是可能的,但我不能评论这是否是好的做法。您需要像您所说的那样跟踪每个客户端,并确保在客户端断开连接变得无法访问时删除它们。
您的一般方法听起来不错。推送 RPC 应该是双向流式传输(假设每个客户端都可以引起推送)。当客户端连接到服务器时,将客户端记录在线程安全集合中。当其中一个客户端发送消息时,遍历集合将消息发送到每个连接的客户端。如果发送失败,则从客户端移除客户端并关闭连接。
在名为 "RouteGuide" 的示例中有一个更简单的版本,它以 gRPC 支持的语言实现了一个简单的聊天服务器。
我认为您不需要跟踪 server-app 中的客户。相反,对于每个 "push" 方法,创建一个 EventWaitHandle
并持续等待它。然后在另一个上下文中,向 EventWaitHandle
发出信号,以便等待的 "push" 方法可以 WriteAsync
到客户端。
例如
public class PubSubImpl : PubSub.PubSubBase
{
EventWaitHandle evwStatus = new EventWaitHandle(false, EventResetMode.ManualReset);
string status;
public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
// respond with the current status
await serverStreamWriter.WriteAsync(new Event { Value = status });
// wait until we're signaled with a different status
while(evwStatus.WaitOne())
{
await serverStreamWriter.WriteAsync(new Event { Value = status });
}
}
public void Publish(string input)
{
status = input;
// let the waiting threads respond
evwStatus.Set();
// halt the waiting threads
evwStatus.Reset();
}
}
是否可以将 gRPC 用作具有扇出功能的推送服务? 在 Google 给出的示例中,服务器端 (C#) 有以下代码:
public override async Task ListFeatures(Rectangle request, IServerStreamWriter<Feature> responseStream, ServerCallContext context)
{
var responses = features.FindAll( (feature) => feature.Exists() && request.Contains(feature.Location) );
foreach (var response in responses)
{
await responseStream.WriteAsync(response);
}
}
问题在于:
- 只有在客户端明确要求时才会生成和写入数据。
- 只有提出请求的客户才能获得新数据。
我觉得我需要的是:
- 为每个请求(订阅)的客户端保留所有 IServerStreamWriters。
- 当有新数据可用时,通过外部事件触发写入。
- 写入所有 streamWriters
编辑: 根据 Carl 的建议,我现在有以下几点:
原型:
service PubSub {
rpc Subscribe(Subscription) returns (stream Event) {}
rpc Unsubscribe(Subscription) returns (Unsubscription) {}
}
message Event
{
string Value = 1;
}
message Subscription
{
string Id = 1;
}
message Unsubscription
{
string Id = 1;
}
PubSubImpl:
public class PubSubImpl : PubSub.PubSubBase
{
private readonly BufferBlock<Event> _buffer = new BufferBlock<Event>();
private Dictionary<string, IServerStreamWriter<Event>> _subscriberWritersMap =
new Dictionary<string, IServerStreamWriter<Event>>();
public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
//Dict to hold a streamWriter for each subscriber.
_subscriberWritersMap[subscription.Id] = responseStream;
while (_subscriberWritersMap.ContainsKey(subscription.Id))
{
//Wait on BufferBlock from MS Dataflow package.
var @event = await _buffer.ReceiveAsync();
foreach (var serverStreamWriter in _subscriberWritersMap.Values)
{
await serverStreamWriter.WriteAsync(@event);
}
}
}
public override Task<Unsubscription> Unsubscribe(Subscription request, ServerCallContext context)
{
_subscriberWritersMap.Remove(request.Id);
return Task.FromResult(new Unsubscription() { Id = request.Id });
}
public void Publish(string input)
{
_buffer.Post(new Event() { Value = input });
}
}
"Push"现在可以这样发送:
while ((input = Console.ReadLine()) != "q")
{
pubsubImp.Publish(input);
}
在客户端我有:
public async Task Subscribe()
{
_subscription = new Subscription() { Id = Guid.NewGuid().ToString() };
using (var call = _pubSubClient.Subscribe(_subscription))
{
//Receive
var responseReaderTask = Task.Run(async () =>
{
while (await call.ResponseStream.MoveNext())
{
Console.WriteLine("Event received: " + call.ResponseStream.Current);
}
});
await responseReaderTask;
}
}
public void Unsubscribe()
{
_pubSubClient.Unsubscribe(_subscription);
}
Client-Main 是这样工作的:
static void Main(string[] args)
{
var channel = new Channel("127.0.0.1:50052",
ChannelCredentials.Insecure);
var subscriber = new Subsriber(new PubSub.PubSubClient(channel));
Task.Run(async () =>
{
await subscriber.Subscribe();
}).GetAwaiter();
Console.WriteLine("Hit key to unsubscribe");
Console.ReadLine();
subscriber.Unsubscribe();
Console.WriteLine("Unsubscribed...");
Console.WriteLine("Hit key to exit...");
Console.ReadLine();
}
目前它看起来有效。 should/could 是这样做的吗? 测试解决方案可在以下位置找到: https://github.com/KingKnecht/gRPC-PubSub
这是可能的,但我不能评论这是否是好的做法。您需要像您所说的那样跟踪每个客户端,并确保在客户端断开连接变得无法访问时删除它们。
您的一般方法听起来不错。推送 RPC 应该是双向流式传输(假设每个客户端都可以引起推送)。当客户端连接到服务器时,将客户端记录在线程安全集合中。当其中一个客户端发送消息时,遍历集合将消息发送到每个连接的客户端。如果发送失败,则从客户端移除客户端并关闭连接。
在名为 "RouteGuide" 的示例中有一个更简单的版本,它以 gRPC 支持的语言实现了一个简单的聊天服务器。
我认为您不需要跟踪 server-app 中的客户。相反,对于每个 "push" 方法,创建一个 EventWaitHandle
并持续等待它。然后在另一个上下文中,向 EventWaitHandle
发出信号,以便等待的 "push" 方法可以 WriteAsync
到客户端。
例如
public class PubSubImpl : PubSub.PubSubBase
{
EventWaitHandle evwStatus = new EventWaitHandle(false, EventResetMode.ManualReset);
string status;
public override async Task Subscribe(Subscription subscription, IServerStreamWriter<Event> responseStream, ServerCallContext context)
{
// respond with the current status
await serverStreamWriter.WriteAsync(new Event { Value = status });
// wait until we're signaled with a different status
while(evwStatus.WaitOne())
{
await serverStreamWriter.WriteAsync(new Event { Value = status });
}
}
public void Publish(string input)
{
status = input;
// let the waiting threads respond
evwStatus.Set();
// halt the waiting threads
evwStatus.Reset();
}
}