Skip to content

Commit

Permalink
Merge pull request #10916 from iovanom/gh-10902-v5
Browse files Browse the repository at this point in the history
[gh-10902 v5] Emit end event in before close
  • Loading branch information
vkarpov15 committed Nov 2, 2021
2 parents 271bc60 + ff5ddb5 commit c3463c4
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 31 deletions.
28 changes: 13 additions & 15 deletions lib/cursor/AggregationCursor.js
Expand Up @@ -10,6 +10,7 @@ const promiseOrCallback = require('../helpers/promiseOrCallback');
const eachAsync = require('../helpers/cursor/eachAsync');
const immediate = require('../helpers/immediate');
const util = require('util');
const utils = require('../../lib/utils');

/**
* An AggregationCursor is a concurrency primitive for processing aggregation
Expand All @@ -36,7 +37,14 @@ const util = require('util');
*/

function AggregationCursor(agg) {
Readable.call(this, { objectMode: true });
const streamOpts = { objectMode: true };
// for node < 12 we will emit 'close' event after 'end'
if (utils.nodeMajorVersion >= 12) {
// set autoDestroy=true because on node 12 it's by default false
// gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12
streamOpts.autoDestroy = true;
}
Readable.call(this, streamOpts);

this.cursor = null;
this.agg = agg;
Expand Down Expand Up @@ -86,20 +94,10 @@ AggregationCursor.prototype._read = function() {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the 'close' is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
// for node >= 12 the autoDestroy will emit the 'close' event
if (utils.nodeMajorVersion < 12) {
_this.on('end', () => _this.emit('close'));
}
});
return;
}
Expand Down
28 changes: 13 additions & 15 deletions lib/cursor/QueryCursor.js
Expand Up @@ -10,6 +10,7 @@ const eachAsync = require('../helpers/cursor/eachAsync');
const helpers = require('../queryhelpers');
const immediate = require('../helpers/immediate');
const util = require('util');
const utils = require('../../lib/utils');

/**
* A QueryCursor is a concurrency primitive for processing query results
Expand All @@ -34,7 +35,14 @@ const util = require('util');
*/

function QueryCursor(query, options) {
Readable.call(this, { objectMode: true });
const streamOpts = { objectMode: true };
// for node < 12 we will emit 'close' event after 'end'
if (utils.nodeMajorVersion >= 12) {
// set autoDestroy=true because on node 12 it's by default false
// gh-10902 need autoDestroy to destroy correctly and emit 'close' event for node >= 12
streamOpts.autoDestroy = true;
}
Readable.call(this, streamOpts);

this.cursor = null;
this.query = query;
Expand Down Expand Up @@ -95,20 +103,10 @@ QueryCursor.prototype._read = function() {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
// on node >= 14 streams close automatically (gh-8834)
const isNotClosedAutomatically = !_this.destroyed;
if (isNotClosedAutomatically) {
// call destroy method if exists to prevent emit twice 'close' by autoDestroy (gh-10876)
// @see https://nodejs.org/api/stream.html#stream_readable_destroy_error
// the close is emited on destroy started with version 10
if (_this.destroy && parseInt(process.versions.node.split('.')[0]) > 9) {
_this.destroy();
} else {
_this.emit('close');
}
}
}, 0);
// for node >= 12 the autoDestroy will emit the 'close' event
if (utils.nodeMajorVersion < 12) {
_this.on('end', () => _this.emit('close'));
}
});
return;
}
Expand Down
4 changes: 3 additions & 1 deletion lib/utils.js
Expand Up @@ -934,4 +934,6 @@ exports.errorToPOJO = function errorToPOJO(error) {
ret[properyName] = error[properyName];
}
return ret;
};
};

exports.nodeMajorVersion = parseInt(process.versions.node.split('.')[0], 10);
82 changes: 82 additions & 0 deletions test/query.cursor.test.js
Expand Up @@ -686,6 +686,88 @@ describe('QueryCursor', function() {
});
});

it('query cursor emit end event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.find({}).cursor();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);

setTimeout(() => {
assert.equal(endEventTriggeredCount, 1);
done();
}, 200);
});
});

it('aggregate cursor emit end event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.aggregate([{ $match: {} }]).cursor().exec();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);

setTimeout(() => {
assert.equal(endEventTriggeredCount, 1);
done();
}, 200);
});
});

it('query cursor emit end event before close event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.find({}).cursor();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);
cursor.on('close', () => {
assert.equal(endEventTriggeredCount, 1);
done();
});
});
});

it('aggregate cursor emit end event before close event (gh-10902)', function(done) {
const User = db.model('User', new Schema({ name: String }));

User.create({ name: 'First' }, { name: 'Second' })
.then(() => {
const cursor = User.aggregate([{ $match: {} }]).cursor().exec();
cursor.on('data', () => {
cursor.pause();
setTimeout(() => cursor.resume(), 50);
});

let endEventTriggeredCount = 0;
cursor.on('end', () => endEventTriggeredCount++);
cursor.on('close', () => {
assert.equal(endEventTriggeredCount, 1);
done();
});
});
});

it('passes document index as the second argument for query cursor (gh-8972)', function() {
return co(function *() {
const User = db.model('User', Schema({ order: Number }));
Expand Down

0 comments on commit c3463c4

Please sign in to comment.