reader 只有只读访问权限的循环缓冲区

circular buffer with read only access from reader

我想构建一个 reader 只有只读访问权限的循环缓冲区,但遇到了问题。为了实现平滑的翻转(rollover),我让 writer 把缓冲区中 iterator+1 位置的 id 设置为 0,reader 再去检查这个 id。在第一次翻转之前,我的算法似乎工作正常;但之后不知为何,reader 会从 writer 明明已经设置过的 id 中读到 0。我有一些可编译的示例代码来演示这个问题:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

// Alarm identifiers passed to alarm_add() and stored in alarm_t.alarmid.
//even IDs = alarm active
//odd IDs  = alarm clear
// NOTE: 0 is not an enumerator; alarm_add()/next_alarm_read() use
// alarmid == 0 to mark a slot that holds no unread entry.
enum alarmid {
    BFD_ACT     = 0x02,
    BFD_CLR     = 0x03,
    LOS_ACT     = 0x0C
};
// One slot of the circular alarm buffer that `roller` points to.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() built the entry
    int alarmid;        // enum alarmid value; 0 means "no unread entry here"
    int arg1;           // caller-supplied value, copied through unchanged
    int arg2;           // caller-supplied value, copied through unchanged
}alarm_t;
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t *res);
void *alarm_reader(void *arg);

static alarm_t *roller;
// Condition variable signalled by alarm_add() after it stores an entry.
// Statically initialized: nothing in this program calls pthread_cond_init().
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
// Mutex paired with cv in alarm_reader(); statically initialized because
// nothing in this program calls pthread_mutex_init().
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Demo driver: queues NUM_ALM-1 alarms, starts the reader thread, then adds
// nine more alarms so that the writer wraps around the NUM_ALM-slot buffer.
// RETURNS: 0 on success, ERROR if allocation or thread creation failed
int main (void)
{
    int i;
    pthread_t reader;
    int ret;

    roller = calloc(NUM_ALM,sizeof(alarm_t));
    if (roller == NULL){
        printf("Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields size_t, whose printf conversion is %zu (not %lu)
    printf("allocated memory: %zukB\n",(sizeof(alarm_t)*NUM_ALM)/1024);

    // entries queued before the reader thread exists
    for (i = 1; i< NUM_ALM; i++){
        alarm_add(LOS_ACT,i,0);
    }
    ret = pthread_create(&reader,NULL,alarm_reader,NULL);
    if (ret){
        printf("Error - pthread_create() return code: %d\n",ret);
        free(roller);
        return ERROR;
    }
    // NOTE(review): presumably gives the reader time to drain the entries
    // queued above and block on cv before more are added -- confirm
    sleep(1);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_ACT,8,0);

    pthread_join(reader,NULL);

    // the reader has exited, so nothing references the buffer any more
    free(roller);
    return 0;
}

// Reader thread body: prints every alarm that next_alarm_read() hands out and
// waits on cv whenever it reports ERROR; exits after the third such wait.
// RETURNS: always NULL (main() joins without collecting the result)
void *alarm_reader(void *arg)
{
    static alarm_t dat={0};
    int err = 0;

    (void) arg;     // thread argument is not used

    while(err <= 2)
    {
        if (next_alarm_read(&dat)== OK)
            printf("read alarm id %d, arg1 %d,arg2 %d\n",dat.alarmid,dat.arg1,dat.arg2);
        else{
            printf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            // NOTE(review): the wait is not wrapped in a predicate loop, so a
            // signal sent before this thread blocks here is lost
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");

    // a thread start routine is declared to return void *, so return a value
    return NULL;
}
// Writer side: copies one alarm into the next slot of roller, zeroes the
// alarmid of the slot after it (the marker next_alarm_read() stops on), and
// signals cv.
// NOTE: once the static index has reached NUM_ALM, the call only resets the
// index to 0 and returns -- the alarm passed in is NOT stored.
// NOTE: roller is written without holding mutex.
// RETURNS: always 0
int alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;               // writer's slot index, visible only here
    alarm_t dat={0};
    if (i<NUM_ALM){
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        // this tests the address of an array element, not whether the slot
        // is free
        if (&roller[i]){
            memcpy(&roller[i],&dat,sizeof(alarm_t));
            // zeroing the following slot overwrites the alarmid of whatever
            // entry is stored there, whether or not the reader has consumed it
            if (i+1<NUM_ALM)
                roller[i+1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            printf("added id %d, arg1 %d, arg2 %d @%d\n",roller[i].alarmid,roller[i].arg1,roller[i].arg2,i);
            i++;
        }
    } else {
        // wrap-around: index reset only, nothing is stored on this call
        i = 0;
    }
    return 0;
}

// Reader side: copies the entry at the reader's current slot into *res and
// advances, provided that slot's alarmid is non-zero.
// NOTE: once the static index has reached NUM_ALM, the call only resets the
// index to 0 and returns OK without writing *res, so the caller sees whatever
// *res held before.
// NOTE: roller is read without holding mutex.
// RETURNS: OK, or ERROR if res is NULL or the current slot's alarmid is 0
int next_alarm_read(alarm_t *res)
{
    static int i = 0;               // reader's slot index, visible only here
    static long prev_time = 0;      // assigned below but never read
    if (!res)
        return ERROR;

    if (i<NUM_ALM)
    {
        if (roller[i].alarmid!=0){
            printf("next_alarm_read() reading @%d\n",i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            prev_time = roller[i].timestamp;
            i++;
        } else {
            // alarmid == 0: treated as "nothing to read in this slot"
            printf("next_alarm_read() @%d is %d,return ERROR\n",i,roller[i].alarmid);

            return ERROR;
        }
    } else {
        // wrap-around: index reset only, *res is left untouched
        i = 0;
    }
    return OK;
}

输出看起来像:

added id 12, arg1 1, arg2 0 @0
added id 12, arg1 2, arg2 0 @1
added id 12, arg1 3, arg2 0 @2
added id 12, arg1 4, arg2 0 @3
next_alarm_read() reading @0
read alarm id 12, arg1 1,arg2 0
next_alarm_read() reading @1
read alarm id 12, arg1 2,arg2 0
next_alarm_read() reading @2
read alarm id 12, arg1 3,arg2 0
next_alarm_read() reading @3
read alarm id 12, arg1 4,arg2 0
next_alarm_read() @4 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait
added id 2, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
added id 2, arg1 8, arg2 0 @1
added id 3, arg1 8, arg2 0 @2
added id 3, arg1 8, arg2 0 @3
added id 3, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
next_alarm_read() reading @4
read alarm id 3, arg1 8,arg2 0
read alarm id 3, arg1 8,arg2 0
next_alarm_read() reading @0
read alarm id 2, arg1 8,arg2 0
next_alarm_read() @1 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait

输出末尾的 next_alarm_read() @1 is 0,return ERROR 是错误的,此处的 id 应该是 2。我想知道为什么它没有按预期工作?

几个问题:

我不确定 if (&roller[i]) 想要做什么/表达什么[它测试的是数组元素的地址,只要 roller 分配成功,这个条件就恒为真]。

main 中的 sleep 并不是真正需要的,我怀疑加上它是为了缓解下面提到的其他问题。

alarm_add 会在翻转点丢弃一个条目。

此外,writer 可能会跑到 reader 前面(overrun),在 reader 看到条目之前就把该条目覆盖掉(即竞争条件)。

reader 和 writer 都需要查看彼此的当前队列索引(即它们不应该是函数范围 static)以防止 overrun/race

应该有两个条件变量,而不是只有一个:

  1. writer 检测到队列已满,需要阻塞直到 reader 取走一个条目
  2. reader 检测到队列为空,需要阻塞直到 writer 添加新条目

这是您的代码的重构版本,应该可以解决这些问题。我添加了一些调试代码。它可能并不完美[并且可能偏于保守],但应该能让你更进一步[请原谅我顺手做的风格清理]:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

double tvzero;

// Alarm identifiers passed to alarm_add() and stored in alarm_t.alarmid.
//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

// One slot of the circular alarm queue that `roller` points to.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() stored the entry
    int alarmid;        // id passed to alarm_add() (an enum alarmid value)
    int arg1;           // caller-supplied value, copied through unchanged
    int arg2;           // caller-supplied value, copied through unchanged
} alarm_t;

void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Queue state. Every variable below is read and written only while `mutex`
// is held. The condition variables are statically initialized, like `mutex`,
// because nothing in this program calls pthread_cond_init().

// reader variables
pthread_cond_t cv_notempty = PTHREAD_COND_INITIALIZER;  // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull = PTHREAD_COND_INITIALIZER;   // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

volatile int stopall;

// Current CLOCK_REALTIME time as floating-point seconds, rebased on tvzero.
// RETURNS: seconds elapsed since tvzero (absolute seconds while tvzero is 0)
double
tvgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME,&now);

    // fractional part first, then whole seconds, then rebase onto tvzero
    return ((double) now.tv_nsec / 1e9 + (double) now.tv_sec) - tvzero;
}

// Debug trace macro: forwards its argument to dbg().
#define DBG(_reason) \
    dbg(_reason)

// Print one trace line of the form "[<tvgetf() seconds>] <reason>".
void
dbg(const char *reason)
{
    printf("[%.9f] %s\n",tvgetf(),reason);
}

// Demo driver: queues NUM_ALM-1 alarms, starts the reader thread, queues nine
// more (alarm_add() blocks whenever the queue is full), then sets stopall so
// the reader exits once the queue has drained.
// RETURNS: 0 on success, ERROR if allocation or thread creation failed
int
main(void)
{
    int i = 0;
    pthread_t reader;
    int ret;

    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    if (roller == NULL) {
        printf("Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields size_t, whose printf conversion is %zu (not %lu)
    printf("allocated memory: %zukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    for (i = 1; i < NUM_ALM; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        free(roller);
        return ERROR;
    }

#if 0
    sleep(1);
#endif

    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_ACT, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_CLR, 8, 0);
    alarm_add(BFD_ACT, 8, 0);

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    // the reader has exited, so nothing references the queue any more
    free(roller);

    return 0;
}

// Claim the oldest queued slot for the reader and advance idxdeq past it.
// Called by next_alarm_read() with mutex held.
// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int slot = -1;

    do {
        // queue is empty: dequeue index has caught up with enqueue index
        if (idxdeq == idxenq)
            break;

        // hand out the current slot, then step past it (wrapping if necessary)
        slot = idxdeq;
        idxdeq = (slot + 1) % NUM_ALM;
    } while (0);

    return slot;
}

// Reserve the next free slot for the writer and advance idxenq past it.
// The queue counts as full when advancing idxenq would make it equal to
// idxdeq, so at most NUM_ALM-1 entries are queued at once.
// Called by alarm_add() with mutex held.
// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int slot = -1;
    int next = (idxenq + 1) % NUM_ALM;  // where idxenq would move (wrapping)

    do {
        // queue is full
        if (next == idxdeq)
            break;

        // hand out the current slot and store back the advanced index
        slot = idxenq;
        idxenq = next;
    } while (0);

    return slot;
}

// Reader thread body: prints each dequeued alarm until next_alarm_read()
// reports stop (non-zero).
// RETURNS: always a null pointer
void *
alarm_reader(void *arg)
{
    alarm_t entry = { 0 };

    while (next_alarm_read(&entry) == 0) {
        printf("read alarm id %d, arg1 %d,arg2 %d\n",
            entry.alarmid, entry.arg1, entry.arg2);
    }

    printf("alarm_reader exit!\n");

    return NULL;
}

// Enqueue one alarm, blocking on cv_notfull for as long as the queue is full.
// The whole operation runs with mutex held (released only inside the wait).
void
alarm_add(int id, int arg1, int arg2)
{
    int slot;
    alarm_t *entry;

    pthread_mutex_lock(&mutex);

    // queue is full -- wait for reader to free up some space, then retry
    while ((slot = queue_notfull()) < 0) {
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    // have an open slot -- store item into it
    entry = &roller[slot];
    entry->timestamp = time(NULL);
    entry->alarmid = id;
    entry->arg1 = arg1;
    entry->arg2 = arg2;

    printf("added id %d, arg1 %d, arg2 %d @%d\n",
        entry->alarmid, entry->arg1, entry->arg2, slot);

    // unblock reader if necessary
    if (need_notempty) {
        DBG("writer signal notempty");
        need_notempty = 0;
        pthread_cond_signal(&cv_notempty);
    }

    pthread_mutex_unlock(&mutex);
}

// Dequeue the next alarm into *res, blocking on cv_notempty while the queue
// is empty. When the queue is empty and stopall is set, *res is left
// untouched and the stop value is returned instead.
// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    int slot;
    int stop = 0;

    pthread_mutex_lock(&mutex);

    for (;;) {
        slot = queue_notempty();

        // queue has an entry -- go process it
        if (slot >= 0)
            break;

        // stop when master has enqueued everything
        stop = stopall;
        if (stop)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty,&mutex);
    }

    if (slot >= 0) {
        printf("next_alarm_read() reading @%d\n", slot);
        *res = roller[slot];

        // if writer is waiting/blocking, wake it up because we just
        // freed up a queue slot
        if (need_notfull) {
            DBG("reader signal notfull");
            need_notfull = 0;
            pthread_cond_signal(&cv_notfull);
        }
    }

    pthread_mutex_unlock(&mutex);

    return stop;
}

更新:

I don't understand the do while(0); "loops" in the two Q functions, can you elaborate a little, please?

do while(0) 是我经常用来代替 if/else 阶梯逻辑的技术。它不是我发明的[一些风格指南中讨论过它,特别是 "Code Complete"],但我给很多人展示过之后,他们似乎都喜欢它。请参阅我的另一个回答以获得更好的解释。

And I guess what my initial post didn't include is: the master should be able to enqueue things on an ongoing basis, there's no stopall and the reader should start reading as soon as something is available.

实际上,我确实意识到了这一点,而且我发布的代码允许这样做。

您可能希望在排队任何消息之前先调用 pthread_create,以防止我在代码注释中提到的潜在死锁。

A fix for this would be to remove stopall, the pthread_cond_signal() (from main) is already done inside alarm_add() so this should work fine.

stopall 与 overflow/underflow 的同步无关。它只在 writer(主线程)希望 reader 线程处理完剩余条目并干净地停止时才需要。它更像是一种向 reader 发送 "EOF" 条件的方法。

如果你的应用程序需要"永远"运行,可以去掉 stopall。

或者,用一种更简洁的方式来发送 "EOF" 信号:主线程可以将一条特殊的 "stop" 消息(例如,时间戳为 -1 的消息)加入队列,以告诉接收方以后不会再有任何消息发送,我们希望终止程序。


我建议你添加一个"diagnostic mode"来验证你的程序:

main 执行 pthread_create 然后执行:

    for (i = 1; i < 10000000; i++) {
        alarm_add(LOS_ACT, i, 0);
    }

reader 应该检查收到的 arg1 值:它们应该像上面的循环那样逐一递增。如果不是,则存在逻辑错误或竞争条件。


这是我的代码的更新版本,增加了用于诊断/单元测试模式的 -D 选项。请注意,在该模式下所有打印都被禁用,以便程序能以极快的速度运行:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

int opt_diag;
double tvzero;

// Alarm identifiers passed to alarm_add() and stored in alarm_t.alarmid.
//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};

// One slot of the circular alarm queue that `roller` points to.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() stored the entry
    int alarmid;        // id passed to alarm_add() (an enum alarmid value)
    int arg1;           // caller-supplied value; diagnostic mode checks it counts up
    int arg2;           // caller-supplied value, copied through unchanged
} alarm_t;

void alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// Queue state. Every variable below is read and written only while `mutex`
// is held. The condition variables are statically initialized, like `mutex`,
// because nothing in this program calls pthread_cond_init().

// reader variables
pthread_cond_t cv_notempty = PTHREAD_COND_INITIALIZER;  // writer signals when queue not empty
volatile int need_notempty;             // reader sets this before waiting
volatile int idxdeq;                    // reader's queue index

// writer variables
pthread_cond_t cv_notfull = PTHREAD_COND_INITIALIZER;   // reader signals when queue not full
volatile int need_notfull;              // writer sets this before waiting
volatile int idxenq;                    // writer's queue index

volatile int stopall;

// Current CLOCK_REALTIME time as floating-point seconds, rebased on tvzero.
// RETURNS: seconds elapsed since tvzero (absolute seconds while tvzero is 0)
double
tvgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_REALTIME,&now);

    // fractional part first, then whole seconds, then rebase onto tvzero
    return ((double) now.tv_nsec / 1e9 + (double) now.tv_sec) - tvzero;
}

// printf() that prints only while diagnostic mode is off (opt_diag == 0);
// in diagnostic mode the arguments are not evaluated at all.
// Written with the standard C99 "..." / __VA_ARGS__ form rather than the
// GNU-only named form "_fmt...", so any conforming compiler accepts it.
#define prtf(...) \
    do { \
        if (! opt_diag) \
            printf(__VA_ARGS__); \
    } while (0)

// Debug trace macro: forwards its argument to dbg().
#define DBG(_reason) \
    dbg(_reason)

// Print one trace line of the form "[<tvgetf() seconds>] <reason>".
// Prints nothing (and does not read the clock) in diagnostic mode.
void
dbg(const char *reason)
{
    if (opt_diag)
        return;

    printf("[%.9f] %s\n",tvgetf(),reason);
}

// Demo / self-test driver.
// Options: -D[count] enables diagnostic mode; the writer then queues alarms
// with arg1 = 1 .. count-1 (count defaults to 10000000) and the reader checks
// that they arrive in order. Without -D the fixed demo sequence is queued.
// RETURNS: 0 on success, ERROR if allocation or thread creation failed
int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        default:
            // unrecognized options are ignored
            break;
        }
    }

    tvzero = tvgetf();

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    if (roller == NULL) {
        printf("Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields size_t, whose printf conversion is %zu (not %lu)
    printf("allocated memory: %zukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // NOTE: queuing more than a full queue here will cause writer to block
    // forever because reader is not yet started
    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        free(roller);
        return ERROR;
    }

#if 0
    sleep(1);
#endif

    if (opt_diag) {
        // arg1 counts up from 1 so the reader can verify ordering
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    // tell reader that all items are queued and it should stop when it
    // processes the final item
    pthread_mutex_lock(&mutex);
    stopall = 1;
    if (need_notempty)
        pthread_cond_signal(&cv_notempty);
    pthread_mutex_unlock(&mutex);

    pthread_join(reader, NULL);

    // the reader has exited, so nothing references the queue any more
    free(roller);

    return 0;
}

// Claim the oldest queued slot for the reader and advance idxdeq past it.
// Called by next_alarm_read() with mutex held.
// RETURNS: queue index to process (-1=empty)
int
queue_notempty(void)
{
    int slot = -1;

    do {
        // queue is empty: dequeue index has caught up with enqueue index
        if (idxdeq == idxenq)
            break;

        // hand out the current slot, then step past it (wrapping if necessary)
        slot = idxdeq;
        idxdeq = (slot + 1) % NUM_ALM;
    } while (0);

    return slot;
}

// Reserve the next free slot for the writer and advance idxenq past it.
// The queue counts as full when advancing idxenq would make it equal to
// idxdeq, so at most NUM_ALM-1 entries are queued at once.
// Called by alarm_add() with mutex held.
// RETURNS: queue index to use (-1=full)
int
queue_notfull(void)
{
    int slot = -1;
    int next = (idxenq + 1) % NUM_ALM;  // where idxenq would move (wrapping)

    do {
        // queue is full
        if (next == idxdeq)
            break;

        // hand out the current slot and store back the advanced index
        slot = idxenq;
        idxenq = next;
    } while (0);

    return slot;
}

// Reader thread body: consumes alarms until next_alarm_read() reports stop.
// In diagnostic mode each arg1 must equal the next expected counter value;
// on a mismatch the process prints both values and exits with status 1.
// RETURNS: always a null pointer
void *
alarm_reader(void *arg)
{
    alarm_t entry = { 0 };
    static int expval = 1;

    while (next_alarm_read(&entry) == 0) {
        if (opt_diag) {
            // value this entry must carry; the counter moves on to the next
            int want = expval++;

            if (entry.arg1 != want) {
                printf("expected: %d got %d\n",want,entry.arg1);
                exit(1);
            }
        }

        prtf("read alarm id %d, arg1 %d,arg2 %d\n",
            entry.alarmid, entry.arg1, entry.arg2);
    }

    printf("alarm_reader exit!\n");

    return NULL;
}

// Enqueue one alarm, blocking on cv_notfull for as long as the queue is full.
// The whole operation runs with mutex held (released only inside the wait).
void
alarm_add(int id, int arg1, int arg2)
{
    int slot;
    alarm_t *entry;

    pthread_mutex_lock(&mutex);

    // queue is full -- wait for reader to free up some space, then retry
    while ((slot = queue_notfull()) < 0) {
        DBG("writer need_notfull");
        need_notfull = 1;
        pthread_cond_wait(&cv_notfull,&mutex);
        DBG("writer wakeup");
    }

    // have an open slot -- store item into it
    entry = &roller[slot];
    entry->timestamp = time(NULL);
    entry->alarmid = id;
    entry->arg1 = arg1;
    entry->arg2 = arg2;

    prtf("added id %d, arg1 %d, arg2 %d @%d\n",
        entry->alarmid, entry->arg1, entry->arg2, slot);

    // unblock reader if necessary
    if (need_notempty) {
        DBG("writer signal notempty");
        need_notempty = 0;
        pthread_cond_signal(&cv_notempty);
    }

    pthread_mutex_unlock(&mutex);
}

// Dequeue the next alarm into *res, blocking on cv_notempty while the queue
// is empty. When the queue is empty and stopall is set, *res is left
// untouched and the stop value is returned instead.
// RETURNS: 1=stop, 0=normal
int
next_alarm_read(alarm_t *res)
{
    int slot;
    int stop = 0;

    pthread_mutex_lock(&mutex);

    for (;;) {
        slot = queue_notempty();

        // queue has an entry -- go process it
        if (slot >= 0)
            break;

        // stop when master has enqueued everything
        stop = stopall;
        if (stop)
            break;

        // queue is empty -- we must wait for writer to add something
        DBG("reader need_notempty");
        need_notempty = 1;
        pthread_cond_wait(&cv_notempty,&mutex);
    }

    if (slot >= 0) {
        prtf("next_alarm_read() reading @%d\n", slot);
        *res = roller[slot];

        // if writer is waiting/blocking, wake it up because we just
        // freed up a queue slot
        if (need_notfull) {
            DBG("reader signal notfull");
            need_notfull = 0;
            pthread_cond_signal(&cv_notfull);
        }
    }

    pthread_mutex_unlock(&mutex);

    return stop;
}

这是添加了诊断选项的原始代码版本:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

int opt_diag;

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

// Alarm identifiers passed to alarm_add() and stored in alarm_t.alarmid.
//even IDs = alarm active
//odd IDs  = alarm clear
// NOTE: 0 is not an enumerator; alarm_add()/next_alarm_read() use
// alarmid == 0 to mark a slot that holds no unread entry.
enum alarmid {
    BFD_ACT = 0x02,
    BFD_CLR = 0x03,
    LOS_ACT = 0x0C
};
// One slot of the circular alarm buffer that `roller` points to.
typedef struct alarm_s {
    long timestamp;     // time(NULL) taken when alarm_add() built the entry
    int alarmid;        // enum alarmid value; 0 means "no unread entry here"
    int arg1;           // caller-supplied value; diagnostic mode checks it counts up
    int arg2;           // caller-supplied value, copied through unchanged
} alarm_t;
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t * res);
void *alarm_reader(void *arg);

static alarm_t *roller;
// Condition variable signalled by alarm_add() after it stores an entry.
// Statically initialized: nothing in this program calls pthread_cond_init().
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
// Mutex paired with cv in alarm_reader(); statically initialized because
// nothing in this program calls pthread_mutex_init().
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// printf() that prints only while diagnostic mode is off (opt_diag == 0);
// in diagnostic mode the arguments are not evaluated at all.
// Written with the standard C99 "..." / __VA_ARGS__ form rather than the
// GNU-only named form "_fmt...", so any conforming compiler accepts it.
#define prtf(...) \
    do { \
        if (! opt_diag) \
            printf(__VA_ARGS__); \
    } while (0)

// Driver for the original (unsynchronized) buffer with a diagnostic option.
// Options: -D[count] enables diagnostic mode; the writer then queues alarms
// with arg1 = 1 .. count-1 (count defaults to 10000000) and the reader checks
// that they arrive in order. Without -D the original demo sequence runs.
// RETURNS: 0 on success, ERROR if allocation or thread creation failed
int
main(int argc,char **argv)
{
    int i = 0;
    char *cp;
    pthread_t reader;
    int ret;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'D':
            cp += 2;
            opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
            break;
        default:
            // unrecognized options are ignored
            break;
        }
    }

    roller = calloc(NUM_ALM, sizeof(alarm_t));
    if (roller == NULL) {
        printf("Error - calloc() failed\n");
        return ERROR;
    }
    // sizeof yields size_t, whose printf conversion is %zu (not %lu)
    printf("allocated memory: %zukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);

    // entries queued before the reader thread exists (demo mode only)
    if (! opt_diag) {
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }

    ret = pthread_create(&reader, NULL, alarm_reader, NULL);
    if (ret) {
        printf("Error - pthread_create() return code: %d\n", ret);
        free(roller);
        return ERROR;
    }

    if (opt_diag) {
        // arg1 counts up from 1 so the reader can verify ordering
        for (i = 1; i < opt_diag; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    }
    else {
        sleep(1);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    }

    pthread_join(reader, NULL);

    // the reader has exited, so nothing references the buffer any more
    free(roller);

    return 0;
}

// Reader thread body: consumes alarms via next_alarm_read() and waits on cv
// whenever it reports ERROR; exits after the third such wait.
// In diagnostic mode each arg1 must equal the next expected counter value;
// on a mismatch the process prints both values and exits with status 1.
// RETURNS: always a null pointer
void *
alarm_reader(void *arg)
{
    static alarm_t dat = { 0 };
    int expval = 1;                 // next arg1 value expected in diagnostic mode
    int err = 0;                    // number of ERROR results seen so far

    while (err <= 2) {
        if (next_alarm_read(&dat) == OK) {
            prtf("read alarm id %d, arg1 %d,arg2 %d\n", dat.alarmid, dat.arg1, dat.arg2);
            if (opt_diag) {
                if (dat.arg1 != expval) {
                    printf("expected: %d got %d\n",expval,dat.arg1);
                    exit(1);
                }
                ++expval;
            }
        }
        else {
            prtf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            // NOTE(review): the wait is not wrapped in a predicate loop, so a
            // signal sent before this thread blocks here is lost
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");

    return (void *) 0;
}

// Writer side: copies one alarm into the next slot of roller, zeroes the
// alarmid of the slot after it (the marker next_alarm_read() stops on), and
// signals cv.
// NOTE: once the static index has reached NUM_ALM, the call only resets the
// index to 0 and returns -- the alarm passed in is NOT stored.
// NOTE: roller is written without holding mutex.
// RETURNS: always 0
int
alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;               // writer's slot index, visible only here
    alarm_t dat = { 0 };
    if (i < NUM_ALM) {
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        // this tests the address of an array element, not whether the slot
        // is free
        if (&roller[i]) {
            memcpy(&roller[i], &dat, sizeof(alarm_t));
            // zeroing the following slot overwrites the alarmid of whatever
            // entry is stored there, whether or not the reader has consumed it
            if (i + 1 < NUM_ALM)
                roller[i + 1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            prtf("added id %d, arg1 %d, arg2 %d @%d\n", roller[i].alarmid, roller[i].arg1, roller[i].arg2, i);
            i++;
        }
    }
    else {
        // wrap-around: index reset only, nothing is stored on this call
        i = 0;
    }
    return 0;
}

// Reader side: copies the entry at the reader's current slot into *res and
// advances, provided that slot's alarmid is non-zero.
// NOTE: once the static index has reached NUM_ALM, the call only resets the
// index to 0 and returns OK without writing *res, so the caller sees whatever
// *res held before.
// NOTE: roller is read without holding mutex.
// RETURNS: OK, or ERROR if res is NULL or the current slot's alarmid is 0
int
next_alarm_read(alarm_t * res)
{
    static int i = 0;               // reader's slot index, visible only here
    //static long prev_time = 0;

    if (!res)
        return ERROR;

    if (i < NUM_ALM) {
        if (roller[i].alarmid != 0) {
            prtf("next_alarm_read() reading @%d\n", i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            //prev_time = roller[i].timestamp;
            i++;
        }
        else {
            // alarmid == 0: treated as "nothing to read in this slot"
            prtf("next_alarm_read() @%d is %d,return ERROR\n", i, roller[i].alarmid);

            return ERROR;
        }
    }
    else {
        // wrap-around: index reset only, *res is left untouched
        i = 0;
    }
    return OK;
}