mirror of
https://github.com/bpatrik/pigallery2.git
synced 2025-01-12 04:23:09 +02:00
implementing task rescheduling on task complete
This commit is contained in:
parent
09b626980e
commit
51801026cb
@ -473,7 +473,7 @@ export class AdminMWs {
|
|||||||
try {
|
try {
|
||||||
const id = req.params.id;
|
const id = req.params.id;
|
||||||
const taskConfig: any = req.body.config;
|
const taskConfig: any = req.body.config;
|
||||||
ObjectManagers.getInstance().TaskManager.start(id, taskConfig);
|
ObjectManagers.getInstance().TaskManager.run(id, taskConfig);
|
||||||
req.resultPipe = 'ok';
|
req.resultPipe = 'ok';
|
||||||
return next();
|
return next();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
@ -3,7 +3,7 @@ import {TaskDTO} from '../../../common/entities/task/TaskDTO';
|
|||||||
|
|
||||||
export interface ITaskManager {
|
export interface ITaskManager {
|
||||||
|
|
||||||
start(taskId: string, config: any): void;
|
run(taskId: string, config: any): Promise<void>;
|
||||||
|
|
||||||
stop(taskId: string): void;
|
stop(taskId: string): void;
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ export interface ITask<T> extends TaskDTO {
|
|||||||
Supported: boolean;
|
Supported: boolean;
|
||||||
Progress: TaskProgressDTO;
|
Progress: TaskProgressDTO;
|
||||||
|
|
||||||
start(config: T): void;
|
start(config: T): Promise<void>;
|
||||||
|
|
||||||
stop(): void;
|
stop(): void;
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ export abstract class Task<T = void> implements ITask<T> {
|
|||||||
protected progress: TaskProgressDTO = null;
|
protected progress: TaskProgressDTO = null;
|
||||||
protected running = false;
|
protected running = false;
|
||||||
protected config: T;
|
protected config: T;
|
||||||
|
protected prResolve: () => void;
|
||||||
|
|
||||||
|
|
||||||
public abstract get Supported(): boolean;
|
public abstract get Supported(): boolean;
|
||||||
@ -23,7 +24,7 @@ export abstract class Task<T = void> implements ITask<T> {
|
|||||||
return this.progress;
|
return this.progress;
|
||||||
}
|
}
|
||||||
|
|
||||||
public start(config: T): void {
|
public start(config: T): Promise<void> {
|
||||||
if (this.running === false && this.Supported) {
|
if (this.running === false && this.Supported) {
|
||||||
Logger.info('[Task]', 'Running task: ' + this.Name);
|
Logger.info('[Task]', 'Running task: ' + this.Name);
|
||||||
this.config = config;
|
this.config = config;
|
||||||
@ -39,8 +40,12 @@ export abstract class Task<T = void> implements ITask<T> {
|
|||||||
this.running = true;
|
this.running = true;
|
||||||
this.init().catch(console.error);
|
this.init().catch(console.error);
|
||||||
this.run();
|
this.run();
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
this.prResolve = resolve;
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
Logger.info('[Task]', 'Task already running: ' + this.Name);
|
Logger.info('[Task]', 'Task already running: ' + this.Name);
|
||||||
|
return Promise.reject();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,6 +62,7 @@ export abstract class Task<T = void> implements ITask<T> {
|
|||||||
private onFinish(): void {
|
private onFinish(): void {
|
||||||
this.progress = null;
|
this.progress = null;
|
||||||
Logger.info('[Task]', 'Task finished: ' + this.Name);
|
Logger.info('[Task]', 'Task finished: ' + this.Name);
|
||||||
|
this.prResolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
private run() {
|
private run() {
|
||||||
|
@ -10,7 +10,7 @@ const LOG_TAG = '[TaskManager]';
|
|||||||
|
|
||||||
export class TaskManager implements ITaskManager {
|
export class TaskManager implements ITaskManager {
|
||||||
|
|
||||||
protected timers: NodeJS.Timeout[] = [];
|
protected timers: { schedule: TaskScheduleDTO, timer: NodeJS.Timeout }[] = [];
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.runSchedules();
|
this.runSchedules();
|
||||||
@ -24,10 +24,10 @@ export class TaskManager implements ITaskManager {
|
|||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
start<T>(taskName: string, config: T): void {
|
async run<T>(taskName: string, config: T): Promise<void> {
|
||||||
const t = this.findTask(taskName);
|
const t = this.findTask(taskName);
|
||||||
if (t) {
|
if (t) {
|
||||||
t.start(config);
|
await t.start(config);
|
||||||
} else {
|
} else {
|
||||||
Logger.warn(LOG_TAG, 'cannot find task to start:' + taskName);
|
Logger.warn(LOG_TAG, 'cannot find task to start:' + taskName);
|
||||||
}
|
}
|
||||||
@ -50,30 +50,38 @@ export class TaskManager implements ITaskManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public stopSchedules(): void {
|
public stopSchedules(): void {
|
||||||
this.timers.forEach(timer => clearTimeout(timer));
|
this.timers.forEach(t => clearTimeout(t.timer));
|
||||||
this.timers = [];
|
this.timers = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
public runSchedules(): void {
|
public runSchedules(): void {
|
||||||
this.stopSchedules();
|
this.stopSchedules();
|
||||||
Logger.info(LOG_TAG, 'Running task schedules');
|
Logger.info(LOG_TAG, 'Running task schedules');
|
||||||
Config.Server.tasks.scheduled.forEach(schedule => {
|
Config.Server.tasks.scheduled.forEach(s => this.runSchedule(s));
|
||||||
const nextDate = this.getDateFromSchedule(new Date(), schedule);
|
}
|
||||||
if (nextDate && nextDate.getTime() > Date.now()) {
|
|
||||||
Logger.debug(LOG_TAG, 'running schedule: ' + schedule.taskName +
|
|
||||||
' at ' + nextDate.toLocaleString(undefined, {hour12: false}));
|
|
||||||
|
|
||||||
const timer: NodeJS.Timeout = setTimeout(() => {
|
protected getNextDayOfTheWeek(refDate: Date, dayOfWeek: number) {
|
||||||
this.start(schedule.taskName, schedule.config);
|
const date = new Date(refDate);
|
||||||
this.timers = this.timers.filter(t => t !== timer);
|
date.setDate(refDate.getDate() + (dayOfWeek + 1 + 7 - refDate.getDay()) % 7);
|
||||||
}, nextDate.getTime() - Date.now());
|
if (date.getDay() === refDate.getDay()) {
|
||||||
this.timers.push(timer);
|
return new Date(refDate);
|
||||||
|
}
|
||||||
|
date.setHours(0, 0, 0, 0);
|
||||||
|
return date;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
protected nextValidDate(date: Date, h: number, m: number, dayDiff: number): Date {
|
||||||
Logger.debug(LOG_TAG, 'skipping schedule:' + schedule.taskName);
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
date.setSeconds(0);
|
||||||
|
if (date.getHours() < h || (date.getHours() === h && date.getMinutes() < m)) {
|
||||||
|
date.setHours(h);
|
||||||
|
date.setMinutes(m);
|
||||||
|
} else {
|
||||||
|
date.setTime(date.getTime() + dayDiff);
|
||||||
|
date.setHours(h);
|
||||||
|
date.setMinutes(m);
|
||||||
|
}
|
||||||
|
return date;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected getDateFromSchedule(refDate: Date, schedule: TaskScheduleDTO): Date {
|
protected getDateFromSchedule(refDate: Date, schedule: TaskScheduleDTO): Date {
|
||||||
@ -82,40 +90,18 @@ export class TaskManager implements ITaskManager {
|
|||||||
return new Date(schedule.trigger.time);
|
return new Date(schedule.trigger.time);
|
||||||
|
|
||||||
case TaskTriggerType.periodic:
|
case TaskTriggerType.periodic:
|
||||||
const nextValidHM = (date: Date, h: number, m: number, dayDiff: number): Date => {
|
|
||||||
|
|
||||||
if (date.getHours() < h || (date.getHours() === h && date.getMinutes() <= m)) {
|
|
||||||
date.setHours(h);
|
|
||||||
date.setMinutes(m);
|
|
||||||
} else {
|
|
||||||
date.setTime(date.getTime() + dayDiff);
|
|
||||||
date.setHours(h);
|
|
||||||
date.setMinutes(m);
|
|
||||||
}
|
|
||||||
return date;
|
|
||||||
};
|
|
||||||
|
|
||||||
const getNextDayOfTheWeek = (dayOfWeek: number, h: number, m: number): Date => {
|
|
||||||
const date = new Date(refDate);
|
|
||||||
date.setDate(refDate.getDate() + (dayOfWeek + 1 + 7 - refDate.getDay()) % 7);
|
|
||||||
if (date.getDay() === refDate.getDay()) {
|
|
||||||
return new Date(refDate);
|
|
||||||
}
|
|
||||||
date.setHours(0, 0, 0, 0);
|
|
||||||
return date;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
const hour = Math.floor(schedule.trigger.atTime / 1000 / (60 * 60));
|
const hour = Math.floor(schedule.trigger.atTime / 1000 / (60 * 60));
|
||||||
const minute = (schedule.trigger.atTime / 1000 / 60) % 60;
|
const minute = (schedule.trigger.atTime / 1000 / 60) % 60;
|
||||||
|
|
||||||
if (schedule.trigger.periodicity <= 6) { // Between Monday and Sunday
|
if (schedule.trigger.periodicity <= 6) { // Between Monday and Sunday
|
||||||
const nextRunDate = getNextDayOfTheWeek(schedule.trigger.periodicity, hour, minute);
|
const nextRunDate = this.getNextDayOfTheWeek(refDate, schedule.trigger.periodicity);
|
||||||
return nextValidHM(nextRunDate, hour, minute, 7 * 24 * 60 * 60 * 1000);
|
return this.nextValidDate(nextRunDate, hour, minute, 7 * 24 * 60 * 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
// every day
|
// every day
|
||||||
return nextValidHM(new Date(refDate), hour, minute, 24 * 60 * 60 * 1000);
|
return this.nextValidDate(new Date(refDate), hour, minute, 24 * 60 * 60 * 1000);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -124,4 +110,22 @@ export class TaskManager implements ITaskManager {
|
|||||||
return this.getAvailableTasks().find(t => t.Name === taskName);
|
return this.getAvailableTasks().find(t => t.Name === taskName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private runSchedule(schedule: TaskScheduleDTO) {
|
||||||
|
const nextDate = this.getDateFromSchedule(new Date(), schedule);
|
||||||
|
if (nextDate && nextDate.getTime() > Date.now()) {
|
||||||
|
Logger.debug(LOG_TAG, 'running schedule: ' + schedule.taskName +
|
||||||
|
' at ' + nextDate.toLocaleString(undefined, {hour12: false}));
|
||||||
|
|
||||||
|
const timer: NodeJS.Timeout = setTimeout(async () => {
|
||||||
|
this.timers = this.timers.filter(t => t.timer !== timer);
|
||||||
|
await this.run(schedule.taskName, schedule.config);
|
||||||
|
this.runSchedule(schedule);
|
||||||
|
}, nextDate.getTime() - Date.now());
|
||||||
|
this.timers.push({schedule: schedule, timer: timer});
|
||||||
|
|
||||||
|
} else {
|
||||||
|
Logger.debug(LOG_TAG, 'skipping schedule:' + schedule.taskName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ describe('TaskManager', () => {
|
|||||||
it('should get date from schedule', async () => {
|
it('should get date from schedule', async () => {
|
||||||
const tm = new TaskManagerSpec();
|
const tm = new TaskManagerSpec();
|
||||||
|
|
||||||
const refDate = new Date(2019, 7, 18, 5, 10); // its a sunday
|
const refDate = new Date(2019, 7, 18, 5, 10, 10, 0); // its a sunday
|
||||||
|
|
||||||
|
|
||||||
expect(tm.getDateFromSchedule(refDate, <any>{
|
expect(tm.getDateFromSchedule(refDate, <any>{
|
||||||
@ -22,7 +22,7 @@ describe('TaskManager', () => {
|
|||||||
type: TaskTriggerType.scheduled,
|
type: TaskTriggerType.scheduled,
|
||||||
time: (new Date(2019, 7, 18, 5, 10)).getTime()
|
time: (new Date(2019, 7, 18, 5, 10)).getTime()
|
||||||
}
|
}
|
||||||
})).to.be.deep.equal((new Date(2019, 7, 18, 5, 10)));
|
})).to.be.deep.equal((new Date(2019, 7, 18, 5, 10, 0)));
|
||||||
|
|
||||||
|
|
||||||
for (let dayOfWeek = 0; dayOfWeek < 7; ++dayOfWeek) {
|
for (let dayOfWeek = 0; dayOfWeek < 7; ++dayOfWeek) {
|
||||||
@ -36,7 +36,7 @@ describe('TaskManager', () => {
|
|||||||
atTime: (h * 60 + m) * 60 * 1000,
|
atTime: (h * 60 + m) * 60 * 1000,
|
||||||
periodicity: dayOfWeek
|
periodicity: dayOfWeek
|
||||||
}
|
}
|
||||||
})).to.be.deep.equal((new Date(2019, 7, nextDay, h, m)), 'for day: ' + dayOfWeek);
|
})).to.be.deep.equal((new Date(2019, 7, nextDay, h, m, 0)), 'for day: ' + dayOfWeek);
|
||||||
|
|
||||||
h = 2;
|
h = 2;
|
||||||
m = 5;
|
m = 5;
|
||||||
@ -47,7 +47,18 @@ describe('TaskManager', () => {
|
|||||||
atTime: (h * 60 + m) * 60 * 1000,
|
atTime: (h * 60 + m) * 60 * 1000,
|
||||||
periodicity: dayOfWeek
|
periodicity: dayOfWeek
|
||||||
}
|
}
|
||||||
})).to.be.deep.equal((new Date(2019, 7, nextDay, h, m)), 'for day: ' + dayOfWeek);
|
})).to.be.deep.equal((new Date(2019, 7, nextDay, h, m, 0)), 'for day: ' + dayOfWeek);
|
||||||
|
|
||||||
|
h = 5;
|
||||||
|
m = 10;
|
||||||
|
nextDay = 18 + dayOfWeek + 1;
|
||||||
|
expect(tm.getDateFromSchedule(refDate, <any>{
|
||||||
|
trigger: {
|
||||||
|
type: TaskTriggerType.periodic,
|
||||||
|
atTime: (h * 60 + m) * 60 * 1000,
|
||||||
|
periodicity: dayOfWeek
|
||||||
|
}
|
||||||
|
})).to.be.deep.equal((new Date(2019, 7, nextDay, h, m, 0)), 'for day: ' + dayOfWeek);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -59,7 +70,7 @@ describe('TaskManager', () => {
|
|||||||
atTime: (h * 60 + m) * 60 * 1000,
|
atTime: (h * 60 + m) * 60 * 1000,
|
||||||
periodicity: 7
|
periodicity: 7
|
||||||
}
|
}
|
||||||
})).to.be.deep.equal((new Date(2019, 7, 18, h, m)));
|
})).to.be.deep.equal((new Date(2019, 7, 18, h, m, 0)));
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
const h = 2;
|
const h = 2;
|
||||||
@ -70,7 +81,7 @@ describe('TaskManager', () => {
|
|||||||
atTime: (h * 60 + m) * 60 * 1000,
|
atTime: (h * 60 + m) * 60 * 1000,
|
||||||
periodicity: 7
|
periodicity: 7
|
||||||
}
|
}
|
||||||
})).to.be.deep.equal((new Date(2019, 7, 19, h, m)));
|
})).to.be.deep.equal((new Date(2019, 7, 19, h, m, 0)));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user