1
0
mirror of https://github.com/laurent22/joplin.git synced 2026-04-18 19:42:23 +02:00

Compare commits

..

4 Commits

Author SHA1 Message Date
Laurent Cozic ba32e32c58 fix ci 2026-04-17 14:16:21 +01:00
Laurent Cozic 55a1406d63 Consider pool lifecycle and capacity planning for the dedicated task-state pool 2026-04-17 09:10:51 +01:00
Laurent Cozic b431c9b5c5 fixed tests 2026-04-17 08:54:02 +01:00
Laurent Cozic 781f3cb885 update 2026-04-17 08:14:13 +01:00
3 changed files with 37 additions and 23 deletions
+2 -8
View File
@@ -1026,16 +1026,10 @@ export default class ItemModel extends BaseModel<Item> {
// but it would be nice to get to the bottom of this bug.
public processOrphanedItems = async () => {
await this.withTransaction(async () => {
// Find items that have no corresponding entry in user_items.
// NOT EXISTS is used instead of LEFT JOIN for performance as it
// allows Postgres to short-circuit on the first match per item.
const orphanedItems: Item[] = await this.db(this.tableName)
.select(['items.id', 'items.owner_id'])
.whereNotExists(
this.db('user_items')
.select(this.db.raw('1'))
.whereRaw('user_items.item_id = items.id'),
);
.leftJoin('user_items', 'user_items.item_id', 'items.id')
.whereNull('user_items.user_id');
const userIds: string[] = orphanedItems.map(i => i.owner_id);
const users = await this.models().user().loadByIds(userIds, { fields: ['id'] });
+28 -14
View File
@@ -1,5 +1,6 @@
import Logger from '@joplin/utils/Logger';
import { Models } from '../models/factory';
import newModelFactory, { Models } from '../models/factory';
import { DbConnection, disconnectDb } from '../db';
import { Config, Env } from '../utils/types';
import BaseService from './BaseService';
import { Event, EventType, TaskId, TaskState } from './database/types';
@@ -67,16 +68,29 @@ export default class TaskService extends BaseService {
private tasks_: Tasks = {};
private services_: Services;
private taskStateModels_: Models;
private taskStateDb_: DbConnection;
public constructor(env: Env, models: Models, config: Config, services: Services) {
public constructor(env: Env, models: Models, config: Config, services: Services, taskStateDb: DbConnection = null) {
super(env, models, config);
this.services_ = services;
this.taskStateDb_ = taskStateDb;
this.taskStateModels_ = taskStateDb ? newModelFactory(taskStateDb, taskStateDb, config) : models;
}
public async destroy() {
await super.destroy();
if (this.taskStateDb_) {
await disconnectDb(this.taskStateDb_);
this.taskStateDb_ = null;
this.taskStateModels_ = null;
}
}
public async registerTask(task: Task) {
if (this.tasks_[task.id]) throw new Error(`Already a task with this ID: ${task.id}`);
this.tasks_[task.id] = task;
await this.models.taskState().init(task.id);
await this.taskStateModels_.taskState().init(task.id);
}
public async registerTasks(tasks: Task[]) {
@@ -92,7 +106,7 @@ export default class TaskService extends BaseService {
}
public async taskStates(ids: TaskId[]): Promise<TaskState[]> {
return this.models.taskState().loadByTaskIds(ids);
return this.taskStateModels_.taskState().loadByTaskIds(ids);
}
public async taskState(id: TaskId): Promise<TaskState> {
@@ -103,17 +117,17 @@ export default class TaskService extends BaseService {
public async taskLastEvents(id: TaskId): Promise<TaskEvents> {
return {
taskStarted: await this.models.event().lastEventByTypeAndName(EventType.TaskStarted, id.toString()),
taskCompleted: await this.models.event().lastEventByTypeAndName(EventType.TaskCompleted, id.toString()),
taskStarted: await this.taskStateModels_.event().lastEventByTypeAndName(EventType.TaskStarted, id.toString()),
taskCompleted: await this.taskStateModels_.event().lastEventByTypeAndName(EventType.TaskCompleted, id.toString()),
};
}
public async resetInterruptedTasks() {
const taskStates = await this.models.taskState().all();
const taskStates = await this.taskStateModels_.taskState().all();
for (const taskState of taskStates) {
if (taskState.running) {
logger.warn(`Found a task that was in running state: ${this.taskDisplayString(taskState.task_id)} - resetting it.`);
await this.models.taskState().stop(taskState.task_id);
await this.taskStateModels_.taskState().stop(taskState.task_id);
}
}
}
@@ -130,7 +144,7 @@ export default class TaskService extends BaseService {
public async runTask(id: TaskId, runType: RunType) {
const displayString = this.taskDisplayString(id);
const taskState = await this.models.taskState().loadByTaskId(id);
const taskState = await this.taskStateModels_.taskState().loadByTaskId(id);
if (!taskState) throw new Error(`Invalid task: ${id}: ${runType}`);
if (!taskState.enabled) {
@@ -138,11 +152,11 @@ export default class TaskService extends BaseService {
return;
}
await this.models.taskState().start(id);
await this.taskStateModels_.taskState().start(id);
const startTime = Date.now();
await this.models.event().create(EventType.TaskStarted, id.toString());
await this.taskStateModels_.event().create(EventType.TaskStarted, id.toString());
try {
logger.info(`Running ${displayString} (${runTypeToString(runType)})...`);
@@ -151,14 +165,14 @@ export default class TaskService extends BaseService {
logger.error(`On ${displayString}`, error);
}
await this.models.taskState().stop(id);
await this.models.event().create(EventType.TaskCompleted, id.toString());
await this.taskStateModels_.taskState().stop(id);
await this.taskStateModels_.event().create(EventType.TaskCompleted, id.toString());
logger.info(`Completed ${this.taskDisplayString(id)} in ${Date.now() - startTime}ms`);
}
public async enableTask(taskId: TaskId, enabled = true) {
await this.models.taskState().enable(taskId, enabled);
await this.taskStateModels_.taskState().enable(taskId, enabled);
}
public async runInBackground() {
@@ -1,4 +1,5 @@
import { Models } from '../models/factory';
import { connectDb } from '../db';
import { TaskId } from '../services/database/types';
import TaskService, { Task, taskIdToLabel } from '../services/TaskService';
import { Services } from '../services/types';
@@ -7,7 +8,12 @@ import { Config, Env } from './types';
import { Day } from './time';
export default async function(env: Env, models: Models, config: Config, services: Services): Promise<TaskService> {
const taskService = new TaskService(env, models, config, services);
// In production, use a separate DB connection pool for task state
// management so that it is not affected by failed transactions in the
// main connection pool. In dev/test, we reuse the main connection to
// avoid exhausting Postgres connection slots in CI.
const taskStateDb = env === Env.Prod ? await connectDb({ ...config.database, maxConnections: 1 }) : null;
const taskService = new TaskService(env, models, config, services, taskStateDb);
let tasks: Task[] = [
{