You've already forked joplin
mirror of
https://github.com/laurent22/joplin.git
synced 2026-04-18 19:42:23 +02:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6197d05c90 | |||
| 02d92228ec |
@@ -320,7 +320,7 @@ export async function createResourcesFromPaths(mediaFiles: DownloadedMediaFile[]
|
||||
const resource = await shim.createResourceFromPath(mediaFile.path);
|
||||
return { ...mediaFile, resource };
|
||||
} catch (error) {
|
||||
logger.warn(`Cannot create resource for ${mediaFile.originalUrl}`, error);
|
||||
logger.info(`Cannot create resource for ${mediaFile.originalUrl}`, error);
|
||||
return { ...mediaFile, resource: null };
|
||||
}
|
||||
};
|
||||
|
||||
@@ -723,55 +723,50 @@ export default class ItemModel extends BaseModel<Item> {
|
||||
|
||||
const itemToSave = { ...o.item };
|
||||
|
||||
const content = itemToSave.content;
|
||||
delete itemToSave.content;
|
||||
|
||||
itemToSave.content_storage_id = (await this.storageDriver()).storageId;
|
||||
|
||||
itemToSave.content_size = content ? content.byteLength : 0;
|
||||
|
||||
// Here we save the item row and content, and we want to
|
||||
// make sure that either both are saved or none of them.
|
||||
// The savepoint wraps the entire operation so that any
|
||||
// error (including unique constraint violations) is
|
||||
// rolled back cleanly without aborting the outer
|
||||
// transaction.
|
||||
|
||||
// TODO: When an item is uploaded multiple times
|
||||
// simultaneously there could be a race condition, where the
|
||||
// content would not match the db row (for example, the
|
||||
// content_size would differ).
|
||||
//
|
||||
// Possible solutions:
|
||||
//
|
||||
// - Row-level lock on items.id, and release once the
|
||||
// content is saved.
|
||||
// - Or external lock - eg. Redis.
|
||||
|
||||
const savePoint = await this.setSavePoint();
|
||||
|
||||
try {
|
||||
const content = itemToSave.content;
|
||||
delete itemToSave.content;
|
||||
|
||||
itemToSave.content_storage_id = (await this.storageDriver()).storageId;
|
||||
|
||||
itemToSave.content_size = content ? content.byteLength : 0;
|
||||
|
||||
// Here we save the item row and content, and we want to
|
||||
// make sure that either both are saved or none of them.
|
||||
// This is done by setting up a save point before saving the
|
||||
// row, and rollbacking if the content cannot be saved.
|
||||
//
|
||||
// Normally, since we are in a transaction, throwing an
|
||||
// error should work, but since we catch all errors within
|
||||
// this block it doesn't work.
|
||||
|
||||
// TODO: When an item is uploaded multiple times
|
||||
// simultaneously there could be a race condition, where the
|
||||
// content would not match the db row (for example, the
|
||||
// content_size would differ).
|
||||
//
|
||||
// Possible solutions:
|
||||
//
|
||||
// - Row-level lock on items.id, and release once the
|
||||
// content is saved.
|
||||
// - Or external lock - eg. Redis.
|
||||
|
||||
const savePoint = await this.setSavePoint();
|
||||
const savedItem = await this.saveForUser(user.id, itemToSave);
|
||||
|
||||
try {
|
||||
await this.storageDriverWrite(savedItem.id, content, { models: this.models() });
|
||||
await this.releaseSavePoint(savePoint);
|
||||
} catch (error) {
|
||||
await this.rollbackSavePoint(savePoint);
|
||||
throw error;
|
||||
}
|
||||
await this.storageDriverWrite(savedItem.id, content, { models: this.models() });
|
||||
|
||||
if (o.isNote) {
|
||||
await this.models().itemResource().deleteByItemId(savedItem.id);
|
||||
await this.models().itemResource().addResourceIds(savedItem.id, o.resourceIds);
|
||||
}
|
||||
|
||||
await this.releaseSavePoint(savePoint);
|
||||
|
||||
output[name] = {
|
||||
item: savedItem,
|
||||
error: null,
|
||||
};
|
||||
} catch (error) {
|
||||
await this.rollbackSavePoint(savePoint);
|
||||
output[name] = {
|
||||
item: null,
|
||||
error: error,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import Logger from '@joplin/utils/Logger';
|
||||
import newModelFactory, { Models } from '../models/factory';
|
||||
import { DbConnection, disconnectDb } from '../db';
|
||||
import { Models } from '../models/factory';
|
||||
import { Config, Env } from '../utils/types';
|
||||
import BaseService from './BaseService';
|
||||
import { Event, EventType, TaskId, TaskState } from './database/types';
|
||||
@@ -68,29 +67,16 @@ export default class TaskService extends BaseService {
|
||||
|
||||
private tasks_: Tasks = {};
|
||||
private services_: Services;
|
||||
private taskStateModels_: Models;
|
||||
private taskStateDb_: DbConnection;
|
||||
|
||||
public constructor(env: Env, models: Models, config: Config, services: Services, taskStateDb: DbConnection = null) {
|
||||
public constructor(env: Env, models: Models, config: Config, services: Services) {
|
||||
super(env, models, config);
|
||||
this.services_ = services;
|
||||
this.taskStateDb_ = taskStateDb;
|
||||
this.taskStateModels_ = taskStateDb ? newModelFactory(taskStateDb, taskStateDb, config) : models;
|
||||
}
|
||||
|
||||
public async destroy() {
|
||||
await super.destroy();
|
||||
if (this.taskStateDb_) {
|
||||
await disconnectDb(this.taskStateDb_);
|
||||
this.taskStateDb_ = null;
|
||||
this.taskStateModels_ = null;
|
||||
}
|
||||
}
|
||||
|
||||
public async registerTask(task: Task) {
|
||||
if (this.tasks_[task.id]) throw new Error(`Already a task with this ID: ${task.id}`);
|
||||
this.tasks_[task.id] = task;
|
||||
await this.taskStateModels_.taskState().init(task.id);
|
||||
await this.models.taskState().init(task.id);
|
||||
}
|
||||
|
||||
public async registerTasks(tasks: Task[]) {
|
||||
@@ -106,7 +92,7 @@ export default class TaskService extends BaseService {
|
||||
}
|
||||
|
||||
public async taskStates(ids: TaskId[]): Promise<TaskState[]> {
|
||||
return this.taskStateModels_.taskState().loadByTaskIds(ids);
|
||||
return this.models.taskState().loadByTaskIds(ids);
|
||||
}
|
||||
|
||||
public async taskState(id: TaskId): Promise<TaskState> {
|
||||
@@ -117,17 +103,17 @@ export default class TaskService extends BaseService {
|
||||
|
||||
public async taskLastEvents(id: TaskId): Promise<TaskEvents> {
|
||||
return {
|
||||
taskStarted: await this.taskStateModels_.event().lastEventByTypeAndName(EventType.TaskStarted, id.toString()),
|
||||
taskCompleted: await this.taskStateModels_.event().lastEventByTypeAndName(EventType.TaskCompleted, id.toString()),
|
||||
taskStarted: await this.models.event().lastEventByTypeAndName(EventType.TaskStarted, id.toString()),
|
||||
taskCompleted: await this.models.event().lastEventByTypeAndName(EventType.TaskCompleted, id.toString()),
|
||||
};
|
||||
}
|
||||
|
||||
public async resetInterruptedTasks() {
|
||||
const taskStates = await this.taskStateModels_.taskState().all();
|
||||
const taskStates = await this.models.taskState().all();
|
||||
for (const taskState of taskStates) {
|
||||
if (taskState.running) {
|
||||
logger.warn(`Found a task that was in running state: ${this.taskDisplayString(taskState.task_id)} - resetting it.`);
|
||||
await this.taskStateModels_.taskState().stop(taskState.task_id);
|
||||
await this.models.taskState().stop(taskState.task_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -144,7 +130,7 @@ export default class TaskService extends BaseService {
|
||||
|
||||
public async runTask(id: TaskId, runType: RunType) {
|
||||
const displayString = this.taskDisplayString(id);
|
||||
const taskState = await this.taskStateModels_.taskState().loadByTaskId(id);
|
||||
const taskState = await this.models.taskState().loadByTaskId(id);
|
||||
if (!taskState) throw new Error(`Invalid task: ${id}: ${runType}`);
|
||||
|
||||
if (!taskState.enabled) {
|
||||
@@ -152,11 +138,11 @@ export default class TaskService extends BaseService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.taskStateModels_.taskState().start(id);
|
||||
await this.models.taskState().start(id);
|
||||
|
||||
const startTime = Date.now();
|
||||
|
||||
await this.taskStateModels_.event().create(EventType.TaskStarted, id.toString());
|
||||
await this.models.event().create(EventType.TaskStarted, id.toString());
|
||||
|
||||
try {
|
||||
logger.info(`Running ${displayString} (${runTypeToString(runType)})...`);
|
||||
@@ -165,14 +151,14 @@ export default class TaskService extends BaseService {
|
||||
logger.error(`On ${displayString}`, error);
|
||||
}
|
||||
|
||||
await this.taskStateModels_.taskState().stop(id);
|
||||
await this.taskStateModels_.event().create(EventType.TaskCompleted, id.toString());
|
||||
await this.models.taskState().stop(id);
|
||||
await this.models.event().create(EventType.TaskCompleted, id.toString());
|
||||
|
||||
logger.info(`Completed ${this.taskDisplayString(id)} in ${Date.now() - startTime}ms`);
|
||||
}
|
||||
|
||||
public async enableTask(taskId: TaskId, enabled = true) {
|
||||
await this.taskStateModels_.taskState().enable(taskId, enabled);
|
||||
await this.models.taskState().enable(taskId, enabled);
|
||||
}
|
||||
|
||||
public async runInBackground() {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { Models } from '../models/factory';
|
||||
import { connectDb } from '../db';
|
||||
import { TaskId } from '../services/database/types';
|
||||
import TaskService, { Task, taskIdToLabel } from '../services/TaskService';
|
||||
import { Services } from '../services/types';
|
||||
@@ -8,12 +7,7 @@ import { Config, Env } from './types';
|
||||
import { Day } from './time';
|
||||
|
||||
export default async function(env: Env, models: Models, config: Config, services: Services): Promise<TaskService> {
|
||||
// In production, use a separate DB connection pool for task state
|
||||
// management so that it is not affected by failed transactions in the
|
||||
// main connection pool. In dev/test, we reuse the main connection to
|
||||
// avoid exhausting Postgres connection slots in CI.
|
||||
const taskStateDb = env === Env.Prod ? await connectDb({ ...config.database, maxConnections: 1 }) : null;
|
||||
const taskService = new TaskService(env, models, config, services, taskStateDb);
|
||||
const taskService = new TaskService(env, models, config, services);
|
||||
|
||||
let tasks: Task[] = [
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user