1
- import Config from "../../components/config" ;
1
+ import Config , { ConfigOptions } from "../../components/config" ;
2
2
import Events from "../events" ;
3
3
import Job from "../../components/job" ;
4
4
import Source from "../../components/source" ;
5
5
import Grid from "../grid/index" ;
6
6
import { JobStatus } from "../../components/JobStatus" ;
7
7
import Worker from "../workers" ;
8
- import { ConfigOptions } from "../../middleware/ConfigOptions" ;
9
8
import glob from "glob" ;
10
9
import * as path from "path" ;
11
10
12
- const pathCwd = process . cwd ( ) ;
13
-
14
11
export default class Scheduler {
15
12
16
13
private static instance : Scheduler | null = null ;
17
- static getInstance ( ) : Scheduler {
18
- if ( this . instance === null )
19
- this . instance = new Scheduler ( ) ;
20
- return this . instance ! ! ;
21
- }
22
-
23
14
private declare isRunning : boolean ;
24
15
private declare jobsStorage : Job [ ] ;
25
16
26
- constructor ( ) {
17
+ private constructor ( ) {
27
18
Events . on ( "start" , ( keepPreviousSession ) => this . start ( keepPreviousSession ) ) ;
28
19
Events . on ( "stop" , ( ) => this . stop ( ) ) ;
29
20
this . jobsStorage = [ ] ;
30
21
}
31
22
23
+ static getInstance ( ) : Scheduler {
24
+ if ( this . instance === null )
25
+ this . instance = new Scheduler ( ) ;
26
+ return this . instance ! ! ;
27
+ }
28
+
32
29
/**
33
30
* Issue a new job for a specific source
34
31
* @param source The source
@@ -51,7 +48,7 @@ export default class Scheduler {
51
48
const checkInterval = Config . getOption ( ConfigOptions . SCHEDULER_CHECKS_INT ) ;
52
49
53
50
this . isRunning = true ;
54
- if ( ! keepPreviousSession ) {
51
+ if ( ! keepPreviousSession ) {
55
52
const sources = await this . resetSources ( ) ;
56
53
this . resetJobs ( sources ) ;
57
54
}
@@ -96,10 +93,11 @@ export default class Scheduler {
96
93
// Pending jobs
97
94
case JobStatus . PENDING :
98
95
if ( job . untilRetry <= 0 ) {
99
- // If the worker did not complete the job after 5 times elect new worker
96
+ // If the worker did not change the job status after 5 times (totally: 5 * checkInterval ms),
97
+ // expect it to have crashed, so we elect a new worker to take its place.
100
98
if ( job . emitAttempts > 5 ) {
101
99
let oldWorker = job . worker . id ;
102
- Grid . getInstance ( ) . fireWorker ( job . worker . id ) ;
100
+ Grid . getInstance ( ) . fireWorker ( job . source . id , oldWorker ) ;
103
101
104
102
job . worker . id = Worker . electWorker ( job . worker . id ) ;
105
103
Events . emit ( "scheduler.job.worker.replace" , oldWorker , job ) ;
@@ -175,7 +173,7 @@ export default class Scheduler {
175
173
176
174
changeJobStatus ( id : string , status : JobStatus ) {
177
175
let job = this . jobsStorage . find ( ( obj : Job ) => obj . id === id ) ;
178
- if ( job ) job . status = status ;
176
+ if ( job ) job . status = status ;
179
177
}
180
178
181
179
/**
@@ -184,7 +182,7 @@ export default class Scheduler {
184
182
private scanSourceFiles ( ) : Promise < void > {
185
183
return new Promise ( ( resolve , reject ) => {
186
184
let sourcesPath = Config . getOption ( ConfigOptions . SOURCES_PATH ) ;
187
- glob ( `${ path . join ( pathCwd , sourcesPath ) } /**` , { } , ( error : any , files : string [ ] ) => {
185
+ glob ( `${ path . join ( process . cwd ( ) , sourcesPath ) } /**` , { } , ( error : any , files : string [ ] ) => {
188
186
if ( error ) {
189
187
Events . emit ( 'scheduler.path.error' , error ) ;
190
188
return reject ( error ) ;
0 commit comments