diff --git a/packages/server/src/models/TaskStateModel.ts b/packages/server/src/models/TaskStateModel.ts index 15b3b1de6..e01e88cf8 100644 --- a/packages/server/src/models/TaskStateModel.ts +++ b/packages/server/src/models/TaskStateModel.ts @@ -1,4 +1,5 @@ import { TaskId, TaskState } from '../services/database/types'; +import { ErrorBadRequest, ErrorCode } from '../utils/errors'; import BaseModel from './BaseModel'; export default class TaskStateModel extends BaseModel { @@ -32,13 +33,13 @@ export default class TaskStateModel extends BaseModel { public async start(taskId: TaskId) { const state = await this.loadByTaskId(taskId); - if (state.running) throw new Error(`Task is already running: ${taskId}`); + if (state.running) throw new ErrorBadRequest(`Task is already running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning }); await this.save({ id: state.id, running: 1 }); } public async stop(taskId: TaskId) { const state = await this.loadByTaskId(taskId); - if (!state.running) throw new Error(`Task is not running: ${taskId}`); + if (!state.running) throw new ErrorBadRequest(`Task is not running: ${taskId}`, { code: ErrorCode.TaskAlreadyRunning }); await this.save({ id: state.id, running: 0 }); } diff --git a/packages/server/src/services/TaskService.test.ts b/packages/server/src/services/TaskService.test.ts index 70e5bab37..7c48b26b6 100644 --- a/packages/server/src/services/TaskService.test.ts +++ b/packages/server/src/services/TaskService.test.ts @@ -1,5 +1,6 @@ import config from '../config'; import { Models } from '../models/factory'; +import { ErrorCode } from '../utils/errors'; import { afterAllTests, beforeAllDb, beforeEachDb, expectThrow, models } from '../utils/testing/testUtils'; import { Env } from '../utils/types'; import { TaskId } from './database/types'; @@ -14,6 +15,23 @@ const newService = () => { }); }; +const createDemoTasks = (): Task[] => { + return [ + { + id: TaskId.DeleteExpiredTokens, + description: '', + run: (_models: Models) => {}, + schedule: '', + }, + { + id: TaskId.CompressOldChanges, + description: '', + run: (_models: Models) => {}, + schedule: '', + }, + ]; +}; + describe('TaskService', () => { beforeAll(async () => { @@ -31,98 +49,66 @@ describe('TaskService', () => { test('should register a task', async () => { const service = newService(); - const task: Task = { - id: TaskId.DeleteExpiredTokens, - description: '', - run: (_models: Models) => {}, - schedule: '', - }; - - await service.registerTask(task); + const tasks = createDemoTasks(); + await service.registerTasks(tasks); expect(service.tasks[TaskId.DeleteExpiredTokens]).toBeTruthy(); - await expectThrow(async () => service.registerTask(task)); + expect(service.tasks[TaskId.CompressOldChanges]).toBeTruthy(); + await expectThrow(async () => service.registerTask(tasks[0])); }); - // test('should run a task', async function() { - // const service = newService(); - - // let taskStarted = false; - // let waitToFinish = true; - // let finishTask = false; - // let taskHasRan = false; - - // const taskId = TaskId.DeleteExpiredTokens; - - // const task: Task = { - // id: taskId, - // description: '', - // run: async (_models: Models) => { - // taskStarted = true; - - // const iid = setInterval(() => { - // if (waitToFinish) return; - - // if (finishTask) { - // clearInterval(iid); - // taskHasRan = true; - // } - // }, 1); - // }, - // schedule: '', - // }; - - // await service.registerTask(task); - - // expect((await service.taskState(taskId)).running).toBe(0); - - // const startTime = new Date(); - - // void service.runTask(taskId, RunType.Manual); - // while (!taskStarted) { - // await msleep(1); - // } - - // expect((await service.taskState(taskId)).running).toBe(1); - // waitToFinish = false; - - // while (!taskHasRan) { - // await msleep(1); - // finishTask = true; - // } - - // expect((await service.taskState(taskId)).running).toBe(0); - - // const events = await service.taskLastEvents(taskId); - // expect(events.taskStarted.created_time).toBeGreaterThanOrEqual(startTime.getTime()); - // expect(events.taskCompleted.created_time).toBeGreaterThan(startTime.getTime()); - // }); - test('should not run if task is disabled', async () => { const service = newService(); let taskHasRan = false; - const taskId = TaskId.DeleteExpiredTokens; - - const task: Task = { - id: taskId, - description: '', - run: async (_models: Models) => { - taskHasRan = true; - }, - schedule: '', - }; - - await service.registerTask(task); + const tasks = createDemoTasks(); + tasks[0].run = async (_models: Models) => { + taskHasRan = true; + }, + await service.registerTasks(tasks); + const taskId = tasks[0].id; await service.runTask(taskId, RunType.Manual); expect(taskHasRan).toBe(true); taskHasRan = false; - await models().taskState().disable(task.id); + await models().taskState().disable(taskId); await service.runTask(taskId, RunType.Manual); expect(taskHasRan).toBe(false); }); + test('should not run if task is already running', async () => { + const service = newService(); + + const tasks = createDemoTasks(); + await service.registerTasks(tasks); + const task = tasks[0]; + + const state = await models().taskState().loadByTaskId(task.id); + await models().taskState().save({ id: state.id, running: 1 }); + + await expectThrow(async () => service.runTask(task.id, RunType.Manual), ErrorCode.TaskAlreadyRunning); + }); + + test('should reset interrupted tasks', async () => { + const service = newService(); + + const tasks = createDemoTasks(); + await service.registerTasks(tasks); + const task = tasks[0]; + + const state = await models().taskState().loadByTaskId(task.id); + await models().taskState().save({ id: state.id, running: 1 }); + + const stateBefore = await models().taskState().loadByTaskId(task.id); + + await service.resetInterruptedTasks(); + + const stateAfter = await models().taskState().loadByTaskId(task.id); + + expect(stateBefore.running).toBe(1); + expect(stateAfter.running).toBe(0); + }); + }); diff --git a/packages/server/src/services/TaskService.ts b/packages/server/src/services/TaskService.ts index 69216ab0f..34a9985c6 100644 --- a/packages/server/src/services/TaskService.ts +++ b/packages/server/src/services/TaskService.ts @@ -5,7 +5,7 @@ import BaseService from './BaseService'; import { Event, EventType, TaskId, TaskState } from './database/types'; import { Services } from './types'; import { _ } from '@joplin/lib/locale'; -import { ErrorNotFound } from '../utils/errors'; +import { ErrorCode, ErrorNotFound } from '../utils/errors'; import { durationToMilliseconds } from '../utils/time'; const cron = require('node-cron'); @@ -104,6 +104,16 @@ export default class TaskService extends BaseService { }; } + public async resetInterruptedTasks() { + const taskStates = await this.models.taskState().all(); + for (const taskState of taskStates) { + if (taskState.running) { + logger.warn(`Found a task that was in running state: ${this.taskDisplayString(taskState.task_id)} - resetting it.`); + await this.models.taskState().stop(taskState.task_id); + } + } + } + private taskById(id: TaskId): Task { if (!this.tasks_[id]) throw new Error(`No such task: ${id}`); return this.tasks_[id]; @@ -159,13 +169,28 @@ export default class TaskService extends BaseService { interval = null; } + const runTaskWithErrorChecking = async (taskId: TaskId) => { + try { + await this.runTask(taskId, RunType.Scheduled); + } catch (error) { + if (error.code === ErrorCode.TaskAlreadyRunning) { + // This is not critical but we should log a warning + // because it may mean that the interval is too tight, + // or the task is taking too long. + logger.warn(`Tried to start ${this.taskDisplayString(taskId)} but it was already running`); + } else { + logger.error(`Failed running task ${this.taskDisplayString(taskId)}`, error); + } + } + }; + if (interval !== null) { setInterval(async () => { - await this.runTask(Number(taskId), RunType.Scheduled); + await runTaskWithErrorChecking(Number(taskId)); }, interval); } else { cron.schedule(task.schedule, async () => { - await this.runTask(Number(taskId), RunType.Scheduled); + await runTaskWithErrorChecking(Number(taskId)); }); } } diff --git a/packages/server/src/utils/errors.ts b/packages/server/src/utils/errors.ts index ec339fb5a..9f41e32e4 100644 --- a/packages/server/src/utils/errors.ts +++ b/packages/server/src/utils/errors.ts @@ -6,6 +6,7 @@ export enum ErrorCode { NoSub = 'no_sub', NoStripeSub = 'no_stripe_sub', InvalidOrigin = 'invalidOrigin', + TaskAlreadyRunning = 'taskAlreadyRunning', } export interface ErrorOptions { diff --git a/packages/server/src/utils/setupTaskService.ts b/packages/server/src/utils/setupTaskService.ts index ee6360c44..d8864b359 100644 --- a/packages/server/src/utils/setupTaskService.ts +++ b/packages/server/src/utils/setupTaskService.ts @@ -104,5 +104,7 @@ export default async function(env: Env, models: Models, config: Config, services await taskService.registerTasks(tasks); + await taskService.resetInterruptedTasks(); + return taskService; }