Skip to content

Commit 7ba2729

Browse files
committedFeb 20, 2024
feat(worker): add support for naming workers
1 parent b1432ab commit 7ba2729

11 files changed

+131
-26
lines changed
 

‎src/classes/job.ts

+9
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ export class Job<
147147
*/
148148
token?: string;
149149

150+
/**
151+
* The worker name that is processing or processed this job.
152+
*/
153+
processedBy?: string;
154+
150155
protected toKey: (type: string) => string;
151156

152157
protected discarded: boolean;
@@ -338,6 +343,10 @@ export class Job<
338343
job.parent = JSON.parse(json.parent);
339344
}
340345

346+
if (json.pb) {
347+
job.processedBy = json.pb;
348+
}
349+
341350
return job;
342351
}
343352

‎src/classes/queue-events.ts

+1
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ export class QueueEvents extends QueueBase {
236236
this.running = true;
237237
const client = await this.client;
238238

239+
// Planed for deprecation as it has no really a use case
239240
try {
240241
await client.client('SETNAME', this.clientName(QUEUE_EVENT_SUFFIX));
241242
} catch (err) {

‎src/classes/queue-getters.ts

+19-11
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@
33

44
import { QueueBase } from './queue-base';
55
import { Job } from './job';
6-
import {
7-
clientCommandMessageReg,
8-
QUEUE_EVENT_SUFFIX,
9-
WORKER_SUFFIX,
10-
} from '../utils';
6+
import { clientCommandMessageReg, QUEUE_EVENT_SUFFIX } from '../utils';
117
import { JobState, JobType } from '../types';
128
import { JobJsonRaw, Metrics } from '../interfaces';
139

@@ -432,15 +428,15 @@ export class QueueGetters<
432428
};
433429
}
434430

435-
private async baseGetClients(suffix: string): Promise<
431+
private async baseGetClients(matcher: (name: string) => boolean): Promise<
436432
{
437433
[index: string]: string;
438434
}[]
439435
> {
440436
const client = await this.client;
441437
const clients = (await client.client('LIST')) as string;
442438
try {
443-
const list = this.parseClientList(clients, suffix);
439+
const list = this.parseClientList(clients, matcher);
444440
return list;
445441
} catch (err) {
446442
if (!clientCommandMessageReg.test((<Error>err).message)) {
@@ -463,21 +459,32 @@ export class QueueGetters<
463459
[index: string]: string;
464460
}[]
465461
> {
466-
return this.baseGetClients(WORKER_SUFFIX);
462+
const unnamedWorkerClientName = `${this.clientName()}`;
463+
const namedWorkerClientName = `${this.clientName()}:w:`;
464+
465+
const matcher = (name: string) =>
466+
name &&
467+
(name === unnamedWorkerClientName ||
468+
name.startsWith(namedWorkerClientName));
469+
470+
return this.baseGetClients(matcher);
467471
}
468472

469473
/**
470474
* Get queue events list related to the queue.
471475
* Note: GCP does not support SETNAME, so this call will not work
472476
*
477+
* @deprecated do not use this method, it will be removed in the future.
478+
*
473479
* @returns - Returns an array with queue events info.
474480
*/
475481
async getQueueEvents(): Promise<
476482
{
477483
[index: string]: string;
478484
}[]
479485
> {
480-
return this.baseGetClients(QUEUE_EVENT_SUFFIX);
486+
const clientName = `${this.clientName()}${QUEUE_EVENT_SUFFIX}`;
487+
return this.baseGetClients((name: string) => name === clientName);
481488
}
482489

483490
/**
@@ -531,7 +538,7 @@ export class QueueGetters<
531538
};
532539
}
533540

534-
private parseClientList(list: string, suffix = '') {
541+
private parseClientList(list: string, matcher: (name: string) => boolean) {
535542
const lines = list.split('\n');
536543
const clients: { [index: string]: string }[] = [];
537544

@@ -545,8 +552,9 @@ export class QueueGetters<
545552
client[key] = value;
546553
});
547554
const name = client['name'];
548-
if (name && name === `${this.clientName()}${suffix ? `${suffix}` : ''}`) {
555+
if (matcher(name)) {
549556
client['name'] = this.name;
557+
client['rawname'] = name;
550558
clients.push(client);
551559
}
552560
});

‎src/classes/scripts.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ export class Scripts {
943943
}
944944
}
945945

946-
async moveToActive(client: RedisClient, token: string) {
946+
async moveToActive(client: RedisClient, token: string, name?: string) {
947947
const opts = this.queue.opts as WorkerOptions;
948948

949949
const queueKeys = this.queue.keys;
@@ -968,6 +968,7 @@ export class Scripts {
968968
token,
969969
lockDuration: opts.lockDuration,
970970
limiter: opts.limiter,
971+
name,
971972
}),
972973
];
973974

‎src/classes/worker.ts

+18-12
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import {
2121
DELAY_TIME_1,
2222
isNotConnectionError,
2323
isRedisInstance,
24-
WORKER_SUFFIX,
2524
} from '../utils';
2625
import { QueueBase } from './queue-base';
2726
import { Repeat } from './repeat';
@@ -256,16 +255,21 @@ export class Worker<
256255
}
257256
}
258257

259-
const mainFile = this.opts.useWorkerThreads
260-
? 'main-worker.js'
261-
: 'main.js';
262-
let mainFilePath = path.join(
263-
path.dirname(module.filename),
264-
`${mainFile}`,
265-
);
258+
// Separate paths so that bundling tools can resolve dependencies easier
259+
const dirname = path.dirname(module.filename || __filename);
260+
const workerThreadsMainFile = path.join(dirname, 'main-worker.js');
261+
const spawnProcessMainFile = path.join(dirname, 'main.js');
262+
263+
let mainFilePath = this.opts.useWorkerThreads
264+
? workerThreadsMainFile
265+
: spawnProcessMainFile;
266+
266267
try {
267268
fs.statSync(mainFilePath); // would throw if file not exists
268269
} catch (_) {
270+
const mainFile = this.opts.useWorkerThreads
271+
? 'main-worker.js'
272+
: 'main.js';
269273
mainFilePath = path.join(
270274
process.cwd(),
271275
`dist/cjs/classes/${mainFile}`,
@@ -289,7 +293,8 @@ export class Worker<
289293
}
290294
}
291295

292-
const connectionName = this.clientName(WORKER_SUFFIX);
296+
const connectionName =
297+
this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : '');
293298
this.blockingConnection = new RedisConnection(
294299
isRedisInstance(opts.connection)
295300
? (<Redis>opts.connection).duplicate({ connectionName })
@@ -530,7 +535,7 @@ export class Worker<
530535
this.blockUntil = await this.waiting;
531536

532537
if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) {
533-
return this.moveToActive(client, token);
538+
return this.moveToActive(client, token, this.opts.name);
534539
}
535540
} catch (err) {
536541
// Swallow error if locally paused or closing since we did force a disconnection
@@ -549,7 +554,7 @@ export class Worker<
549554
this.abortDelayController = new AbortController();
550555
await this.delay(this.limitUntil, this.abortDelayController);
551556
}
552-
return this.moveToActive(client, token);
557+
return this.moveToActive(client, token, this.opts.name);
553558
}
554559
}
555560

@@ -572,9 +577,10 @@ export class Worker<
572577
protected async moveToActive(
573578
client: RedisClient,
574579
token: string,
580+
name?: string,
575581
): Promise<Job<DataType, ResultType, NameType>> {
576582
const [jobData, id, limitUntil, delayUntil] =
577-
await this.scripts.moveToActive(client, token);
583+
await this.scripts.moveToActive(client, token, name);
578584
this.updateDelays(limitUntil, delayUntil);
579585

580586
return this.nextJobFromJobData(jobData, id, token);

‎src/commands/includes/prepareJobForProcessing.lua

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey
2929
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
3030
end
3131

32+
if opts['name'] then
33+
-- Set "processedBy" field to the worker name
34+
rcall("HSET", jobKey, "pb", opts['name'])
35+
end
36+
3237
rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
3338
rcall("HSET", jobKey, "processedOn", processedOn)
3439
rcall("HINCRBY", jobKey, "ats", 1)

‎src/interfaces/job-json.ts

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export interface JobJson {
1818
parent?: ParentKeys;
1919
parentKey?: string;
2020
repeatJobKey?: string;
21+
processedBy?: string;
2122
}
2223

2324
export interface JobJsonRaw {
@@ -39,4 +40,5 @@ export interface JobJsonRaw {
3940
rjk?: string;
4041
atm?: string;
4142
ats?: string;
43+
pb?: string; // Worker name
4244
}

‎src/interfaces/worker-options.ts

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ export type Processor<T = any, R = any, N extends string = string> = (
1414
) => Promise<R>;
1515

1616
export interface WorkerOptions extends QueueBaseOptions {
17+
/**
18+
* Optional worker name. The name will be stored on every job
19+
* processed by this worker instance, and can be used to monitor
20+
* which worker is processing or has processed a given job.
21+
*/
22+
name?: string;
23+
1724
/**
1825
* Condition to start processor at instance creation.
1926
*

‎src/utils.ts

-2
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,4 @@ export const errorToJSON = (value: any): Record<string, any> => {
212212
return error;
213213
};
214214

215-
export const WORKER_SUFFIX = '';
216-
217215
export const QUEUE_EVENT_SUFFIX = ':qe';

‎tests/test_getters.ts

+39
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,45 @@ describe('Jobs getters', function () {
8989
await worker2.close();
9090
});
9191

92+
it('gets all workers including their names', async function () {
93+
const worker = new Worker(queueName, async () => {}, {
94+
autorun: false,
95+
connection,
96+
prefix,
97+
name: 'worker1',
98+
});
99+
await new Promise<void>(resolve => {
100+
worker.on('ready', () => {
101+
resolve();
102+
});
103+
});
104+
105+
const workers = await queue.getWorkers();
106+
expect(workers).to.have.length(1);
107+
108+
const worker2 = new Worker(queueName, async () => {}, {
109+
autorun: false,
110+
connection,
111+
prefix,
112+
name: 'worker2',
113+
});
114+
await new Promise<void>(resolve => {
115+
worker2.on('ready', () => {
116+
resolve();
117+
});
118+
});
119+
120+
const nextWorkers = await queue.getWorkers();
121+
expect(nextWorkers).to.have.length(2);
122+
123+
// Check that the worker names are included in the response on the rawname property
124+
expect(nextWorkers[0].rawname.endsWith('worker1')).to.be.true;
125+
expect(nextWorkers[1].rawname.endsWith('worker2')).to.be.true;
126+
127+
await worker.close();
128+
await worker2.close();
129+
});
130+
92131
it('gets only workers related only to one queue', async function () {
93132
const queueName2 = `${queueName}2`;
94133
const queue2 = new Queue(queueName2, { connection, prefix });

‎tests/test_worker.ts

+29
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,35 @@ describe('workers', function () {
695695
await worker.close();
696696
});
697697

698+
it('sets the worker name on the job upon processing', async () => {
699+
let worker;
700+
const processing = new Promise<void>(async (resolve, reject) => {
701+
worker = new Worker(
702+
queueName,
703+
async job => {
704+
const fetchedJob = await queue.getJob(job.id!);
705+
706+
try {
707+
expect(fetchedJob).to.be.ok;
708+
expect(fetchedJob!.processedBy).to.be.equal(worker.opts.name);
709+
} catch (err) {
710+
reject(err);
711+
}
712+
713+
resolve();
714+
},
715+
{ connection, prefix, name: 'foobar' },
716+
);
717+
await worker.waitUntilReady();
718+
});
719+
720+
await queue.add('test', { foo: 'bar' });
721+
722+
await processing;
723+
724+
await worker.close();
725+
});
726+
698727
it('retry a job that fails', async () => {
699728
let failedOnce = false;
700729
const notEvenErr = new Error('Not even!');

0 commit comments

Comments
 (0)
Please sign in to comment.