diff --git a/.env-transcribe-sample b/.env-transcribe-sample
index eb715fed33..f2cb7b28c2 100644
--- a/.env-transcribe-sample
+++ b/.env-transcribe-sample
@@ -18,6 +18,8 @@ HTR_CLI_IMAGES_FOLDER=
 QUEUE_DRIVER=pg
 # QUEUE_DRIVER=sqlite
 
+FILE_STORAGE_MAINTENANCE_INTERVAL=3600000
+FILE_STORAGE_TTL=604800000 # one week
 
 # =============================================================================
 # Queue driver
diff --git a/packages/transcribe/src/api/app.ts b/packages/transcribe/src/api/app.ts
index 04e1fb13c4..dd792b1231 100644
--- a/packages/transcribe/src/api/app.ts
+++ b/packages/transcribe/src/api/app.ts
@@ -29,13 +29,14 @@ const init = async (logger: LoggerWrapper) => {
 	const queue = await createQueue(envVariables, true);
 
 	const fileStorage = new FileStorage();
+	fileStorage.initMaintenance(envVariables.FILE_STORAGE_TTL, envVariables.FILE_STORAGE_MAINTENANCE_INTERVAL);
 
 	app.context.queue = queue;
 	app.context.storage = fileStorage;
 
 	const htrCli = new HtrCli(envVariables.HTR_CLI_DOCKER_IMAGE, envVariables.HTR_CLI_IMAGES_FOLDER);
 
-	const jobProcessor = new JobProcessor(queue, htrCli);
+	const jobProcessor = new JobProcessor(queue, htrCli, fileStorage);
 
 	logger.info('Starting worker');
 	await jobProcessor.init();
diff --git a/packages/transcribe/src/api/handler/createJob.test.ts b/packages/transcribe/src/api/handler/createJob.test.ts
index a37969ba9f..6a008f6d68 100644
--- a/packages/transcribe/src/api/handler/createJob.test.ts
+++ b/packages/transcribe/src/api/handler/createJob.test.ts
@@ -45,6 +45,7 @@ describe('createJob', () => {
 				filePath: 'file-id',
 			},
 			id: result.jobId,
+			retryCount: 0,
 		});
 	});
diff --git a/packages/transcribe/src/env.ts b/packages/transcribe/src/env.ts
index 5d147b64b0..fe43b79607 100644
--- a/packages/transcribe/src/env.ts
+++ b/packages/transcribe/src/env.ts
@@ -1,10 +1,11 @@
+import { Day, Hour, Minute, Second } from '@joplin/utils/time';
 
 export const defaultEnvValues: EnvVariables = {
 	SERVER_PORT: 4567,
 	API_KEY: '',
-	QUEUE_TTL: 900000,
+	QUEUE_TTL: 15 * Minute,
 	QUEUE_RETRY_COUNT: 2,
-	QUEUE_MAINTENANCE_INTERVAL: 60000,
+	QUEUE_MAINTENANCE_INTERVAL: 60 * Second,
 	HTR_CLI_DOCKER_IMAGE: 'joplin/htr-cli:0.0.2',
 	HTR_CLI_IMAGES_FOLDER: '',
 	QUEUE_DRIVER: 'pg', // 'sqlite'
@@ -12,6 +13,8 @@ export const defaultEnvValues: EnvVariables = {
 	QUEUE_DATABASE_NAME: '',
 	QUEUE_DATABASE_USER: '',
 	QUEUE_DATABASE_PORT: 5432,
+	FILE_STORAGE_MAINTENANCE_INTERVAL: 1 * Hour,
+	FILE_STORAGE_TTL: 7 * Day,
 	QUEUE_DATABASE_HOST: 'localhost',
 };
 
@@ -28,6 +31,8 @@ export interface EnvVariables {
 	QUEUE_DATABASE_NAME: string;
 	QUEUE_DATABASE_USER: string;
 	QUEUE_DATABASE_PORT: number;
+	FILE_STORAGE_MAINTENANCE_INTERVAL: number;
+	FILE_STORAGE_TTL: number;
 	QUEUE_DATABASE_HOST: string;
 }
diff --git a/packages/transcribe/src/services/FileStorage.test.ts b/packages/transcribe/src/services/FileStorage.test.ts
index 4802ec0e5e..25a9353230 100644
--- a/packages/transcribe/src/services/FileStorage.test.ts
+++ b/packages/transcribe/src/services/FileStorage.test.ts
@@ -1,9 +1,16 @@
-import { copyFile, exists, remove } from 'fs-extra';
+import { readdir, copyFile, exists, remove } from 'fs-extra';
 import { join } from 'path';
 import FileStorage from './FileStorage';
+import initiateLogger from './initiateLogger';
+import Logger from '@joplin/utils/Logger';
 
 describe('FileStorage', () => {
+	beforeAll(() => {
+		initiateLogger();
+		Logger.globalLogger.enabled = false;
+	});
+
 	it('should move file to storage folder', async () => {
 		await copyFile('./images/htr_sample.png', './test_file.png');
@@ -30,5 +37,26 @@ describe('FileStorage', () => {
 		await remove(join('images', name));
 	});
+
+	it('should remove files that are older than the given date', async () => {
+		const mockedFilenames = [
+			`${new Date('2025-03-01 17:44').getTime()}_should_delete`,
+			`${new Date('2025-03-02 17:44').getTime()}_should_delete`,
+			`${new Date('2025-03-04 17:44').getTime()}_not_deleted`,
+		];
+		const mockedFiles = mockedFilenames.map(name => join('images', name));
+		for (const file of mockedFiles) {
+			await copyFile('./images/htr_sample.png', file);
+		}
+
+		const fs = new FileStorage();
+		await fs.removeOldFiles(new Date('2025-03-03 12:00'));
+		const files = await readdir('images');
+		expect(files.length).toBe(2);
+		expect(files.includes(mockedFilenames[2])).toBe(true);
+
+		for (const file of mockedFiles) {
+			await remove(file);
+		}
+	});
 });
diff --git a/packages/transcribe/src/services/FileStorage.ts b/packages/transcribe/src/services/FileStorage.ts
index 7452851aa3..806ea3a194 100644
--- a/packages/transcribe/src/services/FileStorage.ts
+++ b/packages/transcribe/src/services/FileStorage.ts
@@ -1,13 +1,58 @@
 import { join } from 'path';
-import { move } from 'fs-extra';
+import { move, readdir, remove } from 'fs-extra';
 import { randomBytes } from 'crypto';
 import { ContentStorage } from '../types';
+import Logger from '@joplin/utils/Logger';
+
+const logger = Logger.create('FileStorage');
 
 export default class FileStorage implements ContentStorage {
+	private isMaintenanceRunning = false;
+
 	public async store(filepath: string) {
-		const randomName = randomBytes(16).toString('hex');
+		const time = new Date().getTime();
+		const random = randomBytes(16).toString('hex');
+		const randomName = `${time}_${random}`;
 		await move(filepath, join('images', randomName));
 		return randomName;
 	}
+
+	public async remove(filename: string) {
+		logger.info(`Deleting: ${filename}`);
+		await remove(join('images', filename));
+	}
+
+	public initMaintenance(retentionDuration: number, maintenanceInterval: number) {
+		logger.info('Maintenance started.');
+		setInterval(async () => {
+			if (this.isMaintenanceRunning) return;
+
+			this.isMaintenanceRunning = true;
+			const olderThan = new Date(new Date().getTime() - retentionDuration);
+			logger.info(`Deleting files older than: ${olderThan}`);
+			await this.removeOldFiles(olderThan);
+			this.isMaintenanceRunning = false;
+		}, maintenanceInterval);
+	}
+
+	public async removeOldFiles(olderThan: Date) {
+		const files = await readdir('images');
+		const filesToBeDeleted = files
+			.map(f => {
+				const datetimePart = parseInt(f.split('_')[0], 10);
+				return {
+					filepath: f,
+					timestamp: new Date(datetimePart),
+				};
+			})
+			.filter(f => {
+				return f.timestamp.getTime() < olderThan.getTime();
+			});
+
+		logger.info(`Files found to be deleted: ${filesToBeDeleted.length}`);
+		for (const file of filesToBeDeleted) {
+			await this.remove(file.filepath);
+		}
+	}
 }
diff --git a/packages/transcribe/src/services/queue/PgBossQueue.ts b/packages/transcribe/src/services/queue/PgBossQueue.ts
index 98b1db17ef..b43c31aff1 100644
--- a/packages/transcribe/src/services/queue/PgBossQueue.ts
+++ b/packages/transcribe/src/services/queue/PgBossQueue.ts
@@ -1,6 +1,6 @@
 import Logger from '@joplin/utils/Logger';
 import PgBoss = require('pg-boss');
-import { BaseQueue, JobData, JobWithResult, QueueConfiguration } from '../../types';
+import { BaseQueue, JobData, JobWithData, JobWithResult, QueueConfiguration } from '../../types';
 import { ErrorBadRequest } from '../../errors';
 import { Day, Minute, Second } from '@joplin/utils/time';
 
@@ -58,7 +58,7 @@ export default class PgBossQueue implements BaseQueue {
 	}
 
 	public async fetch() {
-		const jobs = await this.boss.fetch(this.queue, { batchSize: 1 });
+		const jobs = await this.boss.fetch(this.queue, { batchSize: 1, includeMetadata: true });
 		if (jobs.length === 0) return null;
 		return jobs[0];
 	}
 
@@ -80,6 +80,10 @@ export default class PgBossQueue implements BaseQueue {
 		return result as JobWithResult;
 	}
 
+	public hasJobFailedTooManyTimes(job: JobWithData): boolean {
+		return job.retryCount >= this.options.retryCount;
+	}
+
 	public async stop() {
 		return this.boss.stop();
 	}
diff --git a/packages/transcribe/src/services/queue/SqliteQueue.ts b/packages/transcribe/src/services/queue/SqliteQueue.ts
index 04d887c99b..3c0ad33a49 100644
--- a/packages/transcribe/src/services/queue/SqliteQueue.ts
+++ b/packages/transcribe/src/services/queue/SqliteQueue.ts
@@ -1,4 +1,4 @@
-import { BaseQueue, JobData, JobStates, jobStateToEnum, QueueConfiguration, Result } from '../../types';
+import { BaseQueue, JobData, JobStates, jobStateToEnum, JobWithData, QueueConfiguration, Result } from '../../types';
 import KnexConstructor, { Knex } from 'knex';
 import Logger from '@joplin/utils/Logger';
 import { formatMsToUTC, goBackInTime, Minute, msleep, Second } from '@joplin/utils/time';
@@ -101,7 +101,11 @@ export default class SqliteQueue implements BaseQueue {
 			updated_on: this.sqlite.fn.now(),
 		}).table('job').where({ id: job.id });
 
-		return { id: job.id, data: JSON.parse(job.data) };
+		return {
+			id: job.id,
+			retryCount: job.retry_count,
+			data: JSON.parse(job.data),
+		};
 	}
 
 	public async fail(jobId: string, error: Error) {
@@ -196,6 +200,10 @@ export default class SqliteQueue implements BaseQueue {
 		}
 	}
 
+	public hasJobFailedTooManyTimes(job: JobWithData): boolean {
+		return job.retryCount >= this.options.retryCount;
+	}
+
 	public async stop() {
 		if (this.maintenanceIntervalRef) {
 			clearInterval(this.maintenanceIntervalRef);
diff --git a/packages/transcribe/src/types.ts b/packages/transcribe/src/types.ts
index fbb3fa06ed..faad9ad8e3 100644
--- a/packages/transcribe/src/types.ts
+++ b/packages/transcribe/src/types.ts
@@ -47,10 +47,12 @@ export interface BaseQueue {
 	complete(jobId: string, data: Result): Promise<void>;
 	getJobById(id: string): Promise<JobWithResult>;
 	stop(): Promise<void>;
+	hasJobFailedTooManyTimes(job: JobWithData): boolean;
 }
 
 export interface ContentStorage {
 	store(filepath: string): Promise<string>;
+	remove(filepath: string): Promise<void>;
 }
 
 export type AppDefinedContext = {
@@ -62,6 +64,7 @@ export type AppContext = Context & AppDefinedContext;
 
 export type JobWithData = {
 	id: string;
+	retryCount: number;
 	data: JobData;
 };
 
diff --git a/packages/transcribe/src/workers/JobProcessor.test.ts b/packages/transcribe/src/workers/JobProcessor.test.ts
index d919eb4484..1bdbe76f77 100644
--- a/packages/transcribe/src/workers/JobProcessor.test.ts
+++ b/packages/transcribe/src/workers/JobProcessor.test.ts
@@ -5,6 +5,9 @@ import JobProcessor from './JobProcessor';
 import HtrCli from '../core/HtrCli';
 import { Minute, msleep, Second } from '@joplin/utils/time';
 import { BaseQueue, OutputSuccess } from '../types';
+import FileStorage from '../services/FileStorage';
+import { join } from 'path';
+import { copy, exists } from 'fs-extra';
 
 // since the model is not deterministic, it can, sometimes, output slightly difference responses
 const cleanUpResult = (result: string) => {
@@ -33,13 +36,14 @@ describe('JobProcessor', () => {
 
 	skipByDefault('should execute work on job in the queue', async () => {
 		jest.useRealTimers();
-		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', 'images'), 1000);
+		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
 		await tw.init();
-		const jobId = await queue.send({ filePath: 'htr_sample.png' });
+		await copy(join('images', 'htr_sample.png'), join('images', 'htr_sample_copy.png'));
+		const jobId = await queue.send({ filePath: 'htr_sample_copy.png' });
 
-		for (let i = 0; i < 20; i++) {
-			await msleep(30 * Second);
+		for (let i = 0; i < 36; i++) {
+			await msleep(10 * Second);
 			const response = await queue.getJobById(jobId);
 
 			if (response.state === 'active') continue;
 
@@ -55,14 +59,15 @@ describe('JobProcessor', () => {
 
 	skipByDefault('should execute work on job in the queue even if one fails', async () => {
 		jest.useRealTimers();
-		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', 'images'), 1000);
+		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
 		await tw.init();
+		await copy(join('images', 'htr_sample.png'), join('images', 'htr_sample_copy_2.png'));
 		const jobId1 = await queue.send({ filePath: 'non-existing-file' });
-		const jobId2 = await queue.send({ filePath: 'htr_sample.png' });
+		const jobId2 = await queue.send({ filePath: 'htr_sample_copy_2.png' });
 
-		for (let i = 0; i < 20; i++) {
-			await msleep(30 * Second);
+		for (let i = 0; i < 36; i++) {
+			await msleep(10 * Second);
 			const response1 = await queue.getJobById(jobId1);
 			if (response1.state === 'active') continue;
 			expect(response1.state).toEqual('failed');
 
@@ -76,4 +81,56 @@ describe('JobProcessor', () => {
 			return;
 		}
 	}, 6 * Minute);
+
+	skipIfCI('should remove file sent to queue if job is completed', async () => {
+		jest.useRealTimers();
+		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), new FileStorage(), 1000);
+		await tw.init();
+		const imagePath = join('images', 'htr_sample_copy_3.png');
+		await copy(join('images', 'htr_sample.png'), imagePath);
+
+		const jobId = await queue.send({ filePath: 'htr_sample_copy_3.png' });
+
+		for (let i = 0; i < 36; i++) {
+			await msleep(10 * Second);
+			const response = await queue.getJobById(jobId);
+
+			if (response.state === 'active') continue;
+
+			expect(response.id).toEqual(jobId);
+			expect(response.state).toEqual('completed');
+
+			const isFilePresent = await exists(imagePath);
+			expect(isFilePresent).toBe(false);
+			return;
+		}
+
+	}, 6 * Minute);
+
+	skipIfCI('should remove file sent to queue if job fails too many times', async () => {
+		jest.useRealTimers();
+		const fileStorage = new FileStorage();
+		const mockedFileStorageRemove = jest.fn();
+		fileStorage.remove = mockedFileStorageRemove;
+		const tw = new JobProcessor(queue, new HtrCli('joplin/htr-cli:0.0.2', join(process.cwd(), 'images')), fileStorage, 1000);
+		await tw.init();
+
+		// file doesn't exist to force a fail, but the call to remove the file should still exist
+		const jobId = await queue.send({ filePath: 'non_existing_file.png' });
+
+		for (let i = 0; i < 36; i++) {
+			await msleep(10 * Second);
+			const response = await queue.getJobById(jobId);
+
+			if (response.state === 'active') continue;
+
+			expect(response.id).toEqual(jobId);
+			expect(response.state).toEqual('failed');
+
+			expect(mockedFileStorageRemove).toHaveBeenCalledWith('non_existing_file.png');
+			return;
+		}
+
+	}, 6 * Minute);
+
 });
diff --git a/packages/transcribe/src/workers/JobProcessor.ts b/packages/transcribe/src/workers/JobProcessor.ts
index 5b19e291a2..f3bd6200b8 100644
--- a/packages/transcribe/src/workers/JobProcessor.ts
+++ b/packages/transcribe/src/workers/JobProcessor.ts
@@ -1,5 +1,5 @@
 import Logger from '@joplin/utils/Logger';
-import { BaseQueue, JobWithData, WorkHandler } from '../types';
+import { BaseQueue, ContentStorage, JobWithData, WorkHandler } from '../types';
 
 const logger = Logger.create('JobProcessor');
 
@@ -10,10 +10,12 @@ export default class JobProcessor {
 	private checkInteval = 5000;
 	private currentJob: JobWithData | null = null;
 	private workHandler: WorkHandler;
+	private contentStorage: ContentStorage;
 
-	public constructor(queue: BaseQueue, workHandler: WorkHandler, checkInterval?: number) {
+	public constructor(queue: BaseQueue, workHandler: WorkHandler, contentStorage: ContentStorage, checkInterval?: number) {
 		this.queue = queue;
 		this.workHandler = workHandler;
+		this.contentStorage = contentStorage;
 		this.checkInteval = checkInterval ?? 5000;
 		logger.info('Created JobProcessor');
 	}
@@ -48,6 +50,7 @@ export default class JobProcessor {
 		logger.info(`Processing job ${this.currentJob.id}`);
 		const transcription = await this.workHandler.run(this.currentJob.data.filePath);
 		await this.queue.complete(this.currentJob.id, { result: transcription });
+		await this.contentStorage.remove(this.currentJob.data.filePath);
 	}
 
 	public async runOnce() {
@@ -58,6 +61,9 @@
 			const e = error as Error;
 			if (this.currentJob) {
 				await this.queue.fail(this.currentJob.id, e);
+				if (this.queue.hasJobFailedTooManyTimes(this.currentJob)) {
+					await this.contentStorage.remove(this.currentJob.data.filePath);
+				}
 			}
 		} finally {
 			this.currentJob = null;