Javascript 等待更改局部变量?

Javascript Await Changes Local Variables?

谁能解释一下我在 Javascript 中使用异步函数做错了什么?

基本上,我必须在我的 Node.js 代码中使用异步来获取一个开放端口供我使用。在异步调用之外设置了一个局部变量,我可以 access/use 就好了,直到我等待异步函数到 return。之后,局部变量未定义。

(async () => {
    console.log("CHECK AFTER ASYNC1: " + csvFilePath);
    // First, grab a valid open port
    var port;
    while (!port || portsInProcess.indexOf(port) >= 0) {
        console.log("CHECK AFTER ASYNC2: " + csvFilePath);
        port = await getPort();
        console.log(port);
    }
    console.log("CHECK AFTER ASYNC3: " + csvFilePath);
    portsInProcess.push(port);
    // ... more code below...

检查 #1 和 2 对于 csvFilePath 变量没有问题,但检查 #3 显示它未定义。但是,端口号没问题。这让我相信 Javascript 中的异步函数调用有些奇怪,它只影响局部变量;我进一步使用的全局变量就可以了。不幸的是,我不能使 csvFilePath 变量成为全局变量,因为这也会在该变量上引入竞争条件(我在其他地方防止这种情况发生;while 循环是为了帮助防止端口号上的竞争条件,这在我的简单测试中基本上没有使用过)在本地主机上)。

为了以防万一,这是我得到的输出:

CHECK AFTER ASYNC1: data/text/crescent_topics.csv
CHECK AFTER ASYNC2: data/text/crescent_topics.csv
58562
CHECK AFTER ASYNC3: null

可能还值得一提的是,我添加的代码行实际上只是动态获取开放端口的前几行代码。我之前使用固定端口号的代码工作得很好(包括这个保持稳定的 csvFilePath 变量)。

我对 await 功能的理解是它使异步函数或多或少地同步运行,这似乎就是这里发生的事情;在设置端口号之前,我使用端口号的代码不是 运行 。 (但即使情况并非如此,为什么 csvFilePath 变量被取消设置,因为我没有改变它或在这里以任何方式使用它?)

编辑:这里有更多代码来提供额外的上下文

var spawn = require('child_process').spawn;
var fs = require("fs");
var async = require('async');
var zmq = require('zmq');
var readline = require('readline');
const getPort = require('get-port');

/* Export the Nebula class */
module.exports = Nebula;

/* Location of the data for the Crescent dataset */
var textDataPath = "data/text/";
var crescentRawDataPath = textDataPath + "crescent_raw";
var crescentTFIDF = textDataPath + "crescent tfidf.csv";
var crescentTopicModel = textDataPath + "crescent_topics.csv";

/* Location of the data for the UK Health dataset */
var ukHealthRawDataPath = textDataPath + "uk_health_raw";
var ukHealthTFIDF = textDataPath + "uk_health.csv";

/* Map CSV files for text data to raw text location */
var textRawDataMappings = {};
textRawDataMappings[crescentTFIDF] = crescentRawDataPath;
textRawDataMappings[crescentTopicModel] = crescentRawDataPath;
textRawDataMappings[ukHealthTFIDF] = ukHealthRawDataPath;
textRawDataMappings[textDataPath + "uk_health_sm.csv"] = ukHealthRawDataPath;

/* The pipelines available to use */
var flatTextUIs = ["cosmos", "composite", "sirius", "centaurus"];
var pipelines = {
    andromeda: { 
        file: "pipelines/andromeda.py",
        defaultData: "data/highD/Animal_Data_study.csv"
     },
     cosmos: {
        file: "pipelines/cosmos.py",
        defaultData: textDataPath + "crescent tfidf.csv"
     },
     sirius: {
        file: "pipelines/sirius.py",
        defaultData: "data/highD/Animal_Data_paper.csv"
     },
     centaurus: {
        file: "pipelines/centaurus.py",
        defaultData: "data/highD/Animal_Data_paper.csv"
     },
     twitter: {
        file: "pipelines/twitter.py",
     },
     composite: {
        file: "pipelines/composite.py",
        defaultData: textDataPath + "crescent tfidf.csv"
     },
     elasticsearch: {
        file: "pipelines/espipeline.py",
        args: []
     }
};

/* The locations of the different types of datasets on the server */
var textDataFolder = "data/text/";
var highDDataFolder = "data/highD/";
var customCSVFolder = "data/customCSV/";

var sirius_prototype = 2;

// An array to track the ports being processed to eliminate race conditions
// as much as possible
var portsInProcess = [];

var nextSessionNumber = 0;
var usedSessionNumbers = [];

/* Nebula class constructor */
function Nebula(io, pipelineAddr) {
    /* This allows you to use "Nebula(obj)" as well as "new Nebula(obj)" */
    if (!(this instanceof Nebula)) { 
        return new Nebula(io);
    }

    /* The group of rooms currently active, each with a string identifier
     * Each room represents an instance of a visualization that can be shared
     * among clients.
     */
    this.rooms = {};
    this.io = io;

    /* For proper use in callback functions */
    var self = this;

    /* Accept new WebSocket clients */
    io.on('connection', function(socket) {

    // Skipped some irrelevant Socket.io callbacks

    **// Use the csvFilePath to store the name of a user-defined CSV file
        var csvFilePath = null;**

        /* Helper function to tell the client that the CSV file is now ready for them
        * to use. They are also sent a copy of the data
        */
        var csvFileReady = function(csvFilePath) {

            // Let the client know that the CSV file is now ready to be used on
            // the server
            socket.emit("csvDataReady");

            // Prepare to parse the CSV file
            var csvData = [];
            const rl = readline.createInterface({
                input: fs.createReadStream(csvFilePath),
                crlfDelay: Infinity
            });

            // Print any error messages we encounter
            rl.on('error', function (err) {
                console.log("Error while parsing CSV file: " + csvFilePath);
                console.log(err);
            });

            // Read each line of the CSV file one at a time and parse it
            var columnHeaders = [];
            var firstColumnName;
            rl.on('line', function (data) {                
                var dataColumns = data.split(",");

                // If we haven't saved any column names yet, do so first
                if (columnHeaders.length == 0) {
                    columnHeaders = dataColumns;
                    firstColumnName = columnHeaders[0];
                }

                // Process each individual line of data in the CSV file
                else {
                    var dataObj = {};
                    var i;
                    for (i = 0; i < dataColumns.length; i++) {
                        var key = columnHeaders[i];
                        var value = dataColumns[i];
                        dataObj[key] = value
                    }
                    csvData.push(dataObj);
                }

            });

            // All lines are read, file is closed now.
            rl.on('close', function () {

                // On certain OSs, like Windows, an extra, blank line may be read
                // Check for this and remove it if it exists
                var lastObservation = csvData[csvData.length-1];
                var lastObservationKeys = Object.keys(lastObservation);
                if (lastObservationKeys.length = 1 && lastObservation[lastObservationKeys[0]] == "") {
                    csvData.pop();
                }

                // Provide the CSV data to the client
                socket.emit("csvDataReadComplete", csvData, firstColumnName);
            });
        };

        **/* Allows the client to specify a CSV file already on the server to use */
        socket.on("setCSV", function(csvName) {
            console.log("setCSV CALLED");
            csvFilePath = "data/" + csvName;
            csvFileReady(csvFilePath);
            console.log("CSV FILE SET: " + csvFilePath);
        });**

        // Skipped some more irrelevant callbacks

        /*  a client/ a room. If the room doesn't next exist yet,
        * initiate it and send the new room to the client. Otherwise, send
        * the client the current state of the room.
        */
        socket.on('join', function(roomName, user, pipeline, args) {
            console.log("Join called for " + pipeline + " pipeline; room " + roomName);
            socket.roomName = roomName;
            socket.user = user;
            socket.join(roomName);

            console.log("CSV FILE PATH: " + csvFilePath);

            var pipelineArgsCopy = [];

            if (!self.rooms[roomName]) {
                var room = {};
                room.name = roomName;
                room.count = 1;
                room.points = new Map();
                room.similarity_weights = new Map();

                if (pipeline == "sirius" || pipeline == "centaurus") {
                    room.attribute_points = new Map();
                    room.attribute_similarity_weights = new Map();
                    room.observation_data = [];
                    room.attribute_data = [];
                }

                /* Create a pipeline client for this room */
                console.log("CHECK BEFORE ASYNC: " + csvFilePath);
                **// Here's the code snippet I provided above**
                **(async () => {
                    console.log("CHECK AFTER ASYNC1: " + csvFilePath);
                    // First, grab a valid open port
                    var port;
                    while (!port || portsInProcess.indexOf(port) >= 0) {
                        console.log("CHECK AFTER ASYNC2: " + csvFilePath);
                        port = await getPort();
                        console.log(port);
                    }
                    console.log("CHECK AFTER ASYNC3: " + csvFilePath);**
                    portsInProcess.push(port);
                    console.log("CHECK AFTER ASYNC4: " + csvFilePath);

                    if (!pipelineAddr) {
                        var pythonArgs = ["-u"];
                        if (pipeline in pipelines) {

                            // A CSV file path should have already been set. This
                            // file path should be used to indicate where to find
                            // the desired file
                            console.log("LAST CHECK: " + csvFilePath);
                            if (!csvFilePath) {
                                csvFilePath = pipelines[pipeline].defaultData;
                            }
                            console.log("FINAL CSV FILE: " + csvFilePath);
                            pipelineArgsCopy.push(csvFilePath);

                            // If the UI supports reading flat text files, tell the
                            // pipeline where to find the files
                            if (flatTextUIs.indexOf(pipeline) >= 0) {
                                pipelineArgsCopy.push(textRawDataMappings[csvFilePath]);
                            }

                            // Set the remaining pipeline args
                            pythonArgs.push(pipelines[pipeline].file);
                            pythonArgs.push(port.toString());
                            if (pipeline != "twitter" && pipeline != "elasticsearch") {
                                pythonArgs = pythonArgs.concat(pipelineArgsCopy);
                            }
                        }
                        else {
                            pythonArgs.push(pipelines.cosmos.file);
                            pythonArgs.push(port.toString());
                            pythonArgs.push(pipelines.cosmos.defaultData);
                            pythonArgs.push(crescentRawDataPath);
                        }

                        // used in case of CosmosRadar
                        for (var key in args) {
                            if (args.hasOwnProperty(key)) {
                                pythonArgs.push("--" + key);
                                pythonArgs.push(args[key]);
                            }
                        }

                        // Dynamically determine which distance function should be
                        // used
                        if (pythonArgs.indexOf("--dist_func") < 0) {
                            if (pipeline === "twitter" || pipeline === "elasticsearch" ||
                                    csvFilePath.startsWith(textDataPath)) {
                                pythonArgs.push("--dist_func", "cosine");
                            }
                            else {
                                pythonArgs.push("--dist_func", "euclidean");
                            }
                        }

                        console.log(pythonArgs);
                        console.log("");

                        var pipelineInstance = spawn("python2.7", pythonArgs, {stdout: "inherit"});

                        pipelineInstance.on("error", function(err) {
                            console.log("python2.7.exe not found. Trying python.exe");
                            pipelineInstance = spawn("python", pythonArgs,{stdout: "inherit"});

                            pipelineInstance.stdout.on("data", function(data) {
                                console.log("Pipeline: " + data.toString());
                            });
                            pipelineInstance.stderr.on("data", function(data) {
                                console.log("Pipeline error: " + data.toString());
                            });
                        });

                        /* Data received by node app from python process, 
                         * ouptut this data to output stream(on 'data'), 
                         * we want to convert that received data into a string and 
                         * append it to the overall data String
                         */
                        pipelineInstance.stdout.on("data", function(data) {
                            console.log("Pipeline STDOUT: " + data.toString());
                        });
                        pipelineInstance.stderr.on("data", function(data) {
                            console.log("Pipeline error: " + data.toString());
                        });

                        room.pipelineInstance = pipelineInstance;
                    }

                    /* Connect to the pipeline */
                    pipelineAddr = pipelineAddr || "tcp://127.0.0.1:" + port.toString();

                    room.pipelineSocket = zmq.socket('pair');
                    room.pipelineSocket.connect(pipelineAddr);

                    pipelineAddr = null;
                    portsInProcess.splice(portsInProcess.indexOf(port), 1);

                    /* Listens for messages from the pipeline */
                    room.pipelineSocket.on('message', function (msg) {
                        self.handleMessage(room, msg);
                    });

                    self.rooms[roomName] = socket.room = room;
                    invoke(room.pipelineSocket, "reset");
                })();
            }
            else {
                socket.room = self.rooms[roomName];
                socket.room.count += 1;

                if (pipeline == "sirius" || pipeline == "centaurus") {
                    socket.emit('update', sendRoom(socket.room, true), true);
                    socket.emit('update', sendRoom(socket.room, false), false);
                }
                else {
                    socket.emit('update', sendRoom(socket.room));
                }
            }

            // Reset the csvFilePath to null for future UIs...
            // I don't think this is actually necessary since 
            // csvFilePath is local to the "connections" message,
            // which is called for every individual room
            csvFilePath = null;
        });

        // Skipped the rest of the code; it's irrelevant
    });
}

完整打印输出:

setCSV CALLED
CSV FILE SET: data/text/crescent_topics.csv
Join called for sirius pipeline; room sirius0
CSV FILE PATH: data/text/crescent_topics.csv
CHECK BEFORE ASYNC: data/text/crescent_topics.csv
CHECK AFTER ASYNC1: data/text/crescent_topics.csv
CHECK AFTER ASYNC2: data/text/crescent_topics.csv
58562
CHECK AFTER ASYNC3: null
CHECK AFTER ASYNC4: null
LAST CHECK: null
FINAL CSV FILE: data/highD/Animal_Data_paper.csv
[ '-u',
  'pipelines/sirius.py',
  '58562',
  'data/highD/Animal_Data_paper.csv',
  undefined,
  '--dist_func',
  'euclidean' ]

由于加粗代码不起作用,只需搜索“**”即可找到我标记的相关部分。

TL;DR 客户端和服务器之间发生了大量通信,以建立直接链接到特定数据集的个性化通信。用户可以将自定义 CSV 文件上传到系统,但我现在使用的代码只是试图 select 服务器上现有的 CSV 文件,所以我省略了自定义的回调CSV 文件。一旦文件被 selected,客户端要求 "join" 一个 room/session。我现在正在处理的案例假设这是一个新的 room/session,而不是尝试与另一个客户共享 room/session。 (是的,我知道,共享 rooms/sessions 的代码很乱,但它现在大部分都可以工作,这不是我主要关心的问题。)同样,在添加异步代码之前,所有这些代码都运行良好(并使用静态端口变量),所以我不知道添加它后发生了什么变化。

经过一些愚蠢的测试后,我意识到我在异步调用之外将 csvFilePath 重置为 null,这就是导致错误的原因...糟糕!

由于您现在包含了整个代码上下文,我们可以看出问题是 async IIFE 之后的代码导致了问题。

一个 async 函数 returns 一旦它命中 await 一个承诺。并且,当 await 正在等待其异步操作时,调用 async 函数后的代码 运行s。在你的情况下,你基本上是这样做的:

var csvFilePath = someGoodValue;

(async () => {
     port = await getPort();
     console.log(csvFilePath);    // this will be null
})();

csvFilePath = null;               // this runs as soon as the above code hits the await

因此,一旦您点击第一个 awaitasync 函数 returns 一个承诺,它后面的代码将继续 运行,点击该行重置 csvFilePath.

的代码

可能有更简洁的方法来重构您的代码,但您可以做的一件简单的事情是:

var csvFilePath = someGoodValue;

(async () => {
     port = await getPort();
     console.log(csvFilePath);    // this will be null
})().finally(() => {
    csvFilePath = null;
});

注意:节点 v10+ 支持 .finally()。如果您使用的是旧版本,则可以在 .then().catch().

中重置路径

或者,正如您的评论所说,也许您可​​以完全删除 csvFilePath 的重置。