单生产者多消费者循环缓冲区
Single Producer Multiple Consumer Circular Buffer
在我当前的应用程序中,我通过光谱仪接收光谱数据。该数据累积一秒钟,然后放入循环缓冲区。现在我有一个消费者,他从缓冲区弹出条目,然后将所有内容保存到磁盘。好的,所有这些都有效。现在我需要做的是添加另一个消费者,在保存的同时,对光谱进行一些处理。所以我有两个消费者需要完全相同的数据(注意:他们只读取而不修改)。好的,但这不起作用,因为如果其中一个消费者弹出缓冲区的一个条目,它就消失了,所以另一个就不会收到它。我想这个问题最简单的解决方案是给每个消费者它自己的循环缓冲区。很好,但唯一的问题是:数据条目很大。一个条目的最大大小约为 80MB,因此为了节省内存,最好不要让相同的数据出现两次。有没有更好的解决方案?
注意:我使用的是循环缓冲区,因此可以确保缓冲区有一个增长限制。
我希望您能直接将数据接收到队列中,而不是将其复制太多....
任何保留单个数据副本的有效解决方案都必须同步所有消费者,这样只有当他们都完成了一个条目时,它才会被弹出。
您可以保留您的循环缓冲区。当读者完成后,您只需要一个 remover 即可删除条目。我强烈建议这个 remover 成为数据的 writer。这样,它将成为唯一对队列具有写权限的人,这简化了事情。
去除剂可以从消费者那里得到,告诉它他们做了什么。
消费者可以与删除器共享他们的读取偏移量。您可以在消费者端使用 atomic_store,在去除器端使用 atomic_load。
应该是这样的:
struct Consumer {
...
long offset = 0;
...
Consumer() {
q.remover->add(this);
}
...
void run() {
for(;;) {
entry& e = q.read( offset );
process( e );
atomic_store( &offest, offset + e.size() );
}
}
};
struct Remover {
...
long remove_offset = 0;
std::list<Consumer*> cons;
...
void remove() {
// find lowest read point
long cons_offset = MAX_LONG;
for( auto p : cons ) {
cons_offset = std::min( cons_offset, atomic_load(&p->offset) );
}
// remove up to that point
while( cons_offset > remove_offset ) {
entry& e = q.read(remove_offset);
remove_offset += e.size();
q.remove( e.size() );
}
}
};
在你的缓冲区中保留两个不同的尾指针,每个消费者一个。当生产者更新队列时,使用最远的尾指针(滞后的尾指针)检查缓冲区是否已满。消费者可以使用自己的尾指针来检查缓冲区是否为空。这样我们就得到了一个无锁缓冲区,并且没有数据的复制。
查看 disruptor exchange 的实施,了解有关使用此解决方案提高性能的讨论。
在我当前的应用程序中,我通过光谱仪接收光谱数据。该数据累积一秒钟,然后放入循环缓冲区。现在我有一个消费者,他从缓冲区弹出条目,然后将所有内容保存到磁盘。好的,所有这些都有效。现在我需要做的是添加另一个消费者,在保存的同时,对光谱进行一些处理。所以我有两个消费者需要完全相同的数据(注意:他们只读取而不修改)。好的,但这不起作用,因为如果其中一个消费者弹出缓冲区的一个条目,它就消失了,所以另一个就不会收到它。我想这个问题最简单的解决方案是给每个消费者它自己的循环缓冲区。很好,但唯一的问题是:数据条目很大。一个条目的最大大小约为 80MB,因此为了节省内存,最好不要让相同的数据出现两次。有没有更好的解决方案?
注意:我使用的是循环缓冲区,因此可以确保缓冲区有一个增长限制。
我希望您能直接将数据接收到队列中,而不是将其复制太多....
任何保留单个数据副本的有效解决方案都必须同步所有消费者,这样只有当他们都完成了一个条目时,它才会被弹出。
您可以保留您的循环缓冲区。当读者完成后,您只需要一个 remover 即可删除条目。我强烈建议这个 remover 成为数据的 writer。这样,它将成为唯一对队列具有写权限的人,这简化了事情。
去除剂可以从消费者那里得到,告诉它他们做了什么。
消费者可以与删除器共享他们的读取偏移量。您可以在消费者端使用 atomic_store,在去除器端使用 atomic_load。
应该是这样的:
struct Consumer {
...
long offset = 0;
...
Consumer() {
q.remover->add(this);
}
...
void run() {
for(;;) {
entry& e = q.read( offset );
process( e );
atomic_store( &offest, offset + e.size() );
}
}
};
struct Remover {
...
long remove_offset = 0;
std::list<Consumer*> cons;
...
void remove() {
// find lowest read point
long cons_offset = MAX_LONG;
for( auto p : cons ) {
cons_offset = std::min( cons_offset, atomic_load(&p->offset) );
}
// remove up to that point
while( cons_offset > remove_offset ) {
entry& e = q.read(remove_offset);
remove_offset += e.size();
q.remove( e.size() );
}
}
};
在你的缓冲区中保留两个不同的尾指针,每个消费者一个。当生产者更新队列时,使用最远的尾指针(滞后的尾指针)检查缓冲区是否已满。消费者可以使用自己的尾指针来检查缓冲区是否为空。这样我们就得到了一个无锁缓冲区,并且没有数据的复制。
查看 disruptor exchange 的实施,了解有关使用此解决方案提高性能的讨论。