如何使用nodejs锁定mongodb中的阅读

how to lock reading in mongodb with nodejs

史前

我有一个 nodejs 服务器,它的实例在多台机器上 运行 并且每个实例每天同时运行一次 cron 作业。

当一个实例 运行 或刚刚完成其作业时,另一个实例应跳过作业内部的执行逻辑。

我已经有 mongodb 连接,所以我决定将运行作业的状态及其时间保存到数据库,并 check/change 在每个作业的回调中保存它。 我选择在集合中保存作业状态的文档模型是:

interface JobDao {
 _id: ObjectId;
 type: string;
 state: 'locked' | 'failed' | 'completed';
 updatedDate: DateISO;
}

我使用包 "mongodb": "^3.6.3" 进行查询。

经过一些尝试,我想知道我是否可以实现下面描述的行为。也许有人可以建议另一种解决方案来同步多台机器的 运行 作业。

所以我尝试实施解决方案并寻求帮助:

但这就是我遇到的问题。因为没有并发。在一台机器上获取和更新文档之间,另一台机器可以获取或更新不相关数据的陈旧文档。

我尝试过以下解决方案:

但没有任何效果。我开始考虑更改数据库,但也许有人可以说如何使用 mongodb 来实现它,或者可以推荐一些更合适的解决方案?

我建议您使用某种锁定机制在多个服务之间进行同步。 如果您希望关键部分中只有一项服务 write/read ,则可以使用基本互斥锁。 我不确定你到底想要什么,以防一个服务试图读取而另一个服务执行一些更改(等待、跳过或其他)。 您可以使用一些共享组件,例如redis来存储锁定密钥。

稍作休息后,我决定从头开始,终于找到了解决办法。 这是我的代码示例。它并不完美,所以我打算重构它,但它可以解决我的问题!希望对某人有所帮助。

略述其逻辑

“中介”是 public 方法 scheduleJob。逻辑顺序:

  • 当我们安排作业时,它会为数据库中的类型创建新文档(如果它不存在)。
  • 解锁过时的作业(如果已锁定超过半小时,则过时)。 服务器可能会在 运行 作业时崩溃,这会导致无限锁定作业,但检查过时的作业应该会有所帮助
  • 下一步是锁定未锁定的作业,否则,完成逻辑。 一个实例在下一个实例开始之前完成作业是可能的,所以如果同一作业在最后 5 分钟内 运行,我添加了作业的完成。重要的是这种条件限制了频率,因为作业不能每 5 分钟运行一次,但在我的情况下它是合适的解决方案

CronJobDaoCronJobModel是一样的,表示DB中的文档:

export interface CronJobDao {
  type: CronJobTypeEnum;
  isLocked: boolean;
  updatedAt: Date;
  completedAt: Date;
}

使用 scheduleJob 方法的服务:

import { inject, injectable } from 'inversify';
import { Job, scheduleJob } from 'node-schedule';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobRepository } from './cron-job.repository';


@injectable()
export class CronJobService {
  readonly halfHourMs = 30 * 60 * 1000;
  readonly fiveMinutesMs = 5 * 60 * 1000;

  constructor(
    @inject(CronJobRepository) private cronJobRepository: CronJobRepository,
  ) {}

  scheduleJob(type: CronJobTypeEnum, timeRule: string, callback: Function): Job {
    this.cronJobRepository.registerJob(type).then();

    return scheduleJob(
      type,
      timeRule,
      async () => {
        await this.unlockStaleJob(type);

        const lockedJob = await this.cronJobRepository.lockJob(type);

        if (!lockedJob) {
          console.warn('Job has already been locked');
          return;
        }

        if ((new Date().getTime() - lockedJob.completedAt?.getTime()) < this.fiveMinutesMs) {
          await this.cronJobRepository.unlockJob(type);
          console.warn('Job has recently been completed');
          return;
        }

        console.info('Job is locked');

        callback();

        await this.cronJobRepository.completeJob(type);
        console.info('Job is completed');
      },
    );
  }

  private async unlockStaleJob(type: CronJobTypeEnum): Promise<void> {
    const staleJob = await this.cronJobRepository.unlockIfTimeExpired(type, this.halfHourMs);

    if (!staleJob) {
      return;
    }

    console.warn('Has stale job: ', JSON.stringify(staleJob));
  }
}

Class 用于与 DB 通信:

import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { CronJobDao, mapCronJobDaoToModel } from '../core/daos/cron-job.dao';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobModel } from '../core/models/cron-job.model';
import { AbstractRepository } from '../core/utils/abstract.repository';

@injectable()
export class CronJobRepository extends AbstractRepository<CronJobDao> {

  constructor(@inject(Db) db: Db) {
    super(db, 'cron_jobs');
  }

  async registerJob(type: CronJobTypeEnum) {
    const result = await this.collection.findOneAndUpdate(
      { type },
      {
        $setOnInsert: {
          type,
          isLocked: false,
          updatedAt: new Date(),
        },
      },
      { upsert: true, returnOriginal: false },
    );

    return result.value;
  }

  async unlockIfTimeExpired(type: CronJobTypeEnum, expiredFromMs: number): Promise<CronJobModel | null> {
    const expirationDate = new Date(new Date().getTime() - expiredFromMs);

    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: true,
        updatedAt: { $lt: expirationDate },
      },
      {
        $set: {
          updatedAt: new Date(),
          isLocked: false,
        },
      });

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }

  async lockJob(type: CronJobTypeEnum) {
    return this.toggleJobLock(type, false);
  }

  async unlockJob(type: CronJobTypeEnum) {
    return this.toggleJobLock(type, true);
  }

  private async toggleJobLock(type: CronJobTypeEnum, stateForToggle: boolean): Promise<CronJobModel | null> {
    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: stateForToggle,
      },
      {
        $set: {
          isLocked: !stateForToggle,
          updatedAt: new Date(),
        },
      },
    );

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }

  async completeJob(type: CronJobTypeEnum): Promise<CronJobModel | null> {
    const currentDate = new Date();

    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: true,
      },
      {
        $set: {
          isLocked: false,
          updatedAt: currentDate,
          completedAt: currentDate,
        },
      },
    );

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }
}