@@ -3,14 +3,15 @@ import Events from "../events";
3
3
import { JobStatus } from "../../components/JobStatus" ;
4
4
import Worker from "../workers" ;
5
5
import Config from "../../components/config" ;
6
- import { nanoid } from "nanoid" ;
7
6
import Article from "../../components/articles" ;
8
7
import { ConfigOptions } from "../../middleware/ConfigOptions" ;
9
8
import Extensions from "../extensions" ;
10
9
import Source from "../../components/source" ;
11
10
import Scheduler from "../scheduler" ;
12
11
import * as ServerIO from "socket.io"
13
12
import * as ClientIO from "socket.io-client" ;
13
+ import * as http from "http" ;
14
+ import * as https from "https" ;
14
15
15
16
16
17
export default class Grid {
@@ -19,48 +20,71 @@ export default class Grid {
19
20
private declare readonly isMain : boolean
20
21
private declare readonly workersIds : string [ ] ;
21
22
private declare workersClients : { workersIds : string [ ] , socketId : string } [ ] ;
22
- private declare readonly encryptionKey : string
23
23
24
- private declare server : ServerIO . Server ;
25
- private declare client : ClientIO . Socket ;
24
+ private declare readonly http_s_server : any ;
25
+ private declare readonly server : ServerIO . Server ;
26
+ private declare readonly client : ClientIO . Socket ;
26
27
27
28
private constructor ( ) {
28
- this . isMain = Config . getOption ( ConfigOptions . SAFFRON_MODE ) === 'main'
29
- this . workersIds = [ ]
30
- this . workersClients = [ ]
31
- this . encryptionKey = nanoid ( 256 )
29
+ this . isMain = Config . getOption ( ConfigOptions . SAFFRON_MODE ) === 'main' ;
30
+ this . workersIds = [ ] ;
31
+ this . workersClients = [ ] ;
32
32
33
- if ( this . isMain )
34
- this . server = new ServerIO . Server ( {
35
- // TODO
33
+ if ( Config . getOption ( ConfigOptions . GRID_DISTRIBUTED ) ) {
34
+ if ( typeof Config . getOption ( ConfigOptions . GRID_AUTH ) !== 'string' )
35
+ throw new Error ( 'InvalidAuthException The grid authToken must be type string on distributed: true' ) ;
36
+ }
37
+
38
+ if ( this . isMain ) {
39
+ if ( Config . getOption ( ConfigOptions . GRID_USE_HTTP ) )
40
+ this . http_s_server = https . createServer ( {
41
+ key : Config . getOption ( ConfigOptions . GRID_HTTPS_KEY ) ,
42
+ cert : Config . getOption ( ConfigOptions . GRID_HTTPS_CERT )
43
+ } ) ;
44
+ else
45
+ this . http_s_server = http . createServer ( ) ;
46
+
47
+ this . server = new ServerIO . Server ( this . http_s_server , {
48
+ serveClient : false , // No reason to server bundle client files
49
+ connectTimeout : 30 * 1000 ,
36
50
} ) ;
37
- else
38
- this . client = ClientIO . io ( {
39
- // TODO
51
+ } else {
52
+ let url = Config . getOption ( ConfigOptions . GRID_USE_HTTP ) ? 'http://' : 'https://' ;
53
+ url += Config . getOption ( ConfigOptions . GRID_SERVER_ADDRESS ) ;
54
+
55
+ const serverPort = Config . getOption ( ConfigOptions . GRID_SERVER_ADDRESS ) ;
56
+ if ( serverPort != null ) url += `:${ serverPort } ` ;
57
+
58
+ this . client = ClientIO . io ( url , {
59
+ reconnectionDelay : 5 * 1000 ,
60
+ timeout : 15 * 1000 ,
61
+ autoConnect : false ,
62
+ auth : {
63
+ token : Config . getOption ( ConfigOptions . GRID_AUTH )
64
+ }
40
65
} ) ;
66
+ }
41
67
}
42
68
43
69
/**
44
70
* Returns an instance of Grid
45
71
*/
46
72
static getInstance ( ) : Grid {
47
73
if ( this . instance == null )
48
- this . instance = new Grid ( )
74
+ this . instance = new Grid ( ) ;
49
75
50
- return this . instance
76
+ return this . instance ;
51
77
}
52
78
53
79
emit ( eventName : string , ...args : any [ ] ) : void {
54
- if ( ! Config . getOption ( ConfigOptions . GRID_DISTRIBUTED ) )
80
+ if ( ! Config . getOption ( ConfigOptions . GRID_DISTRIBUTED ) )
55
81
return ;
56
82
57
83
if ( this . isMain ) {
58
- if ( [ 'scheduler.job.push' ] . includes ( eventName ) )
84
+ if ( [ 'scheduler.job.push' ] . includes ( eventName ) )
59
85
this . server . emit ( eventName , ...args ) ;
60
- }
61
- else {
62
- if ( [ 'workers.job.finished' , 'workers.job.failed' ,
63
- 'grid.worker.announced' , 'grid.worker.destroyed' ] . includes ( eventName ) )
86
+ } else {
87
+ if ( [ 'workers.job.finished' , 'workers.job.failed' , 'grid.worker.announced' , 'grid.worker.destroyed' ] . includes ( eventName ) )
64
88
this . client . emit ( eventName , ...args ) ;
65
89
}
66
90
}
@@ -72,6 +96,12 @@ export default class Grid {
72
96
if ( this . isMain ) {
73
97
if ( Config . getOption ( ConfigOptions . GRID_DISTRIBUTED ) ) {
74
98
this . server . on ( "connection" , socket => {
99
+ const clientAuthToken = socket . handshake . auth . token ;
100
+ if ( ! clientAuthToken || clientAuthToken !== Config . getOption ( ConfigOptions . GRID_AUTH ) ) {
101
+ socket . disconnect ( ) ;
102
+ return ;
103
+ }
104
+
75
105
Events . emit ( 'grid.node.connected' , socket ) ;
76
106
77
107
socket . on ( "disconnect" , ( ) => {
@@ -92,11 +122,12 @@ export default class Grid {
92
122
93
123
socket . on ( "grid.worker.destroyed" , workerId => {
94
124
let index = this . workersIds . findIndex ( id => id === workerId ) ;
95
- if ( index !== - 1 ) this . workersIds . splice ( index , 1 ) ;
125
+ if ( index !== - 1 )
126
+ this . workersIds . splice ( index , 1 ) ;
96
127
} ) ;
97
128
} ) ;
98
129
99
- // await this.server .listen(); // TODO - listen with express
130
+ this . http_s_server . listen ( Config . getOption ( ConfigOptions . GRID_SERVER_PORT ) ) ;
100
131
}
101
132
} else if ( Config . getOption ( ConfigOptions . WORKER_NODES ) > 0 ) {
102
133
this . client . on ( 'connect' , ( ) => {
@@ -109,7 +140,7 @@ export default class Grid {
109
140
Events . emit ( "scheduler.job.push" , data ) ;
110
141
} ) ;
111
142
112
- await this . client . connect ( )
143
+ await this . client . connect ( ) ;
113
144
}
114
145
}
115
146
@@ -173,7 +204,7 @@ export default class Grid {
173
204
async finishedJob ( job : Job ) : Promise < void > {
174
205
job . status = JobStatus . FINISHED ;
175
206
if ( this . isMain )
176
- Events . emit ( 'workers.job.finished' , job )
207
+ Events . emit ( 'workers.job.finished' , job . id ) ;
177
208
else
178
209
await this . client . emit ( 'workers.job.finished' , job . id ) ;
179
210
}
@@ -184,11 +215,11 @@ export default class Grid {
184
215
* @param job
185
216
*/
186
217
async failedJob ( job : Job ) : Promise < void > {
187
- job . status = JobStatus . FAILED
218
+ job . status = JobStatus . FAILED ;
188
219
if ( this . isMain )
189
- Events . emit ( "workers.job.failed" , job )
220
+ Events . emit ( "workers.job.failed" , job . id ) ;
190
221
else
191
- await this . client . emit ( 'workers.job.failed' , { id : job . id } )
222
+ await this . client . emit ( 'workers.job.failed' , job . id ) ;
192
223
}
193
224
194
225
/**
0 commit comments