Skip to content

Commit c7559f4

Browse files
authoredFeb 17, 2024··
feat(flow): add ignoreDependencyOnFailure option (#2426)
1 parent 51ef4ae commit c7559f4

12 files changed

+215
-9
lines changed
 

‎docs/gitbook/SUMMARY.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* [Get Flow Tree](guide/flows/get-flow-tree.md)
4141
* [Fail Parent](guide/flows/fail-parent.md)
4242
* [Remove Dependency](guide/flows/remove-dependency.md)
43+
* [Ignore Dependency](guide/flows/ignore-dependency.md)
4344
* [Metrics](guide/metrics/metrics.md)
4445
* [Rate limiting](guide/rate-limiting.md)
4546
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Ignore Dependency
2+
3+
In some situations, you may have a parent job and need to ignore when one of its children fail.
4+
5+
The pattern to solve this requirement consists on using the **ignoreDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is ignored from the parent, so the parent will complete without waiting for the failed children.
6+
7+
```typescript
8+
const flow = new FlowProducer({ connection });
9+
10+
const originalTree = await flow.add({
11+
name: 'root-job',
12+
queueName: 'topQueueName',
13+
data: {},
14+
children: [
15+
{
16+
name,
17+
data: { idx: 0, foo: 'bar' },
18+
queueName: 'childrenQueueName',
19+
opts: { ignoreDependencyOnFailure: true },
20+
children: [
21+
{
22+
name,
23+
data: { idx: 1, foo: 'bah' },
24+
queueName: 'grandChildrenQueueName',
25+
},
26+
{
27+
name,
28+
data: { idx: 2, foo: 'baz' },
29+
queueName: 'grandChildrenQueueName',
30+
},
31+
],
32+
},
33+
{
34+
name,
35+
data: { idx: 3, foo: 'foo' },
36+
queueName: 'childrenQueueName',
37+
},
38+
],
39+
});
40+
```
41+
42+
{% hint style="info" %}
43+
As soon as a **child** with this option fails, the parent job will be moved to a waiting state only if there are no more pending children.
44+
{% endhint %}
45+
46+
Failed children using this option can be retrieved by **getFailedChildrenValues** method:
47+
48+
```typescript
49+
const failedChildrenValues = await originalTree.job.getFailedChildrenValues();
50+
```
51+
52+
## Read more:
53+
54+
- 💡 [Add Flow API Reference](https://api.docs.bullmq.io/classes/v5.FlowProducer.html#add)

‎docs/gitbook/guide/flows/remove-dependency.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Remove Dependency
22

3-
In some situations, you may have a parent job and need to ignore when one of its children fail.
3+
In some situations, you may have a parent job and need to remove the relationship when one of its children fail.
44

55
The pattern to solve this requirement consists on using the **removeDependencyOnFailure** option. This option will make sure that when a job fails, the dependency is removed from the parent, so the parent will complete without waiting for the failed children.
66

‎src/classes/job.ts

+12
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const logger = debuglog('bull');
4040

4141
const optsDecodeMap = {
4242
fpof: 'failParentOnFailure',
43+
idof: 'ignoreDependencyOnFailure',
4344
kl: 'keepLogs',
4445
rdof: 'removeDependencyOnFailure',
4546
};
@@ -815,6 +816,17 @@ export class Job<
815816
}
816817
}
817818

819+
/**
820+
* Get this jobs children failure values if any.
821+
*
822+
* @returns Object mapping children job keys with their failure values.
823+
*/
824+
async getFailedChildrenValues(): Promise<{ [jobKey: string]: string }> {
825+
const client = await this.queue.client;
826+
827+
return client.hgetall(this.toKey(`${this.id}:failed`));
828+
}
829+
818830
/**
819831
* Get children job keys if this job is a parent and has children.
820832
* @remarks

‎src/classes/scripts.ts

+1
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ export class Scripts {
383383
? opts.metrics?.maxDataPoints
384384
: '',
385385
fpof: !!job.opts?.failParentOnFailure,
386+
idof: !!job.opts?.ignoreDependencyOnFailure,
386387
rdof: !!job.opts?.removeDependencyOnFailure,
387388
}),
388389
];

‎src/commands/includes/removeJob.lua

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
]]
44

55
-- Includes
6+
--- @include "removeJobKeys"
67
--- @include "removeParentDependencyKey"
78

89
local function removeJob(jobId, hard, baseKey)
910
local jobKey = baseKey .. jobId
1011
removeParentDependencyKey(jobKey, hard, nil, baseKey)
11-
rcall("DEL", jobKey, jobKey .. ':logs',
12-
jobKey .. ':dependencies', jobKey .. ':processed')
12+
removeJobKeys(jobKey)
1313
end
+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
--[[
2+
Function to remove job keys.
3+
]]
4+
5+
local function removeJobKeys(jobKey)
6+
return rcall("DEL", jobKey, jobKey .. ':logs',
7+
jobKey .. ':dependencies', jobKey .. ':processed', jobKey .. ':failed')
8+
end

‎src/commands/includes/removeParentDependencyKey.lua

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
--- @include "addJobInTargetList"
99
--- @include "destructureJobKey"
1010
--- @include "getTargetQueueList"
11+
--- @include "removeJobKeys"
1112

1213
local function moveParentToWait(parentPrefix, parentId, emitEvent)
1314
local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait",
@@ -36,8 +37,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
3637
if hard then -- remove parent in same queue
3738
if parentPrefix == baseKey then
3839
removeParentDependencyKey(parentKey, hard, nil, baseKey)
39-
rcall("DEL", parentKey, parentKey .. ':logs',
40-
parentKey .. ':dependencies', parentKey .. ':processed')
40+
removeJobKeys(parentKey)
4141
else
4242
moveParentToWait(parentPrefix, parentId)
4343
end
@@ -65,8 +65,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
6565
if hard then
6666
if parentPrefix == baseKey then
6767
removeParentDependencyKey(missedParentKey, hard, nil, baseKey)
68-
rcall("DEL", missedParentKey, missedParentKey .. ':logs',
69-
missedParentKey .. ':dependencies', missedParentKey .. ':processed')
68+
removeJobKeys(missedParentKey)
7069
else
7170
moveParentToWait(parentPrefix, parentId)
7271
end

‎src/commands/moveToFinished-14.lua

+6-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
opts - attempts max attempts
4040
opts - maxMetricsSize
4141
opts - fpof - fail parent on fail
42+
opts - idof - ignore dependency on fail
4243
opts - rdof - remove dependency on fail
4344
4445
Output:
@@ -147,11 +148,15 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
147148
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
148149
parentId, jobIdKey,
149150
timestamp)
150-
elseif opts['rdof'] then
151+
elseif opts['idof'] or opts['rdof'] then
151152
local dependenciesSet = parentKey .. ":dependencies"
152153
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
153154
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,
154155
parentKey, parentId, timestamp)
156+
if opts['idof'] then
157+
local failedSet = parentKey .. ":failed"
158+
rcall("HSET", failedSet, jobIdKey, ARGV[4])
159+
end
155160
end
156161
end
157162
end

‎src/commands/removeJob-1.lua

+12-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ local rcall = redis.call
1919
--- @include "includes/getOrSetMaxEvents"
2020
--- @include "includes/isLocked"
2121
--- @include "includes/removeJobFromAnyState"
22+
--- @include "includes/removeJobKeys"
2223
--- @include "includes/removeParentDependencyKey"
2324

2425
local function removeJob( prefix, jobId, parentKey, removeChildren)
@@ -50,11 +51,21 @@ local function removeJob( prefix, jobId, parentKey, removeChildren)
5051
removeJob( childJobPrefix, childJobId, jobKey, removeChildren )
5152
end
5253
end
54+
55+
local failed = rcall("HGETALL", jobKey .. ":failed")
56+
57+
if (#failed > 0) then
58+
for i = 1, #failed, 2 do
59+
local childJobId = getJobIdFromKey(failed[i])
60+
local childJobPrefix = getJobKeyPrefix(failed[i], childJobId)
61+
removeJob( childJobPrefix, childJobId, jobKey, removeChildren )
62+
end
63+
end
5364
end
5465

5566
local prev = removeJobFromAnyState(prefix, jobId)
5667

57-
if rcall("DEL", jobKey, jobKey .. ":logs", jobKey .. ":dependencies", jobKey .. ":processed") > 0 then
68+
if removeJobKeys(jobKey) > 0 then
5869
local maxEvents = getOrSetMaxEvents(prefix .. "meta")
5970
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
6071
"jobId", jobId, "prev", prev)

‎src/types/job-options.ts

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ export type JobsOptions = BaseJobOptions & {
66
*/
77
failParentOnFailure?: boolean;
88

9+
/**
10+
* If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts.
11+
*/
12+
ignoreDependencyOnFailure?: boolean;
13+
914
/**
1015
* If true, removes the job from its parent dependencies when it fails after all attempts.
1116
*/
@@ -21,6 +26,11 @@ export type RedisJobOptions = BaseJobOptions & {
2126
*/
2227
fpof?: boolean;
2328

29+
/**
30+
* If true, moves the jobId from its parent dependencies to failed dependencies when it fails after all attempts.
31+
*/
32+
idof?: boolean;
33+
2434
/**
2535
* Maximum amount of log entries that will be preserved
2636
*/

‎tests/test_flow.ts

+105
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,111 @@ describe('flows', () => {
460460
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
461461
});
462462

463+
describe('when ignoreDependencyOnFailure is provided', async () => {
464+
it('moves parent to wait after children fail', async () => {
465+
const parentQueueName = `parent-queue-${v4()}`;
466+
const parentQueue = new Queue(parentQueueName, { connection, prefix });
467+
const name = 'child-job';
468+
469+
const parentProcessor = async (job: Job) => {
470+
const values = await job.getDependencies({
471+
processed: {},
472+
});
473+
expect(values).to.deep.equal({
474+
processed: {},
475+
nextProcessedCursor: 0,
476+
});
477+
};
478+
479+
const parentWorker = new Worker(parentQueueName, parentProcessor, {
480+
connection,
481+
prefix,
482+
});
483+
const childrenWorker = new Worker(
484+
queueName,
485+
async () => {
486+
await delay(10);
487+
throw new Error('error');
488+
},
489+
{
490+
connection,
491+
prefix,
492+
},
493+
);
494+
await parentWorker.waitUntilReady();
495+
await childrenWorker.waitUntilReady();
496+
497+
const completed = new Promise<void>(resolve => {
498+
parentWorker.on('completed', async (job: Job) => {
499+
expect(job.finishedOn).to.be.string;
500+
const counts = await parentQueue.getJobCounts('completed');
501+
expect(counts.completed).to.be.equal(1);
502+
resolve();
503+
});
504+
});
505+
506+
const flow = new FlowProducer({ connection, prefix });
507+
const tree = await flow.add({
508+
name: 'parent-job',
509+
queueName: parentQueueName,
510+
data: {},
511+
children: [
512+
{
513+
name,
514+
data: { idx: 0, foo: 'bar' },
515+
queueName,
516+
opts: { ignoreDependencyOnFailure: true },
517+
},
518+
{
519+
name,
520+
data: { idx: 1, foo: 'baz' },
521+
queueName,
522+
opts: { ignoreDependencyOnFailure: true },
523+
},
524+
{
525+
name,
526+
data: { idx: 2, foo: 'qux' },
527+
queueName,
528+
opts: { ignoreDependencyOnFailure: true },
529+
},
530+
],
531+
});
532+
533+
expect(tree).to.have.property('job');
534+
expect(tree).to.have.property('children');
535+
536+
const { children, job } = tree;
537+
const parentState = await job.getState();
538+
539+
expect(parentState).to.be.eql('waiting-children');
540+
expect(children).to.have.length(3);
541+
542+
expect(children[0].job.id).to.be.ok;
543+
expect(children[0].job.data.foo).to.be.eql('bar');
544+
expect(children[1].job.id).to.be.ok;
545+
expect(children[1].job.data.foo).to.be.eql('baz');
546+
expect(children[2].job.id).to.be.ok;
547+
expect(children[2].job.data.foo).to.be.eql('qux');
548+
549+
await completed;
550+
551+
const failedChildrenValues = await job.getFailedChildrenValues();
552+
553+
expect(failedChildrenValues).to.deep.equal({
554+
[`${queue.qualifiedName}:${children[0].job.id}`]: 'error',
555+
[`${queue.qualifiedName}:${children[1].job.id}`]: 'error',
556+
[`${queue.qualifiedName}:${children[2].job.id}`]: 'error',
557+
});
558+
559+
await childrenWorker.close();
560+
await parentWorker.close();
561+
await flow.close();
562+
await parentQueue.close();
563+
564+
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
565+
}).timeout(8000);
566+
});
567+
463568
describe('when removeDependencyOnFailure is provided', async () => {
464569
it('moves parent to wait after children fail', async () => {
465570
const parentQueueName = `parent-queue-${v4()}`;

0 commit comments

Comments
 (0)
Please sign in to comment.