如何锁定由nodejs中的多个异步方法共享的对象?

How to lock on object which shared by multiple async method in nodejs?

我在 nodejs 中有一个具有不同属性的对象,有不同的异步函数可以通过一些复杂的执行来访问和修改该对象。单个异步函数可能具有内部回调(或异步函数),这可能需要一些时间来执行,然后该函数将修改该对象。我想锁定那个对象直到我完成所有修改,只有在那之后任何其他异步函数才能访问它。

示例:

var machineList = {};

function operation1() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation2() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation3() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}
function operation4() {
    waitForMachineList();
    //complex logic
    //modification of machineList, some closure and callbacks functions and again modification of machineList
    leaveFromMachineList();
}

假设machineList是一个复杂的对象,通过不同的异步方法(operation1()operation2()、...)对该对象进行了不同的操作来修改它.根据来自客户端的请求,这些操作以任意顺序和任意次数调用。每个请求将执行单个操作。

每个操作函数中都有一些内部的闭包函数和回调(或异步函数),可能需要一些时间。但我想锁定 machineList 对象,直到完成任何单个操作。

在任何操作开始时,我想像 waitForMachineList() 一样锁定对象,并在 leaveFromMachineList() 后释放锁。

所以最后想在nodejs中实现锁机制。就像 C++ 中的 Critical Session 和 C# 中的锁定一样。

所以请一些人帮忙在nodejs中实现它?或建议我可以用于此的任何节点模块。

我已经使用 async-lock 节点模块完成了锁定。现在我可以实现上面提到的目标了。

示例:

var AsyncLock = require('async-lock');
var lock = new AsyncLock();

function operation1() {
    console.log("Execute operation1");
    lock.acquire("key1", function(done) {
        console.log("lock1 enter")
        setTimeout(function() {
            console.log("lock1 Done")
            done();
        }, 3000)
    }, function(err, ret) {
        console.log("lock1 release")
    }, {});
}

function operation2() {
    console.log("Execute operation2");
    lock.acquire("key1", function(done) {
        console.log("lock2 enter")
        setTimeout(function() {
            console.log("lock2 Done")
            done();
        }, 1000)
    }, function(err, ret) {
        console.log("lock2 release")
    }, {});
}

function operation3() {
    console.log("Execute operation3");
    lock.acquire("key1", function(done) {
        console.log("lock3 enter")
        setTimeout(function() {
            console.log("lock3 Done")
            done();
        }, 1)
    }, function(err, ret) {
        console.log("lock3 release")
    }, {});
}operation1(); operation2(); operation3();

输出:

执行操作1

锁1进入

执行操作2

执行操作3

锁定 1 完成

解锁1

锁2进入

lock2 完成

解锁2

锁3进入

lock3 完成

锁3释放

我写这段代码是为了解决类似的问题。

我认为这是一种经典的数据库事务模型。它只实现锁定机制,不实现回滚。它以下列方式工作:

  1. 您按调用顺序添加了一堆同步方法
  2. 这些方法将在 N 毫秒后被调用
  3. 只有一个获取数据库数据(或您需要修改的任何其他数据)的请求和一个保存数据库数据的请求
  4. 每个方法都会接收到“最新数据”的引用,即处理程序 № 2 将接收由处理程序 № 1 更改的数据对象。顺序取决于 № 的顺序1

它适用于浏览器。它应该在 Node.js 中工作,但我没有测试它。

注意,因为每个方法都会接收到对象的引用,你应该修改实际的引用,如果你想读取它和return一些值,然后复制它,不要return该引用,因为该引用的值将来可能会被未来的处理程序更改

TRANSACTION 将等待 N 毫秒(默认为 250)后执行。这定义了哪些方法将被分组到单个事务中。您还可以进行即时通话。

代码如下:

let TIMER_ID = 0;
let LOCK = Promise.resolve();
let RESOLVE_LOCK = undefined;
let FUNC_BUFFER = [];
let SUCCESS_BUFFER = [];
let FAIL_BUFFER = [];


/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}

/**
 * Sets new data in DB.
 */
async function set(value) {
    return;
}


/**
 * Classic database transaction implementation.
 *
 * You adding bunch of methods, every method will be
 * executed with valid instance of data at call time,
 * after all functions end the transaction will end.
 * If any method failed, then entire transaction will fail
 * and no changes will be written. If current transaction is
 * active, new one will be not started until end of previous one.
 *
 * In short, this transaction have ACID properties.
 *
 * Operations will be auto grouped into separate transactions
 * based on start timeout, which is recreated every time on
 * operation call.
 *
 * @example
 * ```
 * // Without transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 123 // because `update` is async
 *
 * // With transaction:
 * create("1", 123)
 * update("1", 321)
 * read("1") => 321
 *
 * // Without transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // "v" will be created
 * read("v") => true // because `update` is async
 * update("v", false) // will update because `update` is async
 *
 * // With transaction:
 * create("v", true)
 * update("v", false) // throws internal error,
 *                    // entire transaction will throw error,
 *                   // "v" will be not created
 * read("v") => true // entire transaction will throw error
 * update("v", false) // entire transaction will throw error
 * ```
 *
 * @example
 * ```
 * // Transaction start
 * create()
 * update()
 * update()
 * remove()
 * // Transaction end
 *
 * // Transaction start
 * create()
 * update()
 * sleep(1000)
 * // Transaction end
 *
 * // Transaction start
 * update()
 * remove()
 * // Transaction end
 * ```
 */
const TRANSACTION = {
    /**
     * Adds function in transaction.
     *
     * NOTE:
     * you shouldn't await this function, because
     * it only represents transcation lock, not
     * entire transaction end.
     *
     * @param f
     * Every function should only modify passed state, don't
     * reassign passed state and not save changes manually!
     * @param onSuccess
     * Will be called on entire transaction success.
     * @param onFail
     * Will be called on entire transaction fail.
     * @param startTimeout
     * Passed `f` will be added in current transaction,
     * and current transaction will be called after
     * `startTimeout` ms if there will be no more `f` passed.
     * Default value is recommended.
     */
    add: async function(
        f,
        onSuccess,
        onFail,
        startTimeout
    ) {
        await LOCK;

        window.clearTimeout(TIMER_ID);
        FUNC_BUFFER.push(f);

        if (onSuccess) {
            SUCCESS_BUFFER.push(onSuccess);
        }

        if (onFail) {
            FAIL_BUFFER.push(onFail);
        }

        if (startTimeout == null) {
            startTimeout = 250;
        }

        TIMER_ID = window.setTimeout(() => {
            TRANSACTION.start();
        }, startTimeout);

        console.debug("Added in transaction");
    },
    start: async function() {
        LOCK = new Promise((resolve) => {
            RESOLVE_LOCK = resolve;
        });
        console.debug("Transaction locked");

        let success = true;

        try {
            await TRANSACTION.run();
        } catch (error) {
            success = false;
            console.error(error);
            console.warn("Transaction failed");
        }

        if (success) {
            for (const onSuccess of SUCCESS_BUFFER) {
                try {
                    onSuccess();
                } catch (error) {
                    console.error(error);
                }
            }
        } else {
            for (const onFail of FAIL_BUFFER) {
                try {
                    onFail();
                } catch (error) {
                    console.error(error);
                }
            }
        }

        FUNC_BUFFER = [];
        SUCCESS_BUFFER = [];
        FAIL_BUFFER = [];
        RESOLVE_LOCK();

        console.debug("Transaction unlocked");
    },
    run: async function() {
        const data = await get();
        const state = {
            value1: data.key1,
            value2: data.key2
        };

        for (const f of FUNC_BUFFER) {
            console.debug("Transaction function started");
            f(state);
            console.debug("Transaction function ended");
        }

        await set({
            key1: state.value1,
            key2: state.value2
        });
    }
}

示例 № 1:

/**
 * Gets DB data. 
 */
async function get() {
    return {
        key1: "value1",
        key2: {
            key3: "value2"
        }
    };
}

/**
 * Sets new data in DB.
 */
async function set(value) {
    console.debug("Will be set:", value);

    return;
}


new Promise(
    (resolve) => {
        TRANSACTION.add(
            (data) => {
                data.value2.key3 = "test1";
            },
            () => console.debug("success № 1")
        );
        TRANSACTION.add(
            (data) => {
                const copy = {
                    ...data.value2
                };
                
                resolve(copy);
            },
            () => console.debug("success № 2")
        );
        TRANSACTION.add(
            (data) => {
                data.value1 = "test10";
                data.value2.key3 = "test2";
            },
            () => console.debug("success № 3")
        );
    }
)
.then((value) => {
    console.debug("result:", value);
});

/* Output:

Added in transaction
Added in transaction
Added in transaction
Transaction locked
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Transaction function started
Transaction function ended
Will be set: {key1: 'test10', key2: {key3: 'test2'}}
result: {key3: 'test1'}
success № 1
success № 2
success № 3
Transaction unlocked 

*/

示例 2:

TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);


/* Output:

16:15:34.715 Added in transaction
16:15:34.715 Added in transaction
16:15:34.717 Transaction locked
16:15:34.717 Transaction function started
16:15:34.718 1
16:15:34.718 Transaction function ended
16:15:34.718 Transaction function started
16:15:34.718 2
16:15:34.718 Transaction function ended
16:15:34.719 Transaction unlocked

*/

示例 3:

TRANSACTION.add(
    () => {
        console.log(1);
    }
);
TRANSACTION.add(
    () => {
        console.log(2);
    },
    undefined,
    undefined,
    0 // instant call
);

await new Promise((resolve) => {
    window.setTimeout(() => {
        resolve();
    }, 1000);
});

TRANSACTION.add(
    () => {
        console.log(3);
    }
);


/* Output:

16:19:56.840 Added in transaction
16:19:56.840 Added in transaction
16:19:56.841 Transaction locked
16:19:56.841 Transaction function started
16:19:56.842 1
16:19:56.842 Transaction function ended
16:19:56.842 Transaction function started
16:19:56.842 2
16:19:56.842 Transaction function ended
16:19:56.842 Transaction unlocked
16:19:57.840 Added in transaction
16:19:58.090 Transaction locked
16:19:58.090 Transaction function started
16:19:58.090 3
16:19:58.091 Transaction function ended
16:19:58.091 Transaction unlocked

*/