1
0
mirror of https://github.com/laurent22/joplin.git synced 2025-01-02 12:47:41 +02:00

Server: Ensure that server does not crash when trying to start a task that is already running

This commit is contained in:
Laurent Cozic 2023-08-16 14:57:51 +01:00
parent d0e943630d
commit 5ed3d94faa
5 changed files with 96 additions and 81 deletions

View File

@ -1,4 +1,5 @@
import { TaskId, TaskState } from '../services/database/types';
import { ErrorBadRequest, ErrorCode } from '../utils/errors';
import BaseModel from './BaseModel';
export default class TaskStateModel extends BaseModel<TaskState> {
@ -32,13 +33,13 @@ export default class TaskStateModel extends BaseModel<TaskState> {
public async start(taskId: TaskId) {
const state = await this.loadByTaskId(taskId);
if (state.running) throw new Error(`Task is already running: ${taskId}`);
if (state.running) throw new ErrorBadRequest(`Task is already running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning });
await this.save({ id: state.id, running: 1 });
}
public async stop(taskId: TaskId) {
const state = await this.loadByTaskId(taskId);
if (!state.running) throw new Error(`Task is not running: ${taskId}`);
if (!state.running) throw new ErrorBadRequest(`Task is not running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning });
await this.save({ id: state.id, running: 0 });
}

View File

@ -1,5 +1,6 @@
import config from '../config';
import { Models } from '../models/factory';
import { ErrorCode } from '../utils/errors';
import { afterAllTests, beforeAllDb, beforeEachDb, expectThrow, models } from '../utils/testing/testUtils';
import { Env } from '../utils/types';
import { TaskId } from './database/types';
@ -14,6 +15,23 @@ const newService = () => {
});
};
const createDemoTasks = (): Task[] => {
return [
{
id: TaskId.DeleteExpiredTokens,
description: '',
run: (_models: Models) => {},
schedule: '',
},
{
id: TaskId.CompressOldChanges,
description: '',
run: (_models: Models) => {},
schedule: '',
},
];
};
describe('TaskService', () => {
beforeAll(async () => {
@ -31,98 +49,66 @@ describe('TaskService', () => {
test('should register a task', async () => {
const service = newService();
const task: Task = {
id: TaskId.DeleteExpiredTokens,
description: '',
run: (_models: Models) => {},
schedule: '',
};
await service.registerTask(task);
const tasks = createDemoTasks();
await service.registerTasks(tasks);
expect(service.tasks[TaskId.DeleteExpiredTokens]).toBeTruthy();
await expectThrow(async () => service.registerTask(task));
expect(service.tasks[TaskId.CompressOldChanges]).toBeTruthy();
await expectThrow(async () => service.registerTask(tasks[0]));
});
// test('should run a task', async function() {
// const service = newService();
// let taskStarted = false;
// let waitToFinish = true;
// let finishTask = false;
// let taskHasRan = false;
// const taskId = TaskId.DeleteExpiredTokens;
// const task: Task = {
// id: taskId,
// description: '',
// run: async (_models: Models) => {
// taskStarted = true;
// const iid = setInterval(() => {
// if (waitToFinish) return;
// if (finishTask) {
// clearInterval(iid);
// taskHasRan = true;
// }
// }, 1);
// },
// schedule: '',
// };
// await service.registerTask(task);
// expect((await service.taskState(taskId)).running).toBe(0);
// const startTime = new Date();
// void service.runTask(taskId, RunType.Manual);
// while (!taskStarted) {
// await msleep(1);
// }
// expect((await service.taskState(taskId)).running).toBe(1);
// waitToFinish = false;
// while (!taskHasRan) {
// await msleep(1);
// finishTask = true;
// }
// expect((await service.taskState(taskId)).running).toBe(0);
// const events = await service.taskLastEvents(taskId);
// expect(events.taskStarted.created_time).toBeGreaterThanOrEqual(startTime.getTime());
// expect(events.taskCompleted.created_time).toBeGreaterThan(startTime.getTime());
// });
test('should not run if task is disabled', async () => {
const service = newService();
let taskHasRan = false;
const taskId = TaskId.DeleteExpiredTokens;
const task: Task = {
id: taskId,
description: '',
run: async (_models: Models) => {
taskHasRan = true;
},
schedule: '',
};
await service.registerTask(task);
const tasks = createDemoTasks();
tasks[0].run = async (_models: Models) => {
taskHasRan = true;
},
await service.registerTasks(tasks);
const taskId = tasks[0].id;
await service.runTask(taskId, RunType.Manual);
expect(taskHasRan).toBe(true);
taskHasRan = false;
await models().taskState().disable(task.id);
await models().taskState().disable(taskId);
await service.runTask(taskId, RunType.Manual);
expect(taskHasRan).toBe(false);
});
test('should not run if task is already running', async () => {
const service = newService();
const tasks = createDemoTasks();
await service.registerTasks(tasks);
const task = tasks[0];
const state = await models().taskState().loadByTaskId(task.id);
await models().taskState().save({ id: state.id, running: 1 });
await expectThrow(async () => service.runTask(task.id, RunType.Manual), ErrorCode.TaskAlreadyRunning);
});
test('should reset interrupted tasks', async () => {
const service = newService();
const tasks = createDemoTasks();
await service.registerTasks(tasks);
const task = tasks[0];
const state = await models().taskState().loadByTaskId(task.id);
await models().taskState().save({ id: state.id, running: 1 });
const stateBefore = await models().taskState().loadByTaskId(task.id);
await service.resetInterruptedTasks();
const stateAfter = await models().taskState().loadByTaskId(task.id);
expect(stateBefore.running).toBe(1);
expect(stateAfter.running).toBe(0);
});
});

View File

@ -5,7 +5,7 @@ import BaseService from './BaseService';
import { Event, EventType, TaskId, TaskState } from './database/types';
import { Services } from './types';
import { _ } from '@joplin/lib/locale';
import { ErrorNotFound } from '../utils/errors';
import { ErrorCode, ErrorNotFound } from '../utils/errors';
import { durationToMilliseconds } from '../utils/time';
const cron = require('node-cron');
@ -104,6 +104,16 @@ export default class TaskService extends BaseService {
};
}
public async resetInterruptedTasks() {
const taskStates = await this.models.taskState().all();
for (const taskState of taskStates) {
if (taskState.running) {
logger.warn(`Found a task that was in running state: ${this.taskDisplayString(taskState.task_id)} - resetting it.`);
await this.models.taskState().stop(taskState.task_id);
}
}
}
private taskById(id: TaskId): Task {
if (!this.tasks_[id]) throw new Error(`No such task: ${id}`);
return this.tasks_[id];
@ -159,13 +169,28 @@ export default class TaskService extends BaseService {
interval = null;
}
const runTaskWithErrorChecking = async (taskId: TaskId) => {
try {
await this.runTask(taskId, RunType.Scheduled);
} catch (error) {
if (error.code === ErrorCode.TaskAlreadyRunning) {
// This is not critical but we should log a warning
// because it may mean that the interval is too tight,
// or the task is taking too long.
logger.warn(`Tried to start ${this.taskDisplayString(taskId)} but it was already running`);
} else {
logger.error(`Failed running task ${this.taskDisplayString(taskId)}`, error);
}
}
};
if (interval !== null) {
setInterval(async () => {
await this.runTask(Number(taskId), RunType.Scheduled);
await runTaskWithErrorChecking(Number(taskId));
}, interval);
} else {
cron.schedule(task.schedule, async () => {
await this.runTask(Number(taskId), RunType.Scheduled);
await runTaskWithErrorChecking(Number(taskId));
});
}
}

View File

@ -6,6 +6,7 @@ export enum ErrorCode {
NoSub = 'no_sub',
NoStripeSub = 'no_stripe_sub',
InvalidOrigin = 'invalidOrigin',
TaskAlreadyRunning = 'taskAlreadyRunning',
}
export interface ErrorOptions {

View File

@ -104,5 +104,7 @@ export default async function(env: Env, models: Models, config: Config, services
await taskService.registerTasks(tasks);
await taskService.resetInterruptedTasks();
return taskService;
}