如何使用nodejs锁定mongodb中的阅读
how to lock reading in mongodb with nodejs
史前
我有一个 nodejs 服务器,它的实例在多台机器上 运行 并且每个实例每天同时运行一次 cron 作业。
当一个实例 运行 或刚刚完成其作业时,另一个实例应跳过作业内部的执行逻辑。
我已经有 mongodb 连接,所以我决定将运行作业的状态及其时间保存到数据库,并 check/change 在每个作业的回调中保存它。
我选择在集合中保存作业状态的文档模型是:
interface JobDao {
_id: ObjectId;
type: string;
state: 'locked' | 'failed' | 'completed';
updatedDate: DateISO;
}
我使用包 "mongodb": "^3.6.3"
进行查询。
经过一些尝试,我想知道我是否可以实现下面描述的行为。也许有人可以建议另一种解决方案来同步多台机器的 运行 作业。
所以我尝试实施解决方案并寻求帮助:
- 当 cron 启动时,从 DB 获取作业。
- 检查具有此类条件的作业状态:
- 如果它被锁定且未过期 -> 跳过逻辑 (注意:我使用一小时的过期时间来防止服务器在 运行 时出现意外问题)
- 如果它被锁定并过期 -> 将作业状态更改为锁定
- 如果它没有被锁定但更新到最后 5 分钟 -> 将作业状态更改为锁定
- 根据上述条件执行逻辑
- “解锁”作业(更新文档中的作业状态)
但这就是我遇到的问题。因为没有并发。在一台机器上获取和更新文档之间,另一台机器可以获取或更新不相关数据的陈旧文档。
我尝试过以下解决方案:
- findOneAndUpdate
- 试图添加聚合(这里是比较exipration的问题,看起来是不可能的)。
- 交易
- 批量
但没有任何效果。我开始考虑更改数据库,但也许有人可以说如何使用 mongodb 来实现它,或者可以推荐一些更合适的解决方案?
我建议您使用某种锁定机制在多个服务之间进行同步。
如果您希望关键部分中只有一项服务 write/read ,则可以使用基本互斥锁。
我不确定你到底想要什么,以防一个服务试图读取而另一个服务执行一些更改(等待、跳过或其他)。
您可以使用一些共享组件,例如redis来存储锁定密钥。
稍作休息后,我决定从头开始,终于找到了解决办法。
这是我的代码示例。它并不完美,所以我打算重构它,但它可以解决我的问题!希望对某人有所帮助。
略述其逻辑
“中介”是 public 方法 scheduleJob
。逻辑顺序:
- 当我们安排作业时,它会为数据库中的类型创建新文档(如果它不存在)。
- 解锁过时的作业(如果已锁定超过半小时,则过时)。 服务器可能会在 运行 作业时崩溃,这会导致无限锁定作业,但检查过时的作业应该会有所帮助
- 下一步是锁定未锁定的作业,否则,完成逻辑。 一个实例在下一个实例开始之前完成作业是可能的,所以如果同一作业在最后 5 分钟内 运行,我添加了作业的完成。重要的是这种条件限制了频率,因为作业不能每 5 分钟运行一次,但在我的情况下它是合适的解决方案
CronJobDao
和CronJobModel
是一样的,表示DB中的文档:
export interface CronJobDao {
type: CronJobTypeEnum;
isLocked: boolean;
updatedAt: Date;
completedAt: Date;
}
使用 scheduleJob
方法的服务:
import { inject, injectable } from 'inversify';
import { Job, scheduleJob } from 'node-schedule';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobRepository } from './cron-job.repository';
@injectable()
export class CronJobService {
readonly halfHourMs = 30 * 60 * 1000;
readonly fiveMinutesMs = 5 * 60 * 1000;
constructor(
@inject(CronJobRepository) private cronJobRepository: CronJobRepository,
) {}
scheduleJob(type: CronJobTypeEnum, timeRule: string, callback: Function): Job {
this.cronJobRepository.registerJob(type).then();
return scheduleJob(
type,
timeRule,
async () => {
await this.unlockStaleJob(type);
const lockedJob = await this.cronJobRepository.lockJob(type);
if (!lockedJob) {
console.warn('Job has already been locked');
return;
}
if ((new Date().getTime() - lockedJob.completedAt?.getTime()) < this.fiveMinutesMs) {
await this.cronJobRepository.unlockJob(type);
console.warn('Job has recently been completed');
return;
}
console.info('Job is locked');
callback();
await this.cronJobRepository.completeJob(type);
console.info('Job is completed');
},
);
}
private async unlockStaleJob(type: CronJobTypeEnum): Promise<void> {
const staleJob = await this.cronJobRepository.unlockIfTimeExpired(type, this.halfHourMs);
if (!staleJob) {
return;
}
console.warn('Has stale job: ', JSON.stringify(staleJob));
}
}
Class 用于与 DB 通信:
import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { CronJobDao, mapCronJobDaoToModel } from '../core/daos/cron-job.dao';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobModel } from '../core/models/cron-job.model';
import { AbstractRepository } from '../core/utils/abstract.repository';
@injectable()
export class CronJobRepository extends AbstractRepository<CronJobDao> {
constructor(@inject(Db) db: Db) {
super(db, 'cron_jobs');
}
async registerJob(type: CronJobTypeEnum) {
const result = await this.collection.findOneAndUpdate(
{ type },
{
$setOnInsert: {
type,
isLocked: false,
updatedAt: new Date(),
},
},
{ upsert: true, returnOriginal: false },
);
return result.value;
}
async unlockIfTimeExpired(type: CronJobTypeEnum, expiredFromMs: number): Promise<CronJobModel | null> {
const expirationDate = new Date(new Date().getTime() - expiredFromMs);
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
updatedAt: { $lt: expirationDate },
},
{
$set: {
updatedAt: new Date(),
isLocked: false,
},
});
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async lockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, false);
}
async unlockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, true);
}
private async toggleJobLock(type: CronJobTypeEnum, stateForToggle: boolean): Promise<CronJobModel | null> {
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: stateForToggle,
},
{
$set: {
isLocked: !stateForToggle,
updatedAt: new Date(),
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async completeJob(type: CronJobTypeEnum): Promise<CronJobModel | null> {
const currentDate = new Date();
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
},
{
$set: {
isLocked: false,
updatedAt: currentDate,
completedAt: currentDate,
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
}
史前
我有一个 nodejs 服务器,它的实例在多台机器上 运行 并且每个实例每天同时运行一次 cron 作业。
当一个实例 运行 或刚刚完成其作业时,另一个实例应跳过作业内部的执行逻辑。
我已经有 mongodb 连接,所以我决定将运行作业的状态及其时间保存到数据库,并 check/change 在每个作业的回调中保存它。 我选择在集合中保存作业状态的文档模型是:
interface JobDao {
_id: ObjectId;
type: string;
state: 'locked' | 'failed' | 'completed';
updatedDate: DateISO;
}
我使用包 "mongodb": "^3.6.3"
进行查询。
经过一些尝试,我想知道我是否可以实现下面描述的行为。也许有人可以建议另一种解决方案来同步多台机器的 运行 作业。
所以我尝试实施解决方案并寻求帮助:
- 当 cron 启动时,从 DB 获取作业。
- 检查具有此类条件的作业状态:
- 如果它被锁定且未过期 -> 跳过逻辑 (注意:我使用一小时的过期时间来防止服务器在 运行 时出现意外问题)
- 如果它被锁定并过期 -> 将作业状态更改为锁定
- 如果它没有被锁定但更新到最后 5 分钟 -> 将作业状态更改为锁定
- 根据上述条件执行逻辑
- “解锁”作业(更新文档中的作业状态)
但这就是我遇到的问题。因为没有并发。在一台机器上获取和更新文档之间,另一台机器可以获取或更新不相关数据的陈旧文档。
我尝试过以下解决方案:
- findOneAndUpdate
- 试图添加聚合(这里是比较exipration的问题,看起来是不可能的)。
- 交易
- 批量
但没有任何效果。我开始考虑更改数据库,但也许有人可以说如何使用 mongodb 来实现它,或者可以推荐一些更合适的解决方案?
我建议您使用某种锁定机制在多个服务之间进行同步。 如果您希望关键部分中只有一项服务 write/read ,则可以使用基本互斥锁。 我不确定你到底想要什么,以防一个服务试图读取而另一个服务执行一些更改(等待、跳过或其他)。 您可以使用一些共享组件,例如redis来存储锁定密钥。
稍作休息后,我决定从头开始,终于找到了解决办法。 这是我的代码示例。它并不完美,所以我打算重构它,但它可以解决我的问题!希望对某人有所帮助。
略述其逻辑
“中介”是 public 方法 scheduleJob
。逻辑顺序:
- 当我们安排作业时,它会为数据库中的类型创建新文档(如果它不存在)。
- 解锁过时的作业(如果已锁定超过半小时,则过时)。 服务器可能会在 运行 作业时崩溃,这会导致无限锁定作业,但检查过时的作业应该会有所帮助
- 下一步是锁定未锁定的作业,否则,完成逻辑。 一个实例在下一个实例开始之前完成作业是可能的,所以如果同一作业在最后 5 分钟内 运行,我添加了作业的完成。重要的是这种条件限制了频率,因为作业不能每 5 分钟运行一次,但在我的情况下它是合适的解决方案
CronJobDao
和CronJobModel
是一样的,表示DB中的文档:
export interface CronJobDao {
type: CronJobTypeEnum;
isLocked: boolean;
updatedAt: Date;
completedAt: Date;
}
使用 scheduleJob
方法的服务:
import { inject, injectable } from 'inversify';
import { Job, scheduleJob } from 'node-schedule';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobRepository } from './cron-job.repository';
@injectable()
export class CronJobService {
readonly halfHourMs = 30 * 60 * 1000;
readonly fiveMinutesMs = 5 * 60 * 1000;
constructor(
@inject(CronJobRepository) private cronJobRepository: CronJobRepository,
) {}
scheduleJob(type: CronJobTypeEnum, timeRule: string, callback: Function): Job {
this.cronJobRepository.registerJob(type).then();
return scheduleJob(
type,
timeRule,
async () => {
await this.unlockStaleJob(type);
const lockedJob = await this.cronJobRepository.lockJob(type);
if (!lockedJob) {
console.warn('Job has already been locked');
return;
}
if ((new Date().getTime() - lockedJob.completedAt?.getTime()) < this.fiveMinutesMs) {
await this.cronJobRepository.unlockJob(type);
console.warn('Job has recently been completed');
return;
}
console.info('Job is locked');
callback();
await this.cronJobRepository.completeJob(type);
console.info('Job is completed');
},
);
}
private async unlockStaleJob(type: CronJobTypeEnum): Promise<void> {
const staleJob = await this.cronJobRepository.unlockIfTimeExpired(type, this.halfHourMs);
if (!staleJob) {
return;
}
console.warn('Has stale job: ', JSON.stringify(staleJob));
}
}
Class 用于与 DB 通信:
import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { CronJobDao, mapCronJobDaoToModel } from '../core/daos/cron-job.dao';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobModel } from '../core/models/cron-job.model';
import { AbstractRepository } from '../core/utils/abstract.repository';
@injectable()
export class CronJobRepository extends AbstractRepository<CronJobDao> {
constructor(@inject(Db) db: Db) {
super(db, 'cron_jobs');
}
async registerJob(type: CronJobTypeEnum) {
const result = await this.collection.findOneAndUpdate(
{ type },
{
$setOnInsert: {
type,
isLocked: false,
updatedAt: new Date(),
},
},
{ upsert: true, returnOriginal: false },
);
return result.value;
}
async unlockIfTimeExpired(type: CronJobTypeEnum, expiredFromMs: number): Promise<CronJobModel | null> {
const expirationDate = new Date(new Date().getTime() - expiredFromMs);
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
updatedAt: { $lt: expirationDate },
},
{
$set: {
updatedAt: new Date(),
isLocked: false,
},
});
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async lockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, false);
}
async unlockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, true);
}
private async toggleJobLock(type: CronJobTypeEnum, stateForToggle: boolean): Promise<CronJobModel | null> {
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: stateForToggle,
},
{
$set: {
isLocked: !stateForToggle,
updatedAt: new Date(),
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async completeJob(type: CronJobTypeEnum): Promise<CronJobModel | null> {
const currentDate = new Date();
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
},
{
$set: {
isLocked: false,
updatedAt: currentDate,
completedAt: currentDate,
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
}