Skip to content

Commit 5ad9fa9

Browse files
authoredJan 23, 2019
fix(changeStream): properly handle changeStream event mid-close (#1902)
If a changeStream gets an event while it is in the middle of closing, a race condition can occur where the event is still processed after the stream has closed. This commit adds handling for this edge case by returning an error to callbacks, rejecting promises, and simply ignoring it in emitter mode. Fixes NODE-1831
1 parent e806be4 commit 5ad9fa9

File tree

2 files changed

+119
-0
lines changed

2 files changed

+119
-0
lines changed
 

‎lib/change_stream.js

+14
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,20 @@ function processNewChange(args) {
368368
const change = args.change;
369369
const callback = args.callback;
370370
const eventEmitter = args.eventEmitter || false;
371+
372+
// If the changeStream is closed, then it should not process a change.
373+
if (changeStream.isClosed()) {
374+
// We do not error in the eventEmitter case.
375+
if (eventEmitter) {
376+
return;
377+
}
378+
379+
const error = new MongoError('ChangeStream is closed');
380+
return typeof callback === 'function'
381+
? callback(error, null)
382+
: changeStream.promiseLibrary.reject(error);
383+
}
384+
371385
const topology = changeStream.topology;
372386
const options = changeStream.cursor.options;
373387

‎test/functional/change_stream_tests.js

+105
Original file line numberDiff line numberDiff line change
@@ -1677,4 +1677,109 @@ describe('Change Streams', function() {
16771677
.then(() => finish(), err => finish(err));
16781678
}
16791679
});
1680+
1681+
describe('should properly handle a changeStream event being processed mid-close', function() {
1682+
let client, coll;
1683+
1684+
function write() {
1685+
return Promise.resolve()
1686+
.then(() => coll.insertOne({ a: 1 }))
1687+
.then(() => coll.insertOne({ b: 2 }))
1688+
.then(() => coll.insertOne({ c: 3 }));
1689+
}
1690+
1691+
beforeEach(function() {
1692+
client = this.configuration.newClient();
1693+
return client.connect().then(_client => {
1694+
client = _client;
1695+
coll = client.db(this.configuration.db).collection('tester');
1696+
});
1697+
});
1698+
1699+
afterEach(function() {
1700+
coll = undefined;
1701+
if (client) {
1702+
return client.close().then(() => {
1703+
client = undefined;
1704+
});
1705+
}
1706+
});
1707+
1708+
it('when invoked with promises', {
1709+
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
1710+
test: function() {
1711+
function read() {
1712+
const changeStream = coll.watch();
1713+
return Promise.resolve()
1714+
.then(() => changeStream.next())
1715+
.then(() => changeStream.next())
1716+
.then(() => {
1717+
const nextP = changeStream.next();
1718+
1719+
return changeStream.close().then(() => nextP);
1720+
});
1721+
}
1722+
1723+
return Promise.all([read(), write()]).then(
1724+
() => Promise.reject(new Error('Expected operation to fail with error')),
1725+
err => expect(err.message).to.equal('ChangeStream is closed')
1726+
);
1727+
}
1728+
});
1729+
1730+
it('when invoked with callbacks', {
1731+
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
1732+
test: function(done) {
1733+
const changeStream = coll.watch();
1734+
1735+
changeStream.next(() => {
1736+
changeStream.next(() => {
1737+
changeStream.next(err => {
1738+
let _err = null;
1739+
try {
1740+
expect(err.message).to.equal('ChangeStream is closed');
1741+
} catch (e) {
1742+
_err = e;
1743+
} finally {
1744+
done(_err);
1745+
}
1746+
});
1747+
changeStream.close();
1748+
});
1749+
});
1750+
1751+
write().catch(() => {});
1752+
}
1753+
});
1754+
1755+
it('when invoked using eventEmitter API', {
1756+
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } },
1757+
test: function(done) {
1758+
let closed = false;
1759+
const close = _err => {
1760+
if (closed) {
1761+
return;
1762+
}
1763+
closed = true;
1764+
return done(_err);
1765+
};
1766+
1767+
const changeStream = coll.watch();
1768+
1769+
let counter = 0;
1770+
changeStream.on('change', () => {
1771+
counter += 1;
1772+
if (counter === 2) {
1773+
changeStream.close();
1774+
setTimeout(() => close());
1775+
} else if (counter >= 3) {
1776+
close(new Error('Should not have received more than 2 events'));
1777+
}
1778+
});
1779+
changeStream.on('error', err => close(err));
1780+
1781+
setTimeout(() => write().catch(() => {}));
1782+
}
1783+
});
1784+
});
16801785
});

0 commit comments

Comments
 (0)
Please sign in to comment.