mirror of
https://github.com/bpatrik/pigallery2.git
synced 2025-03-27 21:49:01 +02:00
improving event loop handling
This commit is contained in:
parent
43b970c992
commit
d5eea29f8d
19
src/backend/model/EventLoopHandler.ts
Normal file
19
src/backend/model/EventLoopHandler.ts
Normal file
@ -0,0 +1,19 @@
|
||||
export class EventLoopHandler {
|
||||
|
||||
private eventCounter = 0;
|
||||
|
||||
constructor(private readonly MAX_LOOP = 10) {
|
||||
}
|
||||
|
||||
/*
|
||||
* setImmediate is slow, but does the right thing
|
||||
* next tick is super fast, but does not help much with the event loop as it does not allow a full event loop cycle
|
||||
* https://medium.com/dkatalis/eventloop-in-nodejs-settimeout-setimmediate-vs-process-nexttick-37c852c67acb
|
||||
* */
|
||||
step(fn: () => Promise<void> | void) {
|
||||
|
||||
this.eventCounter = this.eventCounter % 10;
|
||||
const eventFN = this.eventCounter++ === 1 ? setImmediate : process.nextTick;
|
||||
eventFN(fn);
|
||||
}
|
||||
}
|
@ -35,21 +35,21 @@ export abstract class Job<T extends Record<string, unknown> = Record<string, unk
|
||||
|
||||
public get InProgress(): boolean {
|
||||
return (
|
||||
this.Progress !== null &&
|
||||
(this.Progress.State === JobProgressStates.running ||
|
||||
this.Progress.State === JobProgressStates.cancelling)
|
||||
this.Progress !== null &&
|
||||
(this.Progress.State === JobProgressStates.running ||
|
||||
this.Progress.State === JobProgressStates.cancelling)
|
||||
);
|
||||
}
|
||||
|
||||
public start(
|
||||
config: T,
|
||||
soloRun = false,
|
||||
allowParallelRun = false
|
||||
config: T,
|
||||
soloRun = false,
|
||||
allowParallelRun = false
|
||||
): Promise<void> {
|
||||
if (this.InProgress === false && this.Supported === true) {
|
||||
Logger.info(
|
||||
LOG_TAG,
|
||||
'Running job ' + (soloRun === true ? 'solo' : '') + ': ' + this.Name
|
||||
LOG_TAG,
|
||||
'Running job ' + (soloRun === true ? 'solo' : '') + ': ' + this.Name
|
||||
);
|
||||
this.soloRun = soloRun;
|
||||
this.allowParallelRun = allowParallelRun;
|
||||
@ -63,8 +63,8 @@ export abstract class Job<T extends Record<string, unknown> = Record<string, unk
|
||||
}
|
||||
}
|
||||
this.progress = new JobProgress(
|
||||
this.Name,
|
||||
JobDTOUtils.getHashName(this.Name, this.config)
|
||||
this.Name,
|
||||
JobDTOUtils.getHashName(this.Name, this.config)
|
||||
);
|
||||
this.progress.OnChange = this.jobListener.onProgressUpdate;
|
||||
const pr = new Promise<void>((resolve): void => {
|
||||
@ -79,11 +79,11 @@ export abstract class Job<T extends Record<string, unknown> = Record<string, unk
|
||||
return pr;
|
||||
} else {
|
||||
Logger.info(
|
||||
LOG_TAG,
|
||||
'Job already running or not supported: ' + this.Name
|
||||
LOG_TAG,
|
||||
'Job already running or not supported: ' + this.Name
|
||||
);
|
||||
return Promise.reject(
|
||||
'Job already running or not supported: ' + this.Name
|
||||
'Job already running or not supported: ' + this.Name
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -134,11 +134,12 @@ export abstract class Job<T extends Record<string, unknown> = Record<string, unk
|
||||
}
|
||||
|
||||
private run(): void {
|
||||
// we call setImmediate later.
|
||||
process.nextTick(async (): Promise<void> => {
|
||||
try {
|
||||
if (
|
||||
this.Progress == null ||
|
||||
this.Progress.State !== JobProgressStates.running
|
||||
this.Progress == null ||
|
||||
this.Progress.State !== JobProgressStates.running
|
||||
) {
|
||||
this.onFinish();
|
||||
return;
|
||||
|
@ -1,4 +1,5 @@
|
||||
import {TaskQue} from './TaskQue';
|
||||
import {EventLoopHandler} from '../EventLoopHandler';
|
||||
|
||||
export interface ITaskExecuter<I, O> {
|
||||
execute(input: I): Promise<O>;
|
||||
@ -7,6 +8,7 @@ export interface ITaskExecuter<I, O> {
|
||||
export class TaskExecuter<I, O> implements ITaskExecuter<I, O> {
|
||||
private taskQue = new TaskQue<I, O>();
|
||||
private taskInProgress = 0;
|
||||
private readonly el = new EventLoopHandler();
|
||||
private run = async () => {
|
||||
if (this.taskQue.isEmpty() || this.taskInProgress >= this.size) {
|
||||
return;
|
||||
@ -20,7 +22,7 @@ export class TaskExecuter<I, O> implements ITaskExecuter<I, O> {
|
||||
}
|
||||
this.taskQue.ready(task);
|
||||
this.taskInProgress--;
|
||||
process.nextTick(this.run);
|
||||
this.el.step(this.run);
|
||||
};
|
||||
|
||||
constructor(private size: number, private worker: (input: I) => Promise<O>) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user