可读流和回调函数

Readable stream and callback function

我在从 cassandra 读取流时遇到了一些问题(我什至不知道我是否有可能实现我想要实现的目标)。所以在 cassandra-driver git 存储库页面上有一个如何使用流的示例。我试过了,很有效。

但是我正在尝试使用来自 ES6 提案的所谓 classes 和 nodejs 5。我想将模型定义为 class 并且在我使用流的方法之一中(我从 cassandra 获取数据的地方)。 问题出在 readable 状态,回调函数中有 this.read() ,现在当它在 class 中调用时,它变成了 class 范围所以它总是未定义的。我试过用 cassandra-driver 模块的 ResultStream 扩展我的 class 但没有运气,也许我没有正确调用它。我已经尝试使用 data 状态(不同的 class 和方法作为回调)并且它正在工作,因为数据状态有一个参数作为块传递。

所以问题是,我如何将这个流调用封装在 class 方法中,以便可以读取可读状态?

我想要实现的示例代码:

class Foobar {
    constructor(client) {
        this.client = client;
        this.collection = [];
        this.error;
    }
    getByProductName(query, params) {
        this.client.stream(query, params, {prepare: true})
            .on('readable', () => {
                var row;
                while(row = this.read()) { // Problem with this scope
                    this.collection.push(row);
                }
            })
            .on('error', err => {
                if(err) {
                    this.error = err;
                }
            })
            .on('end', () => {
                console.log('end');
            });
    }
}

感谢您的任何建议。

您可以在闭包中捕获 stream 实例:

class Foobar {
    constructor(client) {
        this.client = client;
        this.collection = [];
        this.error;
    }
    getByProductName(query, params) {
        const stream = this.client.stream(query, params, { prepare: true })
            .on('readable', () => {
                var row;
                while(row = stream.read()) { // <- use stream instance 
                    this.collection.push(row);
                }
            })
            .on('error', err => {
                if(err) {
                    this.error = err;
                }
            })
            .on('end', () => {
                console.log('end');
            });
    }
}