具有并发性的漏桶算法

Leaky bucket algorithm with concurrency

尝试模拟多个线程正在创建流量以填充桶和一个线程以指定速率泄漏桶的场景。但是,代码 运行 陷入死锁。 你能复习一下这段代码吗?如果您看到任何错误和我应该添加的最佳修改,请告诉我。

代码

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <chrono>

using namespace std;

class LeakyBucket {
public:
    LeakyBucket(int size, int rate) : maxCapacity(size), leakRate(rate), filled(0)  {}
    void add(int newDataSize) {
        unique_lock<mutex> lk(_mtx);
        _cond.wait(lk, [this](){
           return  filled<=maxCapacity;
        });


        filled = (filled+newDataSize) > maxCapacity ? maxCapacity:(filled+newDataSize);
        cout<<"\n Filled bucket with : "<<newDataSize;
        cout<<"\n Filled: "<<filled<<"\n ----------";
        _cond.notify_one();
    }

    void leak() {
        while(1) {
            {
                unique_lock<mutex> lk(_mtx);
            _cond.wait(lk, [this]() {
                return filled > 0 || _done;
            });
            if(_done)
                break;

            filled = (filled-leakRate<0) ? 0 : (filled-leakRate);
            cout << "\n Leaked bucket with leakRate";
            cout << "\n BucketFilledRemain: " << filled << "\n ----------";
            _cond.notify_one();
            }
            _sleep:
            this_thread::sleep_for(chrono::seconds(1));
        }
    }

    bool _done = false;
private:
    atomic<int> filled;
    int maxCapacity;
    int leakRate; // Per second
    mutex _mtx;
    condition_variable _cond;

};

void runLeakyBucketAlgorithm() {
    LeakyBucket *lb = new LeakyBucket(30, 20);

    thread t1(&LeakyBucket::leak, lb);
    thread t2([&](){
       for(int i=0; i<10; i++) {
           cout<<"\n launching thread: "<<i;
           lb->add(rand()%40);
       }
       this_thread::sleep_for(chrono::seconds(5));
       lb->_done = true;
    });
    if(t2.joinable()) {
       t2.join();
    }

    t1.join();
}

O/p:

 launching thread: 0
 Filled bucket with : 7
 Filled: 7
 ----------
 launching thread: 1
 Filled bucket with : 9
 Filled: 16
 ----------
 launching thread: 2
 Leaked bucket with leakRate
 BucketFilledRemain: 0
 ----------
 Filled bucket with : 33
 Filled: 30
 ----------
 launching thread: 3
 Filled bucket with : 18
 Filled: 30
 ----------
 launching thread: 4
 Filled bucket with : 10
 Filled: 30
 ----------
 launching thread: 5
 Filled bucket with : 32
 Filled: 30
 ----------
 launching thread: 6
 Filled bucket with : 24
 Filled: 30
 ----------
 launching thread: 7
 Filled bucket with : 38
 Filled: 30
 ----------
 launching thread: 8
 Filled bucket with : 3
 Filled: 30
 ----------
 launching thread: 9
 Filled bucket with : 29
 Filled: 30
 ----------
 Leaked bucket with leakRate
 BucketFilledRemain: 10
 ----------
 Leaked bucket with leakRate
 BucketFilledRemain: 0

显示的代码中存在多个基本错误。

thread t1(&LeakyBucket::leak, lb);

leak() 将等到桶的填充率至少为 0,然后从中减去泄漏率。然后它就会完成。而已。不会了。泄漏线程将不复存在。它将变成 ex-thread。它将永远渴望峡湾。水桶漏过一次,漏水的洞就堵上了,完全变成了leak-proof个水桶。

 new LeakyBucket(30, 20);

桶的容量是 30,漏率是 20。

lb->add(rand()%40);

这会被调用十次,添加 0 到 39 滴水。

所以,假设我们第一次将 20 滴水滴入桶中。泄漏线程将苏醒,取出那 20 滴水,然后 well-deserved 退休。

等等,我们还有九次加水!

第二次调用add()会掉落25滴水。第三次尝试添加 30 滴水。存储桶现在已超出容量。对 add() 的第四次调用现在将永远阻塞,因为正如我们刚刚看到的,桶现在完全 leak-proof。

这是第一个错误:桶漏一次,然后就不再漏了。

            _cond.wait(lk, [this]() {
                return filled > 0;
            });

            filled -= leakRate;

桶中的漏水会等到桶中至少有 1 滴水时,然后漏出 20 滴水。所以,如果桶里已经有五滴水,那么在这一切之后,桶里会有负的十五滴水。这显然没有意义,所以这将是第二个需要修复的错误,然后才能正常工作。

这里可能还有第三个错误。桶被定义为具有一定的容量。然而,在我上面的一个例子中,水桶最终含有比其规定容量更多的水滴。这也没有加起来。