将相同数据发送到多个动态进程的有效方式

Efficient way of sending the same data to multiple dynamic processes

我有一个行缓冲数据流,还有很多来自其他进程的读者
读者需要动态附加到系统,写入流的进程不知道他们

首先我尝试读取每一行并将它们简单地发送到很多管道

#writer
command | while read -r line; do
    printf '%s\n' "$line" | tee listeners/*
done

#reader
mkfifo listeners/1
cat listeners/1

但这会消耗很多 CPU

所以我想写一个文件并反复清理它

#writer
command >> file &
while true; do
    : > file
    sleep 1
done

#reader
tail -f -n0 file

但有时,一行在截断之前未被一个或多个读者读取,从而造成竞争条件
有没有更好的方法来实现这个?

对我来说听起来像 pub/sub - 参见 Wikipedia

基本上,新的感兴趣的人会随时出现并 "subscribe" 到您的频道。接收数据的进程然后 "publishes" 逐行发送到该通道。

您可以使用 mosquittoRedis 通过 MQTT 来完成。两者都有命令行 interfaces/bindings,以及 Python、C/C++、Ruby、PHP 等。客户端和服务器不必在同一台机器上,一些客户端可能在网络上的其他位置。

Mosquitto 示例 here.


我用 Redis pub/sub 在 Mac 上做了一些测试。终端中用于订阅名为 myStream 的频道的客户端代码如下所示:

redis-cli SUBSCRIBE myStream

然后我 运行 合成 10,000 行的过程如下:

time seq 10000  | while read a ; do redis-cli PUBLISH myStream "$a" >/dev/null 2>&1 ; done

这需要 40 秒,所以它每秒执行大约 250 行,但它必须为每一行启动一个全新的进程,并创建和断开与 Redis 的连接......我们不想让你的 CPU 发疯。

那么更适合您的情况,下面是如何创建一个包含 100,000 行的文件,一次阅读它们,并将它们发送给 Python 中的所有订阅者:

# Make a "BigFile" with 100,000 lines
seq 100000 > BigFile

并阅读这些行并发布它们:

#!/usr/bin/env python3

import redis

if __name__ == '__main__':
    # Redis connection
    r = redis.Redis(host='localhost', port=6379, db=0)

    # Read file line by line...
    with open('BigFile', 'r') as infile:
        for line in infile:
            # Publish the current line to subscribers
            r.publish('myStream', line)

全部 100,000 行在 4 秒内发送和接收,因此每秒 25,000 行。这是它的一些实际记录。在顶部你可以看到 CPU 并没有受到它的过度困扰。从顶部数第二个 window 是客户端,接收 100,000 行,下一个 window 是第二个客户端。底部 window 显示服务器 运行 上面的 Python 代码并在 4 秒内发送所有 100,000 行。

关键字:Redis、mosquitto、pub/sub、发布、订阅。