mirror of
https://github.com/laurent22/joplin.git
synced 2024-11-24 08:12:24 +02:00
Server: Add support for events and use them to track background tasks
This commit is contained in:
parent
ba17742895
commit
79d1ad706a
@ -47,7 +47,7 @@
|
||||
"updatePluginTypes": "./packages/generator-joplin/updateTypes.sh",
|
||||
"watch": "lerna run watch --stream --parallel",
|
||||
"watchWebsite": "nodemon --verbose --watch Assets/WebsiteAssets --watch packages/tools/website/build.js --ext md,ts,js,mustache,css,tsx,gif,png,svg --exec \"node packages/tools/website/build.js && http-server --port 8077 docs -a localhost\"",
|
||||
"i": "lerna add --no-bootstrap --scope"
|
||||
"i": "node packages/tools/lernaInstall"
|
||||
},
|
||||
"husky": {
|
||||
"hooks": {
|
||||
|
45127
packages/server/package-lock.json
generated
45127
packages/server/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@ -22,6 +22,7 @@
|
||||
"@joplin/lib": "~2.4",
|
||||
"@joplin/renderer": "~2.4",
|
||||
"@koa/cors": "^3.1.0",
|
||||
"@types/uuid": "^8.3.1",
|
||||
"bcryptjs": "^2.4.3",
|
||||
"bulma": "^0.9.1",
|
||||
"bulma-prefers-dark": "^0.1.0-beta.0",
|
||||
@ -47,6 +48,7 @@
|
||||
"raw-body": "^2.4.1",
|
||||
"sqlite3": "^4.1.0",
|
||||
"stripe": "^8.150.0",
|
||||
"uuid": "^8.3.2",
|
||||
"yargs": "^14.0.0",
|
||||
"zxcvbn": "^4.4.2"
|
||||
},
|
||||
|
Binary file not shown.
21
packages/server/src/migrations/20210927183928_events.ts
Normal file
21
packages/server/src/migrations/20210927183928_events.ts
Normal file
@ -0,0 +1,21 @@
|
||||
import { Knex } from 'knex';
|
||||
import { DbConnection } from '../db';
|
||||
|
||||
export async function up(db: DbConnection): Promise<any> {
|
||||
await db.schema.createTable('events', (table: Knex.CreateTableBuilder) => {
|
||||
table.uuid('id').unique().notNullable();
|
||||
table.increments('counter').unique().primary().notNullable();
|
||||
table.integer('type').notNullable();
|
||||
table.string('name', 32).defaultTo('').notNullable();
|
||||
table.bigInteger('created_time').notNullable();
|
||||
});
|
||||
|
||||
await db.schema.alterTable('events', (table: Knex.CreateTableBuilder) => {
|
||||
table.index('type');
|
||||
table.index('name');
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(db: DbConnection): Promise<any> {
|
||||
await db.schema.dropTable('events');
|
||||
}
|
@ -8,11 +8,17 @@ import * as EventEmitter from 'events';
|
||||
import { Config } from '../utils/types';
|
||||
import personalizedUserContentBaseUrl from '@joplin/lib/services/joplinServer/personalizedUserContentBaseUrl';
|
||||
import Logger from '@joplin/lib/Logger';
|
||||
import dbuuid from '../utils/dbuuid';
|
||||
|
||||
const logger = Logger.create('BaseModel');
|
||||
|
||||
type SavePoint = string;
|
||||
|
||||
export enum UuidType {
|
||||
NanoId = 1,
|
||||
Native = 2,
|
||||
}
|
||||
|
||||
export interface SaveOptions {
|
||||
isNew?: boolean;
|
||||
skipValidation?: boolean;
|
||||
@ -139,6 +145,10 @@ export default abstract class BaseModel<T> {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected uuidType(): UuidType {
|
||||
return UuidType.NanoId;
|
||||
}
|
||||
|
||||
protected autoTimestampEnabled(): boolean {
|
||||
return true;
|
||||
}
|
||||
@ -257,13 +267,12 @@ export default abstract class BaseModel<T> {
|
||||
|
||||
public async save(object: T, options: SaveOptions = {}): Promise<T> {
|
||||
if (!object) throw new Error('Object cannot be empty');
|
||||
|
||||
const toSave = Object.assign({}, object);
|
||||
|
||||
const isNew = await this.isNew(object, options);
|
||||
|
||||
if (this.hasUuid() && isNew && !(toSave as WithUuid).id) {
|
||||
(toSave as WithUuid).id = uuidgen();
|
||||
(toSave as WithUuid).id = this.uuidType() === UuidType.NanoId ? uuidgen() : dbuuid();
|
||||
}
|
||||
|
||||
if (this.autoTimestampEnabled()) {
|
||||
|
40
packages/server/src/models/EventModel.test.ts
Normal file
40
packages/server/src/models/EventModel.test.ts
Normal file
@ -0,0 +1,40 @@
|
||||
import { EventType } from '../services/database/types';
|
||||
import { beforeAllDb, afterAllTests, beforeEachDb, models } from '../utils/testing/testUtils';
|
||||
import { msleep } from '../utils/time';
|
||||
|
||||
describe('EventModel', function() {
|
||||
|
||||
beforeAll(async () => {
|
||||
await beforeAllDb('EventModel');
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await afterAllTests();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await beforeEachDb();
|
||||
});
|
||||
|
||||
test('should create an event', async function() {
|
||||
await models().event().create(EventType.TaskStarted, 'deleteExpiredTokens');
|
||||
|
||||
const events = await models().event().all();
|
||||
expect(events.length).toBe(1);
|
||||
expect(events[0].type).toBe(EventType.TaskStarted);
|
||||
expect(events[0].name).toBe('deleteExpiredTokens');
|
||||
});
|
||||
|
||||
test('should get the latest event', async function() {
|
||||
await models().event().create(EventType.TaskStarted, 'deleteExpiredTokens');
|
||||
await msleep(1);
|
||||
await models().event().create(EventType.TaskStarted, 'deleteExpiredTokens');
|
||||
|
||||
const allEvents = (await models().event().all()).sort((a, b) => a.created_time < b.created_time ? -1 : +1);
|
||||
expect(allEvents[0].created_time).toBeLessThan(allEvents[1].created_time);
|
||||
|
||||
const latest = await models().event().lastEventByTypeAndName(EventType.TaskStarted, 'deleteExpiredTokens');
|
||||
expect(latest.id).toBe(allEvents[1].id);
|
||||
});
|
||||
|
||||
});
|
36
packages/server/src/models/EventModel.ts
Normal file
36
packages/server/src/models/EventModel.ts
Normal file
@ -0,0 +1,36 @@
|
||||
import { Event, EventType } from '../services/database/types';
|
||||
import BaseModel, { UuidType } from './BaseModel';
|
||||
|
||||
|
||||
export default class EventModel extends BaseModel<Event> {
|
||||
|
||||
public get tableName(): string {
|
||||
return 'events';
|
||||
}
|
||||
|
||||
protected autoTimestampEnabled(): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected uuidType(): UuidType {
|
||||
return UuidType.Native;
|
||||
}
|
||||
|
||||
public async create(type: EventType, name: string = '') {
|
||||
await this.save({
|
||||
name,
|
||||
type,
|
||||
created_time: Date.now(),
|
||||
});
|
||||
}
|
||||
|
||||
public async lastEventByTypeAndName(type: EventType, name: string): Promise<Event | null> {
|
||||
return this
|
||||
.db(this.tableName)
|
||||
.where('type', '=', type)
|
||||
.where('name', '=', name)
|
||||
.orderBy('counter', 'desc')
|
||||
.first();
|
||||
}
|
||||
|
||||
}
|
@ -70,6 +70,7 @@ import KeyValueModel from './KeyValueModel';
|
||||
import TokenModel from './TokenModel';
|
||||
import SubscriptionModel from './SubscriptionModel';
|
||||
import UserFlagModel from './UserFlagModel';
|
||||
import EventModel from './EventModel';
|
||||
import { Config } from '../utils/types';
|
||||
|
||||
export class Models {
|
||||
@ -142,6 +143,10 @@ export class Models {
|
||||
return new UserFlagModel(this.db_, newModelFactory, this.config_);
|
||||
}
|
||||
|
||||
public event() {
|
||||
return new EventModel(this.db_, newModelFactory, this.config_);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
export default function newModelFactory(db: DbConnection, config: Config): Models {
|
||||
|
@ -27,7 +27,7 @@ router.post('tasks', async (_path: SubPath, ctx: AppContext) => {
|
||||
|
||||
for (const k of Object.keys(fields)) {
|
||||
if (k.startsWith('checkbox_')) {
|
||||
const taskId = k.substr(9);
|
||||
const taskId = Number(k.substr(9));
|
||||
try {
|
||||
void taskService.runTask(taskId, RunType.Manual);
|
||||
} catch (error) {
|
||||
@ -58,8 +58,10 @@ router.get('tasks', async (_path: SubPath, ctx: AppContext) => {
|
||||
const taskService = ctx.joplin.services.tasks;
|
||||
|
||||
const taskRows: Row[] = [];
|
||||
for (const [taskId, task] of Object.entries(taskService.tasks)) {
|
||||
for (const [taskIdString, task] of Object.entries(taskService.tasks)) {
|
||||
const taskId = Number(taskIdString);
|
||||
const state = taskService.taskState(taskId);
|
||||
const events = await taskService.taskLastEvents(taskId);
|
||||
|
||||
taskRows.push([
|
||||
{
|
||||
@ -67,7 +69,7 @@ router.get('tasks', async (_path: SubPath, ctx: AppContext) => {
|
||||
checkbox: true,
|
||||
},
|
||||
{
|
||||
value: taskId,
|
||||
value: taskId.toString(),
|
||||
},
|
||||
{
|
||||
value: task.description,
|
||||
@ -79,10 +81,10 @@ router.get('tasks', async (_path: SubPath, ctx: AppContext) => {
|
||||
value: yesOrNo(state.running),
|
||||
},
|
||||
{
|
||||
value: state.lastRunTime ? formatDateTime(state.lastRunTime) : '-',
|
||||
value: events.taskStarted ? formatDateTime(events.taskStarted.created_time) : '-',
|
||||
},
|
||||
{
|
||||
value: state.lastCompletionTime ? formatDateTime(state.lastCompletionTime) : '-',
|
||||
value: events.taskCompleted ? formatDateTime(events.taskCompleted.created_time) : '-',
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ describe('TaskService', function() {
|
||||
const service = newService();
|
||||
|
||||
const task: Task = {
|
||||
id: 'test',
|
||||
id: 123456,
|
||||
description: '',
|
||||
run: (_models: Models) => {},
|
||||
schedule: '',
|
||||
@ -34,7 +34,7 @@ describe('TaskService', function() {
|
||||
|
||||
service.registerTask(task);
|
||||
|
||||
expect(service.tasks['test']).toBeTruthy();
|
||||
expect(service.tasks[123456]).toBeTruthy();
|
||||
await expectThrow(async () => service.registerTask(task));
|
||||
});
|
||||
|
||||
@ -44,8 +44,10 @@ describe('TaskService', function() {
|
||||
let finishTask = false;
|
||||
let taskHasRan = false;
|
||||
|
||||
const taskId = 123456;
|
||||
|
||||
const task: Task = {
|
||||
id: 'test',
|
||||
id: taskId,
|
||||
description: '',
|
||||
run: async (_models: Models) => {
|
||||
const iid = setInterval(() => {
|
||||
@ -60,22 +62,23 @@ describe('TaskService', function() {
|
||||
|
||||
service.registerTask(task);
|
||||
|
||||
expect(service.taskState('test').running).toBe(false);
|
||||
expect(service.taskState(taskId).running).toBe(false);
|
||||
|
||||
const startTime = new Date();
|
||||
|
||||
void service.runTask('test', RunType.Manual);
|
||||
expect(service.taskState('test').running).toBe(true);
|
||||
expect(service.taskState('test').lastCompletionTime).toBeFalsy();
|
||||
expect(service.taskState('test').lastRunTime.getTime()).toBeGreaterThanOrEqual(startTime.getTime());
|
||||
void service.runTask(taskId, RunType.Manual);
|
||||
expect(service.taskState(taskId).running).toBe(true);
|
||||
|
||||
await msleep(10);
|
||||
finishTask = true;
|
||||
await msleep(10);
|
||||
|
||||
expect(taskHasRan).toBe(true);
|
||||
expect(service.taskState('test').running).toBe(false);
|
||||
expect(service.taskState('test').lastCompletionTime.getTime()).toBeGreaterThan(startTime.getTime());
|
||||
expect(service.taskState(taskId).running).toBe(false);
|
||||
|
||||
const events = await service.taskLastEvents(taskId);
|
||||
expect(events.taskStarted.created_time).toBeGreaterThanOrEqual(startTime.getTime());
|
||||
expect(events.taskCompleted.created_time).toBeGreaterThan(startTime.getTime());
|
||||
});
|
||||
|
||||
});
|
||||
|
@ -1,11 +1,18 @@
|
||||
import Logger from '@joplin/lib/Logger';
|
||||
import { Models } from '../models/factory';
|
||||
import BaseService from './BaseService';
|
||||
import { Event, EventType } from './database/types';
|
||||
const cron = require('node-cron');
|
||||
|
||||
const logger = Logger.create('TaskService');
|
||||
|
||||
type TaskId = string;
|
||||
export enum TaskId {
|
||||
DeleteExpiredTokens = 1,
|
||||
UpdateTotalSizes = 2,
|
||||
HandleOversizedAccounts = 3,
|
||||
HandleBetaUserEmails = 4,
|
||||
HandleFailedPaymentSubscriptions = 5,
|
||||
}
|
||||
|
||||
export enum RunType {
|
||||
Scheduled = 1,
|
||||
@ -25,24 +32,25 @@ export interface Task {
|
||||
run(models: Models): void;
|
||||
}
|
||||
|
||||
export type Tasks = Record<TaskId, Task>;
|
||||
export type Tasks = Record<number, Task>;
|
||||
|
||||
interface TaskState {
|
||||
running: boolean;
|
||||
lastRunTime: Date;
|
||||
lastCompletionTime: Date;
|
||||
}
|
||||
|
||||
const defaultTaskState: TaskState = {
|
||||
running: false,
|
||||
lastRunTime: null,
|
||||
lastCompletionTime: null,
|
||||
};
|
||||
|
||||
interface TaskEvents {
|
||||
taskStarted: Event;
|
||||
taskCompleted: Event;
|
||||
}
|
||||
|
||||
export default class TaskService extends BaseService {
|
||||
|
||||
private tasks_: Tasks = {};
|
||||
private taskStates_: Record<TaskId, TaskState> = {};
|
||||
private taskStates_: Record<number, TaskState> = {};
|
||||
|
||||
public registerTask(task: Task) {
|
||||
if (this.tasks_[task.id]) throw new Error(`Already a task with this ID: ${task.id}`);
|
||||
@ -63,6 +71,13 @@ export default class TaskService extends BaseService {
|
||||
return this.taskStates_[id];
|
||||
}
|
||||
|
||||
public async taskLastEvents(id: TaskId): Promise<TaskEvents> {
|
||||
return {
|
||||
taskStarted: await this.models.event().lastEventByTypeAndName(EventType.TaskStarted, id.toString()),
|
||||
taskCompleted: await this.models.event().lastEventByTypeAndName(EventType.TaskCompleted, id.toString()),
|
||||
};
|
||||
}
|
||||
|
||||
public async runTask(id: TaskId, runType: RunType) {
|
||||
const state = this.taskState(id);
|
||||
if (state.running) throw new Error(`Task is already running: ${id}`);
|
||||
@ -72,9 +87,10 @@ export default class TaskService extends BaseService {
|
||||
this.taskStates_[id] = {
|
||||
...this.taskStates_[id],
|
||||
running: true,
|
||||
lastRunTime: new Date(),
|
||||
};
|
||||
|
||||
await this.models.event().create(EventType.TaskStarted, id.toString());
|
||||
|
||||
try {
|
||||
logger.info(`Running "${id}" (${runTypeToString(runType)})...`);
|
||||
await this.tasks_[id].run(this.models);
|
||||
@ -85,9 +101,10 @@ export default class TaskService extends BaseService {
|
||||
this.taskStates_[id] = {
|
||||
...this.taskStates_[id],
|
||||
running: false,
|
||||
lastCompletionTime: new Date(),
|
||||
};
|
||||
|
||||
await this.models.event().create(EventType.TaskCompleted, id.toString());
|
||||
|
||||
logger.info(`Completed "${id}" in ${Date.now() - startTime}ms`);
|
||||
}
|
||||
|
||||
@ -95,10 +112,10 @@ export default class TaskService extends BaseService {
|
||||
for (const [taskId, task] of Object.entries(this.tasks_)) {
|
||||
if (!task.schedule) continue;
|
||||
|
||||
logger.info(`Scheduling task "${taskId}": ${task.schedule}`);
|
||||
logger.info(`Scheduling task #${taskId} (${task.description}): ${task.schedule}`);
|
||||
|
||||
cron.schedule(task.schedule, async () => {
|
||||
await this.runTask(taskId, RunType.Scheduled);
|
||||
await this.runTask(Number(taskId), RunType.Scheduled);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
export type Uuid = string;
|
||||
export type Uuid = any;
|
||||
|
||||
export enum ItemAddressingType {
|
||||
Id = 1,
|
||||
@ -28,6 +28,11 @@ export enum ChangeType {
|
||||
Delete = 3,
|
||||
}
|
||||
|
||||
export enum EventType {
|
||||
TaskStarted = 1,
|
||||
TaskCompleted = 2,
|
||||
}
|
||||
|
||||
export enum UserFlagType {
|
||||
FailedPaymentWarning = 1,
|
||||
FailedPaymentFinal = 2,
|
||||
@ -245,6 +250,13 @@ export interface UserFlag extends WithDates {
|
||||
type?: UserFlagType;
|
||||
}
|
||||
|
||||
export interface Event extends WithUuid {
|
||||
counter?: number;
|
||||
type?: EventType;
|
||||
name?: string;
|
||||
created_time?: number;
|
||||
}
|
||||
|
||||
export const databaseSchema: DatabaseTables = {
|
||||
sessions: {
|
||||
id: { type: 'string' },
|
||||
@ -409,5 +421,12 @@ export const databaseSchema: DatabaseTables = {
|
||||
updated_time: { type: 'string' },
|
||||
created_time: { type: 'string' },
|
||||
},
|
||||
events: {
|
||||
id: { type: 'string' },
|
||||
counter: { type: 'number' },
|
||||
type: { type: 'number' },
|
||||
name: { type: 'string' },
|
||||
created_time: { type: 'string' },
|
||||
},
|
||||
};
|
||||
// AUTO-GENERATED-TYPES
|
||||
|
@ -34,6 +34,7 @@ const config = {
|
||||
'main.user_flags': 'WithDates',
|
||||
'main.user_items': 'WithDates',
|
||||
'main.users': 'WithDates, WithUuid',
|
||||
'main.events': 'WithUuid',
|
||||
},
|
||||
};
|
||||
|
||||
@ -55,6 +56,8 @@ const propertyTypes: Record<string, string> = {
|
||||
'users.max_item_size': 'number | null',
|
||||
'users.max_total_item_size': 'number | null',
|
||||
'users.total_item_size': 'number',
|
||||
'events.created_time': 'number',
|
||||
'events.type': 'EventType',
|
||||
};
|
||||
|
||||
function insertContentIntoFile(filePath: string, markerOpen: string, markerClose: string, contentToInsert: string): void {
|
||||
|
5
packages/server/src/utils/dbuuid.ts
Normal file
5
packages/server/src/utils/dbuuid.ts
Normal file
@ -0,0 +1,5 @@
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
export default function(): string {
|
||||
return uuidv4();
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
import { Models } from '../models/factory';
|
||||
import TaskService, { Task } from '../services/TaskService';
|
||||
import TaskService, { Task, TaskId } from '../services/TaskService';
|
||||
import { Config, Env } from './types';
|
||||
|
||||
export default function(env: Env, models: Models, config: Config): TaskService {
|
||||
@ -7,19 +7,19 @@ export default function(env: Env, models: Models, config: Config): TaskService {
|
||||
|
||||
let tasks: Task[] = [
|
||||
{
|
||||
id: 'deleteExpiredTokens',
|
||||
id: TaskId.DeleteExpiredTokens,
|
||||
description: 'Delete expired tokens',
|
||||
schedule: '0 */6 * * *',
|
||||
run: (models: Models) => models.token().deleteExpiredTokens(),
|
||||
},
|
||||
{
|
||||
id: 'updateTotalSizes',
|
||||
id: TaskId.UpdateTotalSizes,
|
||||
description: 'Update total sizes',
|
||||
schedule: '0 * * * *',
|
||||
run: (models: Models) => models.item().updateTotalSizes(),
|
||||
},
|
||||
{
|
||||
id: 'handleOversizedAccounts',
|
||||
id: TaskId.HandleOversizedAccounts,
|
||||
description: 'Process oversized accounts',
|
||||
schedule: '0 14 * * *',
|
||||
run: (models: Models) => models.user().handleOversizedAccounts(),
|
||||
@ -29,13 +29,13 @@ export default function(env: Env, models: Models, config: Config): TaskService {
|
||||
if (config.isJoplinCloud) {
|
||||
tasks = tasks.concat([
|
||||
{
|
||||
id: 'handleBetaUserEmails',
|
||||
id: TaskId.HandleBetaUserEmails,
|
||||
description: 'Process beta user emails',
|
||||
schedule: '0 12 * * *',
|
||||
run: (models: Models) => models.user().handleBetaUserEmails(),
|
||||
},
|
||||
{
|
||||
id: 'handleFailedPaymentSubscriptions',
|
||||
id: TaskId.HandleFailedPaymentSubscriptions,
|
||||
description: 'Process failed payment subscriptions',
|
||||
schedule: '0 13 * * *',
|
||||
run: (models: Models) => models.user().handleFailedPaymentSubscriptions(),
|
||||
|
40
packages/tools/lernaInstall.js
Normal file
40
packages/tools/lernaInstall.js
Normal file
@ -0,0 +1,40 @@
|
||||
// Lerna doesn't provide any sensible way to add a package to a sub-package
|
||||
// without bootstrapping the whole project. It also doesn't allow adding
|
||||
// multiple packages, so for each one, everything has to be bootstrapped again.
|
||||
//
|
||||
// https://github.com/lerna/lerna/issues/2988
|
||||
//
|
||||
// This script fixes this by allowing to install multiple packges, and then run
|
||||
// a more optimised bootstrap just for that package.
|
||||
//
|
||||
// Usage, for example to add the "uuid" and "@types/uuid" packages to the server
|
||||
// sub-package:
|
||||
//
|
||||
// npm run i -- uuid @types/uuid @joplin/server
|
||||
|
||||
const { chdir } = require('process');
|
||||
const { execCommand2, rootDir } = require('./tool-utils');
|
||||
|
||||
function dirnameFromPackageName(n) {
|
||||
if (!n.includes('/')) return n;
|
||||
const s = n.split('/');
|
||||
return s.pop();
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const argv = require('yargs').argv;
|
||||
const toInstallPackages = argv._;
|
||||
const targetPackageName = toInstallPackages.pop();
|
||||
const targetPackageDir = `${rootDir}/packages/${dirnameFromPackageName(targetPackageName)}`;
|
||||
|
||||
chdir(targetPackageDir);
|
||||
await execCommand2(`npm install ${toInstallPackages.join(' ')}`);
|
||||
|
||||
chdir(rootDir);
|
||||
await execCommand2(`npx lerna bootstrap --include-dependents --include-dependencies --scope=${targetPackageName}`);
|
||||
}
|
||||
|
||||
main().catch((error) => {
|
||||
console.error('Fatal error:', error);
|
||||
process.exit(1);
|
||||
});
|
Loading…
Reference in New Issue
Block a user