Skip to content

Commit

Permalink
[gh-10902 v5] Emit end event in before close
Browse files Browse the repository at this point in the history
  • Loading branch information
iovanom committed Oct 22, 2021
1 parent 271bc60 commit 5468642
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 30 deletions.
27 changes: 12 additions & 15 deletions lib/cursor/AggregationCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,14 @@ const util = require('util');
*/

function AggregationCursor(agg) {
Readable.call(this, { objectMode: true });
const streamOpts = { objectMode: true };
// for node < 12 we will emit 'close' event after 'end'
if (parseInt(process.versions.node.split('.')[0]) >= 12) {
// set autoDestroy=true because on node 12 it's by default false
// gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12
streamOpts.autoDestroy = true;
}
Readable.call(this, streamOpts);

this.cursor = null;
this.agg = agg;
Expand Down Expand Up @@ -86,20 +93,10 @@ AggregationCursor.prototype._read = function() {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the 'close' is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
// for node >= 12 the autoDestroy will emit the 'close' event
if (parseInt(process.versions.node.split('.')[0]) < 12) {
_this.on('end', () => _this.emit('close'));
}
});
return;
}
Expand Down
27 changes: 12 additions & 15 deletions lib/cursor/QueryCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ const util = require('util');
*/

function QueryCursor(query, options) {
Readable.call(this, { objectMode: true });
const streamOpts = { objectMode: true };
// for node < 12 we will emit 'close' event after 'end'
if (parseInt(process.versions.node.split('.')[0]) >= 12) {
// set autoDestroy=true because on node 12 it's by default false
// gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12
streamOpts.autoDestroy = true;
}
Readable.call(this, streamOpts);

this.cursor = null;
this.query = query;
Expand Down Expand Up @@ -95,20 +102,10 @@ QueryCursor.prototype._read = function() {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the close is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
// for node >= 12 the autoDestroy will emit the 'close' event
if (parseInt(process.versions.node.split('.')[0]) < 12) {
_this.on('end', () => _this.emit('close'));
}
});
return;
}
Expand Down
82 changes: 82 additions & 0 deletions test/query.cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,88 @@ describe('QueryCursor', function() {
});
});

it('query cursor emit end event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.find({}).cursor();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);

setTimeout(() => {
assert.equal(endEventTriggeredCount, 1);
done();
}, 200);
});
});

it('aggregate cursor emit end event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.aggregate([{ $match: {} }]).cursor().exec();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);

setTimeout(() => {
assert.equal(endEventTriggeredCount, 1);
done();
}, 200);
});
});

it('query cursor emit end event before close event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.find({}).cursor();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);
cursor.on('close', () => {
assert.equal(endEventTriggeredCount, 1);
done();
});
});
});

it('aggregate cursor emit end event before close event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.aggregate([{ $match: {} }]).cursor().exec();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);
cursor.on('close', () => {
assert.equal(endEventTriggeredCount, 1);
done();
});
});
});

it('passes document index as the second argument for query cursor (gh-8972)', function() {
return co(function *() {
const User = db.model('User', Schema({ order: Number }));
Expand Down

0 comments on commit 5468642

Please sign in to comment.