mirror of https://github.com/laurent22/joplin.git synced 2025-08-24 20:19:10 +02:00

Transcribe: Fixes #12766: Remove processed files and clean up after a retention period (#12827)

Author: pedr
Committed: 2025-08-18 09:57:34 -03:00 (committed by GitHub)
Parent: ae6b57c5a5
Commit: f02a94bef5
11 changed files with 180 additions and 20 deletions

View File

@@ -18,6 +18,8 @@ HTR_CLI_IMAGES_FOLDER=
QUEUE_DRIVER=pg
# QUEUE_DRIVER=sqlite
FILE_STORAGE_MAINTENANCE_INTERVAL=3600000
FILE_STORAGE_TTL=604800000 # one week
# =============================================================================
# Queue driver
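For reference, the two new millisecond values in the sample env file line up with the defaults added to env.ts further down (1 * Hour and 7 * Day). A quick sanity check, assuming the Hour and Day constants exported by @joplin/utils/time are plain millisecond multipliers:

import { Day, Hour } from '@joplin/utils/time';

// FILE_STORAGE_MAINTENANCE_INTERVAL=3600000 is one hour in milliseconds.
console.info(1 * Hour === 3600000); // true
// FILE_STORAGE_TTL=604800000 is seven days in milliseconds ("one week").
console.info(7 * Day === 604800000); // true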

View File

@@ -29,13 +29,14 @@ const init = async (logger: LoggerWrapper) => {
const queue = await createQueue(envVariables, true);
const fileStorage = new FileStorage();
fileStorage.initMaintenance(envVariables.FILE_STORAGE_TTL, envVariables.FILE_STORAGE_MAINTENANCE_INTERVAL);
app.context.queue = queue;
app.context.storage = fileStorage;
const htrCli = new HtrCli(envVariables.HTR_CLI_DOCKER_IMAGE, envVariables.HTR_CLI_IMAGES_FOLDER);
const jobProcessor = new JobProcessor(queue, htrCli);
const jobProcessor = new JobProcessor(queue, htrCli, fileStorage);
logger.info('Starting worker');
await jobProcessor.init();

View File

@@ -45,6 +45,7 @@ describe('createJob', () => {
filePath: 'file-id',
},
id: result.jobId,
retryCount: 0,
});
});

View File

@@ -1,10 +1,11 @@
import { Day, Hour, Minute, Second } from '@joplin/utils/time';
export const defaultEnvValues: EnvVariables = {
SERVER_PORT: 4567,
API_KEY: '',
QUEUE_TTL: 900000,
QUEUE_TTL: 15 * Minute,
QUEUE_RETRY_COUNT: 2,
QUEUE_MAINTENANCE_INTERVAL: 60000,
QUEUE_MAINTENANCE_INTERVAL: 60 * Second,
HTR_CLI_DOCKER_IMAGE: 'joplin/htr-cli:0.0.2',
HTR_CLI_IMAGES_FOLDER: '',
QUEUE_DRIVER: 'pg', // 'sqlite'
@@ -12,6 +13,8 @@ export const defaultEnvValues: EnvVariables = {
QUEUE_DATABASE_NAME: '',
QUEUE_DATABASE_USER: '',
QUEUE_DATABASE_PORT: 5432,
FILE_STORAGE_MAINTENANCE_INTERVAL: 1 * Hour,
FILE_STORAGE_TTL: 7 * Day,
QUEUE_DATABASE_HOST: 'localhost',
};
@@ -28,6 +31,8 @@ export interface EnvVariables {
QUEUE_DATABASE_NAME: string;
QUEUE_DATABASE_USER: string;
QUEUE_DATABASE_PORT: number;
FILE_STORAGE_MAINTENANCE_INTERVAL: number;
FILE_STORAGE_TTL: number;
QUEUE_DATABASE_HOST: string;
}

View File

@@ -1,9 +1,16 @@
import { copyFile, exists, remove } from 'fs-extra';
import { readdir, copyFile, exists, remove } from 'fs-extra';
import { join } from 'path';
import FileStorage from './FileStorage';
import initiateLogger from './initiateLogger';
import Logger from '@joplin/utils/Logger';
describe('FileStorage', () => {
beforeAll(() => {
initiateLogger();
Logger.globalLogger.enabled = false;
});
it('should move file to storage folder', async () => {
await copyFile('./images/htr_sample.png', './test_file.png');
@@ -30,5 +37,26 @@ describe('FileStorage', () => {
await remove(join('images', name));
});
it('should remove files that are older than the given date', async () => {
const mockedFilenames = [
`${new Date('2025-03-01 17:44').getTime()}_should_delete`,
`${new Date('2025-03-02 17:44').getTime()}_should_delete`,
`${new Date('2025-03-04 17:44').getTime()}_not_deleted`,
];
const mockedFiles = mockedFilenames.map(name => join('images', name));
for (const file of mockedFiles) {
await copyFile('./images/htr_sample.png', file);
}
const fs = new FileStorage();
await fs.removeOldFiles(new Date('2025-03-03 12:00'));
const files = await readdir('images');
expect(files.length).toBe(2);
expect(files.includes(mockedFilenames[2])).toBe(true);
for (const file of mockedFiles) {
await remove(file);
}
});
});

View File

@@ -1,13 +1,58 @@
import { join } from 'path';
import { move } from 'fs-extra';
import { move, readdir, remove } from 'fs-extra';
import { randomBytes } from 'crypto';
import { ContentStorage } from '../types';
import Logger from '@joplin/utils/Logger';
const logger = Logger.create('FileStorage');
export default class FileStorage implements ContentStorage {
private isMaintenanceRunning = false;
public async store(filepath: string) {
const randomName = randomBytes(16).toString('hex');
const time = new Date().getTime();
const random = randomBytes(16).toString('hex');
const randomName = `${time}_${random}`;
await move(filepath, join('images', randomName));
return randomName;
}
public async remove(filename: string) {
logger.info(`Deleting: ${filename}`);
await remove(join('images', filename));
}
public initMaintenance(retentionDuration: number, maintenanceInterval: number) {
logger.info('Maintenance started.');
setInterval(async () => {
if (this.isMaintenanceRunning) return;
this.isMaintenanceRunning = true;
const olderThan = new Date(new Date().getTime() - retentionDuration);
logger.info(`Deleting files older than: ${olderThan}`);
await this.removeOldFiles(olderThan);
this.isMaintenanceRunning = false;
}, maintenanceInterval);
}
public async removeOldFiles(olderThan: Date) {
const files = await readdir('images');
const filesToBeDeleted = files
.map(f => {
const datetimePart = parseInt(f.split('_')[0], 10);
return {
filepath: f,
timestamp: new Date(datetimePart),
};
})
.filter(f => {
return f.timestamp.getTime() < olderThan.getTime();
});
logger.info(`Files found to be deleted: ${filesToBeDeleted.length}`);
for (const file of filesToBeDeleted) {
await this.remove(file.filepath);
}
}
}
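The storage names generated above embed the creation time as a millisecond prefix, which is exactly what removeOldFiles parses back out to decide what to delete. A minimal standalone sketch of that naming scheme (the helper names are illustrative, not part of the diff):

// Build a name the way store() does: "<epoch ms>_<random hex>".
const makeStorageName = (now: Date) => `${now.getTime()}_${'a'.repeat(32)}`;

// Recover the creation time the way removeOldFiles() does.
const parseCreationTime = (name: string) => new Date(parseInt(name.split('_')[0], 10));

const name = makeStorageName(new Date('2025-03-01 17:44'));
const cutoff = new Date('2025-03-03 12:00');
// Older than the cutoff, so the maintenance pass would delete it.
console.info(parseCreationTime(name).getTime() < cutoff.getTime()); // true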

View File

@@ -1,6 +1,6 @@
import Logger from '@joplin/utils/Logger';
import PgBoss = require('pg-boss');
import { BaseQueue, JobData, JobWithResult, QueueConfiguration } from '../../types';
import { BaseQueue, JobData, JobWithData, JobWithResult, QueueConfiguration } from '../../types';
import { ErrorBadRequest } from '../../errors';
import { Day, Minute, Second } from '@joplin/utils/time';
@@ -58,7 +58,7 @@ export default class PgBossQueue implements BaseQueue {
}
public async fetch() {
const jobs = await this.boss.fetch<JobData>(this.queue, { batchSize: 1 });
const jobs = await this.boss.fetch<JobData>(this.queue, { batchSize: 1, includeMetadata: true });
if (jobs.length === 0) return null;
return jobs[0];
}
@@ -80,6 +80,10 @@ export default class PgBossQueue implements BaseQueue {
return result as JobWithResult;
}
public hasJobFailedTooManyTimes(job: JobWithData): boolean {
return job.retryCount >= this.options.retryCount;
}
public async stop() {
return this.boss.stop();
}
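Fetching with includeMetadata means the pg-boss job now carries its retry count, which hasJobFailedTooManyTimes compares against the configured limit so the worker knows when to stop retrying and clean up the source file. A worked example of that check, using the QUEUE_RETRY_COUNT default of 2 from env.ts (the concrete retryCount value is illustrative):

const retryCount = 2;           // job has already been retried twice (illustrative value)
const configuredRetryCount = 2; // QUEUE_RETRY_COUNT default
// Same comparison as hasJobFailedTooManyTimes(): at the limit, give up and remove the file.
console.info(retryCount >= configuredRetryCount); // true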

View File

@@ -1,4 +1,4 @@
import { BaseQueue, JobData, JobStates, jobStateToEnum, QueueConfiguration, Result } from '../../types';
import { BaseQueue, JobData, JobStates, jobStateToEnum, JobWithData, QueueConfiguration, Result } from '../../types';
import KnexConstructor, { Knex } from 'knex';
import Logger from '@joplin/utils/Logger';
import { formatMsToUTC, goBackInTime, Minute, msleep, Second } from '@joplin/utils/time';
@@ -101,7 +101,11 @@ export default class SqliteQueue implements BaseQueue {
updated_on: this.sqlite.fn.now(),
}).table('job').where({ id: job.id });
return { id: job.id, data: JSON.parse(job.data) };
return {
id: job.id,
retryCount: job.retry_count,
data: JSON.parse(job.data),
};
}
public async fail(jobId: string, error: Error) {
@@ -196,6 +200,10 @@ export default class SqliteQueue implements BaseQueue {
}
}
public hasJobFailedTooManyTimes(job: JobWithData): boolean {
return job.retryCount >= this.options.retryCount;
}
public async stop() {
if (this.maintenanceIntervalRef) {
clearInterval(this.maintenanceIntervalRef);

View File

@@ -47,10 +47,12 @@ export interface BaseQueue {
complete(jobId: string, data: Result): Promise<void>;
getJobById(id: string): Promise<JobWithResult>;
stop(): Promise<void>;
hasJobFailedTooManyTimes(job: JobWithData): boolean;
}
export interface ContentStorage {
store(filepath: string): Promise<string>;
remove(filepath: string): Promise<void>;
}
export type AppDefinedContext = {
@@ -62,6 +64,7 @@ export type AppContext = Context & AppDefinedContext;
export type JobWithData = {
id: string;
retryCount: number;
data: JobData;
};
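Since ContentStorage now also exposes remove(), code that doesn't want to touch the filesystem (tests, for instance) can supply its own implementation. A hypothetical in-memory version of the interface, not part of this commit:

// Import path is an assumption; adjust to wherever ContentStorage is exported from.
import { ContentStorage } from '../types';

export default class InMemoryContentStorage implements ContentStorage {
	private files = new Map<string, string>();

	public async store(filepath: string) {
		const name = `${Date.now()}_${Math.random().toString(16).slice(2)}`;
		this.files.set(name, filepath);
		return name;
	}

	public async remove(filename: string) {
		this.files.delete(filename);
	}
}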

View File

@@ -5,6 +5,9 @@ import JobProcessor from './JobProcessor';
import HtrCli from '../core/HtrCli';
import { Minute, msleep, Second } from '@joplin/utils/time';
import { BaseQueue, OutputSuccess } from '../types';
import FileStorage from '../services/FileStorage';
import { join } from 'path';
import { copy, exists } from 'fs-extra';
// since the model is not deterministic, it can sometimes output slightly different responses
const cleanUpResult = (result: string) => {
@@ -33,13 +36,14 @@ describe('JobProcessor', () => {
skipByDefault('should execute work on job in the queue', async () => {
jest.useRealTimers();
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', 'images'), 1000);
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
await tw.init();
const jobId = await queue.send({ filePath: 'htr_sample.png' });
await copy(join('images', 'htr_sample.png'), join('images', 'htr_sample_copy.png'));
const jobId = await queue.send({ filePath: 'htr_sample_copy.png' });
for (let i = 0; i < 20; i++) {
await msleep(30 * Second);
for (let i = 0; i < 36; i++) {
await msleep(10 * Second);
const response = await queue.getJobById(jobId);
if (response.state === 'active') continue;
@@ -55,14 +59,15 @@ describe('JobProcessor', () => {
skipByDefault('should execute work on job in the queue even if one fails', async () => {
jest.useRealTimers();
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', 'images'), 1000);
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
await tw.init();
await copy(join('images', 'htr_sample.png'), join('images', 'htr_sample_copy_2.png'));
const jobId1 = await queue.send({ filePath: 'non-existing-file' });
const jobId2 = await queue.send({ filePath: 'htr_sample.png' });
const jobId2 = await queue.send({ filePath: 'htr_sample_copy_2.png' });
for (let i = 0; i < 20; i++) {
await msleep(30 * Second);
for (let i = 0; i < 36; i++) {
await msleep(10 * Second);
const response1 = await queue.getJobById(jobId1);
if (response1.state === 'active') continue;
expect(response1.state).toEqual('failed');
@@ -76,4 +81,56 @@ describe('JobProcessor', () => {
return;
}
}, 6 * Minute);
skipIfCI('should remove file sent to queue if job is completed', async () => {
jest.useRealTimers();
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
await tw.init();
const imagePath = join('images', 'htr_sample_copy_3.png');
await copy(join('images', 'htr_sample.png'), imagePath);
const jobId = await queue.send({ filePath: 'htr_sample_copy_3.png' });
for (let i = 0; i < 36; i++) {
await msleep(10 * Second);
const response = await queue.getJobById(jobId);
if (response.state === 'active') continue;
expect(response.id).toEqual(jobId);
expect(response.state).toEqual('completed');
const isFilePresent = await exists(imagePath);
expect(isFilePresent).toBe(false);
return;
}
}, 6 * Minute);
skipIfCI('should remove file sent to queue if job fails too many times', async () => {
jest.useRealTimers();
const fileStorage = new FileStorage();
const mockedFileStorageRemove = jest.fn();
fileStorage.remove = mockedFileStorageRemove;
const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), fileStorage, 1000);
await tw.init();
// the file doesn't exist, to force a failure, but the call to remove the file should still happen
const jobId = await queue.send({ filePath: 'non_existing_file.png' });
for (let i = 0; i < 36; i++) {
await msleep(10 * Second);
const response = await queue.getJobById(jobId);
if (response.state === 'active') continue;
expect(response.id).toEqual(jobId);
expect(response.state).toEqual('failed');
expect(mockedFileStorageRemove).toHaveBeenCalledWith('non_existing_file.png');
return;
}
}, 6 * Minute);
});

View File

@@ -1,5 +1,5 @@
import Logger from '@joplin/utils/Logger';
import { BaseQueue, JobWithData, WorkHandler } from '../types';
import { BaseQueue, ContentStorage, JobWithData, WorkHandler } from '../types';
const logger = Logger.create('JobProcessor');
@@ -10,10 +10,12 @@ export default class JobProcessor {
private checkInteval = 5000;
private currentJob: JobWithData | null = null;
private workHandler: WorkHandler;
private contentStorage: ContentStorage;
public constructor(queue: BaseQueue, workHandler: WorkHandler, checkInterval?: number) {
public constructor(queue: BaseQueue, workHandler: WorkHandler, contentStorage: ContentStorage, checkInterval?: number) {
this.queue = queue;
this.workHandler = workHandler;
this.contentStorage = contentStorage;
this.checkInteval = checkInterval ?? 5000;
logger.info('Created JobProcessor');
}
@@ -48,6 +50,7 @@ export default class JobProcessor {
logger.info(`Processing job ${this.currentJob.id}`);
const transcription = await this.workHandler.run(this.currentJob.data.filePath);
await this.queue.complete(this.currentJob.id, { result: transcription });
await this.contentStorage.remove(this.currentJob.data.filePath);
}
public async runOnce() {
@@ -58,6 +61,9 @@ export default class JobProcessor {
const e = error as Error;
if (this.currentJob) {
await this.queue.fail(this.currentJob.id, e);
if (this.queue.hasJobFailedTooManyTimes(this.currentJob)) {
await this.contentStorage.remove(this.currentJob.data.filePath);
}
}
} finally {
this.currentJob = null;