如何在 Node.js 中使用 Promise 按顺序(同步地)发起 HTTP 调用

how to make synchronous http calls using promises in node.js

我想遍历一组学生,为每个学生发起一次 HTTP 调用,解析响应并把结果插入 MongoDB。我希望逐个学生依次执行:等当前学生的所有数据都插入完毕后,再处理下一个学生,这样对 CPU 和内存的占用会更友好……

到目前为止我是这样做的,但出于某种原因,结果并不是我想要的……

// Reporting window start. The non-ISO date string is parsed by the Date
// constructor (presumably as local time - confirm for the target runtime).
var startDate = new Date("February 20, 2016 00:00:00");
// Window start as a Unix timestamp in seconds.
var from = startDate.getTime() / 1000;
// Move the same Date object 30 calendar days forward; setDate() rolls over month ends.
startDate.setDate(startDate.getDate() + 30);
// Window end as a Unix timestamp in seconds.
var to = startDate.getTime() / 1000;

// Process every student for the [from, to] window.
iterateThruAllStudents(from, to);

/**
 * Loads every student with status 'student' and, one student at a time,
 * requests that student's Worksnaps time entries and hands each entry to
 * saveTimeEntry().
 *
 * Note that saveTimeEntry() is not awaited here, so the series advances to
 * the next student as soon as the entries have been handed over.
 *
 * @param {number} from - Value sent as the `from_timestamp` query parameter.
 * @param {number} to - Value sent as the `to_timestamp` query parameter.
 */
function iterateThruAllStudents(from, to) {
    Student.find({status: 'student'})
        .populate('user')
        .exec(function (err, students) {
            if (err) {
                throw err;
            }

            async.eachSeries(students, function iteratee(student, callback) {
                // A student without a linked Worksnaps user has nothing to fetch.
                // The callback must still be invoked, otherwise eachSeries never
                // advances to the next student.
                if (student.worksnap.user == null) {
                    callback(null);
                    return;
                }

                var worksnapOptions = {
                    hostname: 'worksnaps.com',
                    path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + '&from_timestamp=' + from + '&to_timestamp=' + to,
                    headers: {
                        'Authorization': 'Basic xxx='
                    },
                    method: 'GET'
                };

                promisedRequest(worksnapOptions)
                    .then(function (response) { // invoked when the request promise resolves
                        parser.parseString(response, function (parseError, results) {
                            if (parseError) {
                                // Report the failure instead of dereferencing missing results.
                                callback(parseError);
                                return;
                            }
                            // JSON round-trip produces a plain-data copy of the parsed entries.
                            var timeEntries = JSON.parse(JSON.stringify(results.time_entries));
                            _.forEach(timeEntries, function (timeEntry) {
                                _.forEach(timeEntry, function (item) {
                                    saveTimeEntry(item);
                                });
                            });
                            callback(null);
                        });
                    }, function (newsError) { // invoked when the request promise rejects
                        console.log(newsError);
                        // Pass the error on so the series ends instead of stalling forever.
                        callback(newsError);
                    });
            });
        });
}

/**
 * Looks up the student whose Worksnaps user id matches the entry and appends
 * the entry to that student's timeEntries, then saves the document.
 *
 * Fire-and-forget: nothing is returned, and failures are only logged.
 *
 * @param {Object} item - Time entry; item.user_id[0] is used as the lookup key.
 */
function saveTimeEntry(item) {
    Student.findOne({
            'worksnap.user.user_id': item.user_id[0]
        })
        .populate('user')
        .exec(function (err, student) {
            if (err) {
                // Throwing from this asynchronous callback would be uncaught, so log
                // the failure the same way save errors are reported below.
                console.log(err);
                return;
            }
            if (!student) {
                // findOne yields null when nothing matches; there is nothing to update.
                console.log('No student found for worksnap user ' + item.user_id[0]);
                return;
            }
            student.timeEntries.push(item);
            student.save(function (saveError) {
                if (saveError) {
                    console.log(saveError);
                } else {
                    console.log(Math.random());
                }
            });
        });
}

/**
 * Issues an HTTP request and gathers the whole response body into one string.
 *
 * @param {Object} requestOptions - Options passed straight to http.request().
 * @returns {Promise} Q promise resolved with the response body text once the
 *     response emits 'end', or rejected with the error emitted by the request.
 */
function promisedRequest(requestOptions) {
    // NOTE(review): this sets the variable for the whole process on every call;
    // presumably it is meant to relax TLS certificate checks - confirm it is needed.
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";

    var pending = Q.defer();

    // Accumulates the response chunks and settles the promise at the end.
    function collectBody(res) {
        // Ask for string chunks.
        res.setEncoding('utf8');
        var body = '';
        res.on('data', function (chunk) {
            body += chunk;
        });
        res.on('end', function () {
            console.log('http call finished');
            pending.resolve(body);
        });
    }

    var request = http.request(requestOptions, collectBody);
    request.on('error', function (requestError) {
        pending.reject(requestError);
    });
    request.end();

    // Only the promise is handed out, so callers cannot resolve or reject it.
    return pending.promise;
}

看起来 .then 中的 saveTimeEntry() 会同时为所有学生调用,这似乎就是问题所在。

我是 JavaScript 新手,尤其不熟悉 Promise 和回调……有人知道该如何实现这样的功能吗?

好的,我确实解决了这个问题,如果以后有人遇到这个问题,这里是我的解决方案。

    var startDate = new Date("February 20, 2016 00:00:00");  // window start (non-ISO string; presumably parsed as local time)
// Window start as a Unix timestamp in seconds.
var from = startDate.getTime() / 1000;
// Move the same Date object 30 calendar days forward; setDate() rolls over month ends.
startDate.setDate(startDate.getDate() + 30);
// Window end as a Unix timestamp in seconds.
var to = startDate.getTime() / 1000;

// Process every student for the [from, to] window.
iterateThruAllStudents(from, to);

/**
 * Loads every student with status 'student' and processes them strictly one
 * at a time: request the student's Worksnaps time entries, parse them, wait
 * for saveTimeEntry() to finish, then move on to the next student.
 *
 * @param {number} from - Value sent as the `from_timestamp` query parameter.
 * @param {number} to - Value sent as the `to_timestamp` query parameter.
 */
function iterateThruAllStudents(from, to) {
    Student.find({status: 'student'})
        .populate('user')
        .exec(function (err, students) {
            if (err) {
                throw err;
            }

            // Number of students visited so far; used only in the progress log.
            var counter = 0;
            async.eachSeries(students, function iteratee(student, callback) {
                counter++;
                // Nothing to fetch without a linked Worksnaps user; advance the series.
                if (student.worksnap.user == null) {
                    callback(null);
                    return;
                }

                console.log('');
                console.log('--------------');
                console.log(student.worksnap.user.user_id);
                var worksnapOptions = {
                    hostname: 'worksnaps.com',
                    path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + '&from_timestamp=' + from + '&to_timestamp=' + to,
                    headers: {
                        'Authorization': 'Basic xxxxx'
                    },
                    method: 'GET'
                };

                promisedRequest(worksnapOptions)
                    .then(function (response) { // invoked when the request promise resolves
                        parser.parseString(response, function (parseError, results) {
                            if (parseError) {
                                callback(parseError);
                                return;
                            }
                            // JSON round-trip produces a plain-data copy of the parsed entries.
                            var timeEntries = JSON.parse(JSON.stringify(results.time_entries));
                            if (_.isEmpty(timeEntries)) {
                                // Return here: falling through would invoke the callback a
                                // second time once saveTimeEntry() settles.
                                callback(null);
                                return;
                            }
                            saveTimeEntry(timeEntries).then(function () {
                                console.log('all timeEntries for one student finished....Student: ' + student.worksnap.user.user_id + ' Student Counter: ' + counter);
                                callback(null);
                            }, function (saveError) {
                                // Surface save failures instead of leaving the series stalled.
                                callback(saveError);
                            });
                        });
                    }, function (newsError) { // invoked when the request promise rejects
                        console.log(newsError);
                        // Pass the error on so the series ends instead of stalling forever.
                        callback(newsError);
                    });
            });
        });
}

/**
 * Saves every time entry contained in `timeEntries`, one after another, and
 * reports completion through a promise.
 *
 * `timeEntries` is walked two levels deep (collection of collections of
 * items). For each item the student whose Worksnaps user id equals
 * item.user_id[0] is loaded, the item is appended to student.timeEntries and
 * the student is saved. Entries are processed sequentially so that two saves
 * never run against the same student document at the same time.
 *
 * @param {Object|Array} timeEntries - Nested collection of time-entry items.
 * @returns {Promise} Q promise resolved with a status string once every item
 *     has been saved (immediately when there are none), or rejected with the
 *     first lookup/save error.
 */
function saveTimeEntry(timeEntries) {
    var deferred = Q.defer();

    // Flatten first so the items can be processed strictly in order.
    var items = [];
    _.forEach(timeEntries, function (timeEntry) {
        _.forEach(timeEntry, function (item) {
            items.push(item);
        });
    });

    var index = 0;

    // Saves items[index] and recurses; settles the deferred when done or on error.
    function saveNext() {
        if (index >= items.length) {
            deferred.resolve('finished saving timeEntries for one student...');
            return;
        }
        var item = items[index++];
        Student.findOne({
                'worksnap.user.user_id': item.user_id[0]
            })
            .populate('user')
            .exec(function (err, student) {
                if (err) {
                    console.log(err);
                    deferred.reject(err);
                    return;
                }
                if (!student) {
                    // No matching student: nothing to update, continue with the next item.
                    console.log('No student found for worksnap user ' + item.user_id[0]);
                    saveNext();
                    return;
                }
                student.timeEntries.push(item);
                student.save(function (saveError) {
                    if (saveError) {
                        console.log(saveError);
                        deferred.reject(saveError);
                        return;
                    }
                    saveNext();
                });
            });
    }

    saveNext();
    return deferred.promise;
}

/**
 * Issues an HTTP request and gathers the whole response body into one string.
 *
 * @param {Object} requestOptions - Options passed straight to http.request().
 * @returns {Promise} Q promise resolved with the response body text once the
 *     response emits 'end', or rejected with the error emitted by the request.
 */
function promisedRequest(requestOptions) {
    // NOTE(review): this sets the variable for the whole process on every call;
    // presumably it is meant to relax TLS certificate checks - confirm it is needed.
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";

    var pending = Q.defer();

    // Accumulates the response chunks and settles the promise at the end.
    function collectBody(res) {
        // Ask for string chunks.
        res.setEncoding('utf8');
        var body = '';
        res.on('data', function (chunk) {
            body += chunk;
        });
        res.on('end', function () {
            console.log('http call finished');
            pending.resolve(body);
        });
    }

    // Logs the request failure, then rejects the promise with it.
    function failRequest(requestError) {
        console.log('inside On error.');
        console.log(requestError);
        pending.reject(requestError);
    }

    var request = http.request(requestOptions, collectBody);
    request.on('error', failRequest);
    request.end();

    // Only the promise is handed out, so callers cannot resolve or reject it.
    return pending.promise;
}

首先,如果对所有异步操作都使用 Promise,那么多层嵌套的操作会更容易编写,错误处理也更可靠。这意味着要学会使用数据库内置的 Promise 支持(我假设您使用的是 Mongoose),然后把其他所有异步操作也包装成返回 Promise 的形式。下面是几个关于在 Mongoose 中使用 Promise 的链接:

Switching to use promises in Mongoose

Plugging in your own promise library into Mongoose

因此,查看 Mongoose 文档可以看到,.exec() 和 .save() 已经会返回 Promise,所以我们可以直接使用它们。

添加这行代码将告诉 Mongoose 使用 Q 的 Promise(因为您展示的代码用的就是这个 Promise 库):

// Tell Mongoose to use Q promises: the Promise constructor exported by the
// 'q' module is assigned to mongoose.Promise.
mongoose.Promise = require('q').Promise;

然后,您需要把一些不使用 Promise 的操作 Promise 化(promisify),比如解析步骤:

/**
 * Promise wrapper around the callback-style parser.parseString().
 *
 * @param {string} r - Raw text handed to parser.parseString().
 * @returns {Promise} Q promise resolved with the parse results, or rejected
 *     with the error reported by the parser.
 */
function parse(r) {
    var pending = Q.defer();

    function settle(err, results) {
        // Success path first; any truthy error rejects.
        if (!err) {
            pending.resolve(results);
            return;
        }
        pending.reject(err);
    }

    parser.parseString(r, settle);
    return pending.promise;
}

saveTimeEntry() 可以很容易地改写为返回 Promise,只需利用数据库已有的 Promise 支持:

/**
 * Appends one time entry to the student whose Worksnaps user id matches
 * item.user_id[0] and saves that student.
 *
 * @param {Object} item - Time entry; item.user_id[0] is the lookup key.
 * @returns {Promise} Promise for the result of student.save(); it rejects if
 *     the lookup, the push or the save fails.
 */
function saveTimeEntry(item) {
    var filter = {'worksnap.user.user_id': item.user_id[0]};
    var lookup = Student.findOne(filter).populate('user').exec();

    return lookup.then(function (student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

现在,用 Promise 重写主要逻辑所需的各个部分都已就绪。这里要记住的关键点是:如果在 .then() 处理函数中返回一个 Promise,它就会被链接到父 Promise 上。我们会在处理过程中大量用到这一点。此外,要按顺序对数组中的每一项执行返回 Promise 的操作,一种常见的设计模式是像下面这样使用 array.reduce():

// Pattern fragment (meant to sit inside a function): run one promise-returning
// operation per array item, strictly one after another.
// `p` is the chain built so far; it starts as the promise returned by Q().
return array.reduce(function(p, item) {
    // Start this item's operation only after the chain so far has fulfilled.
    return p.then(function() {
         // Returning the promise makes the chain wait for this operation.
         return someAsyncPromiseOperation(item);
    });
}, Q());

我们将在几个地方使用该结构来使用 promises 重写核心逻辑:

// Tell Mongoose to use Q promises: the Promise constructor exported by the
// 'q' module is assigned to mongoose.Promise.
mongoose.Promise = require('q').Promise;

// Start processing; the two handlers are placeholders for the caller's own
// success and failure handling.
iterateThruAllStudents(from, to).then(function() {
    // done successfully here
}, function(err) {
    // error occurred here
});

/**
 * Processes every student with status 'student' sequentially: request the
 * student's Worksnaps time entries, parse the response, then save the entries
 * one by one before moving on to the next student.
 *
 * @param {number} from - Value sent as the `from_timestamp` query parameter.
 * @param {number} to - Value sent as the `to_timestamp` query parameter.
 * @param {Function} [callback] - Unused; kept so existing call sites keep working.
 * @returns {Promise} Promise that fulfills once all students are processed and
 *     rejects with the first error from the query, a request, parsing or a save.
 */
function iterateThruAllStudents(from, to, callback) {
    return Student.find({status: 'student'}).populate('user').exec().then(function (students) {
        // The chain must be returned: otherwise the outer promise fulfills before
        // any student is processed and every error in the chain is lost.
        return students.reduce(function (studentChain, student) {
            return studentChain.then(function () {
                // Students without a linked Worksnaps user are skipped.
                if (student.worksnap.user == null) {
                    return;
                }

                var worksnapOptions = {
                    hostname: 'worksnaps.com',
                    path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id +
                          '&from_timestamp=' + from + '&to_timestamp=' + to,
                    headers: {'Authorization': 'Basic xxx='},
                    method: 'GET'
                };

                return promisedRequest(worksnapOptions).then(function (response) {
                    return parse(response);
                }).then(function (results) {
                    // assuming results.time_entries is an array
                    return results.time_entries.reduce(function (entryChain, item) {
                        return entryChain.then(function () {
                            return saveTimeEntry(item);
                        });
                    }, Q());
                });
            });
        }, Q());
    });
}

/**
 * Promise wrapper around the callback-style parser.parseString().
 *
 * @param {string} r - Raw text handed to parser.parseString().
 * @returns {Promise} Q promise resolved with the parse results, or rejected
 *     with the error reported by the parser.
 */
function parse(r) {
    var pending = Q.defer();

    function settle(err, results) {
        // Success path first; any truthy error rejects.
        if (!err) {
            pending.resolve(results);
            return;
        }
        pending.reject(err);
    }

    parser.parseString(r, settle);
    return pending.promise;
}

/**
 * Appends one time entry to the student whose Worksnaps user id matches
 * item.user_id[0] and saves that student.
 *
 * @param {Object} item - Time entry; item.user_id[0] is the lookup key.
 * @returns {Promise} Promise for the result of student.save(); it rejects if
 *     the lookup, the push or the save fails.
 */
function saveTimeEntry(item) {
    var filter = {'worksnap.user.user_id': item.user_id[0]};
    var lookup = Student.findOne(filter).populate('user').exec();

    return lookup.then(function (student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

/**
 * Issues an HTTP request and gathers the whole response body into one string.
 *
 * @param {Object} requestOptions - Options passed straight to http.request().
 * @returns {Promise} Q promise resolved with the response body text once the
 *     response emits 'end', or rejected with the error emitted by the request.
 */
function promisedRequest(requestOptions) {
    // NOTE(review): this sets the variable for the whole process on every call;
    // presumably it is meant to relax TLS certificate checks - confirm it is needed.
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";

    var pending = Q.defer();

    // Accumulates the response chunks and settles the promise at the end.
    function collectBody(res) {
        // Ask for string chunks.
        res.setEncoding('utf8');
        var body = '';
        res.on('data', function (chunk) {
            body += chunk;
        });
        res.on('end', function () {
            console.log('http call finished');
            pending.resolve(body);
        });
    }

    var request = http.request(requestOptions, collectBody);
    request.on('error', function (requestError) {
        pending.reject(requestError);
    });
    request.end();

    return pending.promise;
}

与您的代码相比,这种写法的一个额外好处是:发生的任何错误都会一路传递到 iterateThruAllStudents() 返回的 Promise,因此不会有错误被隐藏。


然后清理它以减少嵌套缩进并添加一个有用的实用函数,它看起来像这样:

// Tell Mongoose to use Q promises: the Promise constructor exported by the
// 'q' module is assigned to mongoose.Promise.
mongoose.Promise = require('q').Promise;

// create utility function for promise sequencing through an array
/**
 * Runs a promise-returning iterator over an array strictly one item at a time.
 *
 * @param {Array} array - Items to process, in order.
 * @param {Function} iterator - Called as iterator(item); may return a promise.
 * @returns {Promise} Promise for the last iterator result (fulfilled with the
 *     value of Q() when the array is empty); rejects on the first failure.
 */
function sequence(array, iterator) {
    // Start from an already-fulfilled promise and append one step per item.
    var chain = Q();
    array.forEach(function (item) {
        chain = chain.then(function () {
            return iterator(item);
        });
    });
    return chain;
}

// Start processing; the two handlers are placeholders for the caller's own
// success and failure handling.
iterateThruAllStudents(from, to).then(function() {
    // done successfully here
}, function(err) {
    // error occurred here
});

/**
 * Processes every student with status 'student' sequentially: request the
 * student's Worksnaps time entries, parse the response, then save the entries
 * one by one (via sequence()) before moving on to the next student.
 *
 * @param {number} from - Value sent as the `from_timestamp` query parameter.
 * @param {number} to - Value sent as the `to_timestamp` query parameter.
 * @param {Function} [callback] - Unused; kept so existing call sites keep working.
 * @returns {Promise} Promise that fulfills once all students are processed and
 *     rejects with the first error from the query, a request, parsing or a save.
 */
function iterateThruAllStudents(from, to, callback) {
    return Student.find({status: 'student'}).populate('user').exec().then(function (students) {
        // iterate through the students array sequentially
        // The iterator parameter must be named `student`: the body reads it under that name.
        return sequence(students, function (student) {
            // Students without a linked Worksnaps user are skipped.
            if (student.worksnap.user == null) {
                return;
            }

            var worksnapOptions = {
                hostname: 'worksnaps.com',
                path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id +
                      '&from_timestamp=' + from + '&to_timestamp=' + to,
                headers: {'Authorization': 'Basic xxx='},
                method: 'GET'
            };
            return promisedRequest(worksnapOptions).then(function (response) {
                return parse(response);
            }).then(function (results) {
                // assuming results.time_entries is an array
                return sequence(results.time_entries, saveTimeEntry);
            });
        });
    });
}

/**
 * Promise wrapper around the callback-style parser.parseString().
 *
 * @param {string} r - Raw text handed to parser.parseString().
 * @returns {Promise} Q promise resolved with the parse results, or rejected
 *     with the error reported by the parser.
 */
function parse(r) {
    var pending = Q.defer();

    function settle(err, results) {
        // Success path first; any truthy error rejects.
        if (!err) {
            pending.resolve(results);
            return;
        }
        pending.reject(err);
    }

    parser.parseString(r, settle);
    return pending.promise;
}

/**
 * Appends one time entry to the student whose Worksnaps user id matches
 * item.user_id[0] and saves that student.
 *
 * @param {Object} item - Time entry; item.user_id[0] is the lookup key.
 * @returns {Promise} Promise for the result of student.save(); it rejects if
 *     the lookup, the push or the save fails.
 */
function saveTimeEntry(item) {
    var filter = {'worksnap.user.user_id': item.user_id[0]};
    var lookup = Student.findOne(filter).populate('user').exec();

    return lookup.then(function (student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

/**
 * Issues an HTTP request and gathers the whole response body into one string.
 *
 * @param {Object} requestOptions - Options passed straight to http.request().
 * @returns {Promise} Q promise resolved with the response body text once the
 *     response emits 'end', or rejected with the error emitted by the request.
 */
function promisedRequest(requestOptions) {
    // NOTE(review): this sets the variable for the whole process on every call;
    // presumably it is meant to relax TLS certificate checks - confirm it is needed.
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";

    var pending = Q.defer();

    // Accumulates the response chunks and settles the promise at the end.
    function collectBody(res) {
        // Ask for string chunks.
        res.setEncoding('utf8');
        var body = '';
        res.on('data', function (chunk) {
            body += chunk;
        });
        res.on('end', function () {
            console.log('http call finished');
            pending.resolve(body);
        });
    }

    var request = http.request(requestOptions, collectBody);
    request.on('error', function (requestError) {
        pending.reject(requestError);
    });
    request.end();

    // Only the promise is handed out, so callers cannot resolve or reject it.
    return pending.promise;
}

当然,这里的代码很多,我没有办法测试,而且我以前从未写过 Mongoose 代码,所以其中很可能有一些错误,但希望您能理解总体思路,并修正我可能犯下的错误。