如何使用 C# Rx 实现 Redis 流
How to implement Redis streams with C# Rx
因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我遇到了几个问题,你们中的一些人可能会指出我正确的地方。
实施使用了 Pub/Sub 和流的组合:
* 日志 -> 流通道
* log:notification -> pub/sub
* log:lastReadMessage -> 包含流中最后读取的密钥
出版商
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
订户
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
}
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
为什么要信号量?
因为我可以将很多消息添加到流中,而且我不希望 o 对同一消息处理两次。
问题
如果我们在流中有未处理的消息,我们如何在没有来自 Pub/Sub 的事件的情况下进行处理
当我们开始时,我们可以验证它是否是未处理的消息并处理它。如果在此期间向流中添加了一条新消息,而我们尚未订阅 Pub/sub,订阅者将不会处理该消息,直到我们通过 Pub/Sub.[=13 收到通知。 =]
信号量对于不处理同一条消息两次很重要,但同时它也是一个诅咒。在一条消息的处理过程中,可以将另一条消息添加到流中。当这种情况发生时,订阅者不会立即处理,而是在下次收到通知时才处理(此时将处理两条消息)。
您将如何实施?
是否有仅使用 Rx 的 Redis 流的实现?
该解决方案不应该使用某种循环并且内存效率高。这可能吗?
祝福
保罗·阿博伊姆·平托
这是我想避免的 WHILE 解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(async obs =>
{
while(!cancellationToken.IsCancellationRequested)
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
await Task.Delay(TimeSpan.FromMilliseconds(500));
}
return Disposable.Empty;
});
}
这是另一个使用 200 毫秒运行时间的计时器的解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
var instance = ThreadPoolScheduler.Instance;
return Observable.Create<string>(obs =>
{
var disposable = Observable
.Interval(TimeSpan.FromMilliseconds(200), instance)
.Subscribe(async _ =>
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
});
cancellationToken.Register(() => disposable.Dispose());
return Disposable.Empty;
});
}
我使用紧密循环只是做一个 XRange 并保存一个位置 - KISS.. 但是如果没有工作它会后退所以当有很多事情发生在它的紧密循环中时它非常快。
如果您需要更高的性能,例如在处理时读取,但是在大多数情况下我会提醒您不要这样做。
- 它造成了很多复杂性,这需要坚如磐石。
- Redis 通常足够快
- “我不想让同一条消息被处理两次。”几乎每个系统都至少有一次交付消除这种崩溃是令人难以置信的困难/缓慢。您可以通过使用 id 的哈希集来部分删除它,但是对于消费者来说处理它和设计为幂等的消息是非常微不足道的。这可能是消息设计问题的根本原因。如果您对每个 reader 进行分区(单独的流和每个流 1 个工作人员),您可以将哈希集保留在内存中,避免缩放/分布式问题。请注意,Redis 流可以保留顺序,使用它来生成更简单的幂等消息。
- 异常,您不想停止处理流,因为消费者在 1 条消息上有逻辑异常,例如在晚上接到一个电话整个系统已经停止,锁使情况变得更糟。事件数据无法更改,它已发生,所以它会尽力而为。然而,infra/redis 异常确实需要抛出并重试。在循环外管理这个非常痛苦。
- 简单的背压。如果您不能足够快地处理工作,循环就会变慢,而不是创建大量任务并耗尽您的所有内存。
我不再使用分布式锁/信号量。
如果您处理命令(例如 dosomething 而不是 xyz),则这些操作可能会失败。同样,消费者应该处理已经发生的情况,而不是 redis / 流读取部分。
一些具有神奇回调的库不能解决这些问题,回调将在任何节点等超时 运行 时重试。复杂性/问题仍然存在,它们只是转移到其他地方。
您可能在消费者的顶部有一个 observable,但这基本上是装饰性的,它不能解决问题,如果您在某个地方查看许多实现,您会看到相同的循环。我不会用它来让消费者注册一个动作。
例如
public interface IStreamSubscriber
{
void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
void Start();
}
在你的情况下,回调可能有可观察的而不是使用循环,但下面有一个低级循环,它也可以为消费者进行消息到对象的转换。
因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我遇到了几个问题,你们中的一些人可能会指出我正确的地方。
实施使用了 Pub/Sub 和流的组合: * 日志 -> 流通道 * log:notification -> pub/sub * log:lastReadMessage -> 包含流中最后读取的密钥
出版商
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
订户
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
}
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
为什么要信号量?
因为我可以将很多消息添加到流中,而且我不希望 o 对同一消息处理两次。
问题
如果我们在流中有未处理的消息,我们如何在没有来自 Pub/Sub 的事件的情况下进行处理 当我们开始时,我们可以验证它是否是未处理的消息并处理它。如果在此期间向流中添加了一条新消息,而我们尚未订阅 Pub/sub,订阅者将不会处理该消息,直到我们通过 Pub/Sub.[=13 收到通知。 =]
信号量对于不处理同一条消息两次很重要,但同时它也是一个诅咒。在一条消息的处理过程中,可以将另一条消息添加到流中。当这种情况发生时,订阅者不会立即处理,而是在下次收到通知时才处理(此时将处理两条消息)。
您将如何实施? 是否有仅使用 Rx 的 Redis 流的实现? 该解决方案不应该使用某种循环并且内存效率高。这可能吗?
祝福
保罗·阿博伊姆·平托
这是我想避免的 WHILE 解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(async obs =>
{
while(!cancellationToken.IsCancellationRequested)
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
await Task.Delay(TimeSpan.FromMilliseconds(500));
}
return Disposable.Empty;
});
}
这是另一个使用 200 毫秒运行时间的计时器的解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
var instance = ThreadPoolScheduler.Instance;
return Observable.Create<string>(obs =>
{
var disposable = Observable
.Interval(TimeSpan.FromMilliseconds(200), instance)
.Subscribe(async _ =>
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
});
cancellationToken.Register(() => disposable.Dispose());
return Disposable.Empty;
});
}
我使用紧密循环只是做一个 XRange 并保存一个位置 - KISS.. 但是如果没有工作它会后退所以当有很多事情发生在它的紧密循环中时它非常快。
如果您需要更高的性能,例如在处理时读取,但是在大多数情况下我会提醒您不要这样做。
- 它造成了很多复杂性,这需要坚如磐石。
- Redis 通常足够快
- “我不想让同一条消息被处理两次。”几乎每个系统都至少有一次交付消除这种崩溃是令人难以置信的困难/缓慢。您可以通过使用 id 的哈希集来部分删除它,但是对于消费者来说处理它和设计为幂等的消息是非常微不足道的。这可能是消息设计问题的根本原因。如果您对每个 reader 进行分区(单独的流和每个流 1 个工作人员),您可以将哈希集保留在内存中,避免缩放/分布式问题。请注意,Redis 流可以保留顺序,使用它来生成更简单的幂等消息。
- 异常,您不想停止处理流,因为消费者在 1 条消息上有逻辑异常,例如在晚上接到一个电话整个系统已经停止,锁使情况变得更糟。事件数据无法更改,它已发生,所以它会尽力而为。然而,infra/redis 异常确实需要抛出并重试。在循环外管理这个非常痛苦。
- 简单的背压。如果您不能足够快地处理工作,循环就会变慢,而不是创建大量任务并耗尽您的所有内存。
我不再使用分布式锁/信号量。
如果您处理命令(例如 dosomething 而不是 xyz),则这些操作可能会失败。同样,消费者应该处理已经发生的情况,而不是 redis / 流读取部分。
一些具有神奇回调的库不能解决这些问题,回调将在任何节点等超时 运行 时重试。复杂性/问题仍然存在,它们只是转移到其他地方。
您可能在消费者的顶部有一个 observable,但这基本上是装饰性的,它不能解决问题,如果您在某个地方查看许多实现,您会看到相同的循环。我不会用它来让消费者注册一个动作。
例如
public interface IStreamSubscriber
{
void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
void Start();
}
在你的情况下,回调可能有可观察的而不是使用循环,但下面有一个低级循环,它也可以为消费者进行消息到对象的转换。