Skip to content

Commit

Permalink
[gh-10875] Use stream destroy method on close to prevent emit 'close'…
Browse files Browse the repository at this point in the history
… event twice
  • Loading branch information
iovanom committed Oct 18, 2021
1 parent 4b8e0d1 commit 46165d6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
9 changes: 8 additions & 1 deletion lib/cursor/AggregationCursor.js
Expand Up @@ -90,7 +90,14 @@ AggregationCursor.prototype._read = function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
_this.emit('close');
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the 'close' is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
});
Expand Down
9 changes: 8 additions & 1 deletion lib/cursor/QueryCursor.js
Expand Up @@ -99,7 +99,14 @@ QueryCursor.prototype._read = function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
_this.emit('close');
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the close is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
});
Expand Down
42 changes: 42 additions & 0 deletions test/query.cursor.test.js
Expand Up @@ -644,6 +644,48 @@ describe('QueryCursor', function() {
}, 20);
});

it('closing query cursor emits `close` event only once with stream pause/resume (gh-10876)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.find().cursor();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let closeEventTriggeredCount = 0;
cursor.on('close', () => closeEventTriggeredCount++);
setTimeout(() => {
assert.equal(closeEventTriggeredCount, 1);
done();
}, 200);
});
});

it('closing aggregation cursor emits `close` event only once with stream pause/resume (gh-10876)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.aggregate([{ $match: {} }]).cursor().exec();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let closeEventTriggeredCount = 0;
cursor.on('close', () => closeEventTriggeredCount++);


setTimeout(() => {
assert.equal(closeEventTriggeredCount, 1);
done();
}, 200);
});
});

it('passes document index as the second argument for query cursor (gh-8972)', function() {
return co(function *() {
const User = db.model('User', Schema({ order: Number }));
Expand Down

0 comments on commit 46165d6

Please sign in to comment.