Skip to content

Commit 51ef4ae

Browse files
authoredFeb 16, 2024··
test(flow): do not remove grandparent on regular remove (#2429)
1 parent 5938664 commit 51ef4ae

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed
 

‎src/commands/includes/removeParentDependencyKey.lua

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
3333
local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
3434

3535
if numRemovedElements == 1 then
36-
if hard then
36+
if hard then -- remove parent in same queue
3737
if parentPrefix == baseKey then
3838
removeParentDependencyKey(parentKey, hard, nil, baseKey)
3939
rcall("DEL", parentKey, parentKey .. ':logs',

‎tests/test_flow.ts

+117
Original file line numberDiff line numberDiff line change
@@ -3235,6 +3235,123 @@ describe('flows', () => {
32353235
await parentQueue.close();
32363236
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
32373237
});
3238+
3239+
describe('when there is a grand parent', () => {
3240+
it('removes all children when removing a parent, but not grandparent', async () => {
3241+
const parentQueueName = `parent-queue-${v4()}`;
3242+
const grandparentQueueName = `grandparent-queue-${v4()}`;
3243+
const name = 'child-job';
3244+
3245+
const flow = new FlowProducer({ connection, prefix });
3246+
const tree = await flow.add({
3247+
name: 'grandparent-job',
3248+
queueName: grandparentQueueName,
3249+
data: {},
3250+
children: [
3251+
{
3252+
name: 'parent-job',
3253+
queueName: parentQueueName,
3254+
data: {},
3255+
children: [
3256+
{ name, data: { idx: 0, foo: 'bar' }, queueName },
3257+
{
3258+
name,
3259+
data: { idx: 0, foo: 'baz' },
3260+
queueName,
3261+
children: [
3262+
{ name, data: { idx: 0, foo: 'qux' }, queueName },
3263+
],
3264+
},
3265+
],
3266+
},
3267+
],
3268+
});
3269+
3270+
expect(await tree.job.getState()).to.be.equal('waiting-children');
3271+
expect(await tree.children![0].job.getState()).to.be.equal(
3272+
'waiting-children',
3273+
);
3274+
3275+
expect(
3276+
await tree.children![0].children![0].job.getState(),
3277+
).to.be.equal('waiting');
3278+
expect(
3279+
await tree.children![0].children![1].job.getState(),
3280+
).to.be.equal('waiting-children');
3281+
3282+
expect(
3283+
await tree.children![0].children![1].children![0].job.getState(),
3284+
).to.be.equal('waiting');
3285+
3286+
for (let i = 0; i < tree.children![0].children!.length; i++) {
3287+
const child = tree.children![0].children![i];
3288+
const childJob = await Job.fromId(queue, child.job.id);
3289+
expect(childJob.parent).to.deep.equal({
3290+
id: tree.children![0].job.id,
3291+
queueKey: `${prefix}:${parentQueueName}`,
3292+
});
3293+
}
3294+
3295+
const parentWorker = new Worker(parentQueueName, async () => {}, {
3296+
connection,
3297+
prefix,
3298+
});
3299+
const childrenWorker = new Worker(
3300+
queueName,
3301+
async () => {
3302+
await delay(10);
3303+
},
3304+
{
3305+
connection,
3306+
prefix,
3307+
},
3308+
);
3309+
await parentWorker.waitUntilReady();
3310+
await childrenWorker.waitUntilReady();
3311+
3312+
const completing = new Promise(resolve => {
3313+
parentWorker.on('completed', resolve);
3314+
});
3315+
3316+
await completing;
3317+
await tree.children![0].job.remove();
3318+
3319+
const parentQueue = new Queue(parentQueueName, {
3320+
connection,
3321+
prefix,
3322+
});
3323+
const parentJob = await Job.fromId(parentQueue, tree.job.id);
3324+
expect(parentJob).to.be.undefined;
3325+
3326+
for (let i = 0; i < tree.children![0].children!.length; i++) {
3327+
const child = tree.children![0].children![i];
3328+
const childJob = await Job.fromId(queue, child.job.id);
3329+
expect(childJob).to.be.undefined;
3330+
}
3331+
3332+
const jobs = await queue.getJobCountByTypes('completed');
3333+
expect(jobs).to.be.equal(0);
3334+
3335+
expect(
3336+
await tree.children![0].children![0].job.getState(),
3337+
).to.be.equal('unknown');
3338+
expect(
3339+
await tree.children![0].children![1].job.getState(),
3340+
).to.be.equal('unknown');
3341+
expect(await tree.children![0].job.getState()).to.be.equal('unknown');
3342+
expect(await tree.job.getState()).to.be.equal('waiting');
3343+
3344+
await flow.close();
3345+
await childrenWorker.close();
3346+
await parentWorker.close();
3347+
await parentQueue.close();
3348+
await removeAllQueueData(
3349+
new IORedis(redisHost),
3350+
grandparentQueueName,
3351+
);
3352+
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
3353+
});
3354+
});
32383355
});
32393356

32403357
it('should not remove anything if there is a locked job in the tree', async () => {

0 commit comments

Comments
 (0)
Please sign in to comment.