Skip to content

Commit

Permalink
fix(NODE-3515): do proper opTime merging in bulk results (#3011)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Oct 20, 2021
1 parent 564b0d7 commit 428e6d3
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 32 deletions.
67 changes: 36 additions & 31 deletions lib/bulk/common.js
Expand Up @@ -414,6 +414,15 @@ class WriteError {
}
}

/**
* Converts the number to a Long or returns it.
*
* @ignore
*/
function longOrConvert(value) {
return typeof value === 'number' ? Long.fromNumber(value) : value;
}

/**
* Merges results into shared data structure
* @ignore
Expand Down Expand Up @@ -445,42 +454,37 @@ function mergeBatchResults(batch, bulkResult, err, result) {
return;
}

// Deal with opTime if available
// The server write command specification states that lastOp is an optional
// mongod only field that has a type of timestamp. Across various scarce specs
// where opTime is mentioned, it is an "opaque" object that can have a "ts" and
// "t" field with Timestamp and Long as their types respectively.
// The "lastOp" field of the bulk write result is never mentioned in the driver
// specifications or the bulk write spec, so we should probably just keep its
// value consistent since it seems to vary.
// See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
if (result.opTime || result.lastOp) {
const opTime = result.lastOp || result.opTime;
let lastOpTS = null;
let lastOpT = null;
let opTime = result.lastOp || result.opTime;

// We have a time stamp
if (opTime && opTime._bsontype === 'Timestamp') {
if (bulkResult.lastOp == null) {
bulkResult.lastOp = opTime;
} else if (opTime.greaterThan(bulkResult.lastOp)) {
bulkResult.lastOp = opTime;
}
} else {
// Existing TS
if (bulkResult.lastOp) {
lastOpTS =
typeof bulkResult.lastOp.ts === 'number'
? Long.fromNumber(bulkResult.lastOp.ts)
: bulkResult.lastOp.ts;
lastOpT =
typeof bulkResult.lastOp.t === 'number'
? Long.fromNumber(bulkResult.lastOp.t)
: bulkResult.lastOp.t;
}

// Current OpTime TS
const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
// If the opTime is a Timestamp, convert it to a consistent format to be
// able to compare easily. Converting to the object from a timestamp is
// much more straightforward than the other direction.
if (opTime._bsontype === 'Timestamp') {
opTime = { ts: opTime, t: Long.ZERO };
}

// Compare the opTime's
if (bulkResult.lastOp == null) {
bulkResult.lastOp = opTime;
} else if (opTimeTS.greaterThan(lastOpTS)) {
// If there's no lastOp, just set it.
if (!bulkResult.lastOp) {
bulkResult.lastOp = opTime;
} else {
// First compare the ts values and set if the opTimeTS value is greater.
const lastOpTS = longOrConvert(bulkResult.lastOp.ts);
const opTimeTS = longOrConvert(opTime.ts);
if (opTimeTS.greaterThan(lastOpTS)) {
bulkResult.lastOp = opTime;
} else if (opTimeTS.equals(lastOpTS)) {
// If the ts values are equal, then compare using the t values.
const lastOpT = longOrConvert(bulkResult.lastOp.t);
const opTimeT = longOrConvert(opTime.t);
if (opTimeT.greaterThan(lastOpT)) {
bulkResult.lastOp = opTime;
}
Expand Down Expand Up @@ -1387,6 +1391,7 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
module.exports = {
Batch,
BulkOperationBase,
mergeBatchResults,
bson,
INSERT: INSERT,
UPDATE: UPDATE,
Expand Down
128 changes: 127 additions & 1 deletion test/unit/bulk_write.test.js
Expand Up @@ -2,7 +2,11 @@

const expect = require('chai').expect;
const mock = require('mongodb-mock-server');
const BulkWriteResult = require('../../lib/bulk/common').BulkWriteResult;
const Long = require('../../lib/core').BSON.Long;
const Timestamp = require('../../lib/core').BSON.Timestamp;
const common = require('../../lib/bulk/common');
const BulkWriteResult = common.BulkWriteResult;
const mergeBatchResults = common.mergeBatchResults;

describe('Bulk Writes', function() {
const test = {};
Expand Down Expand Up @@ -131,4 +135,126 @@ describe('Bulk Writes', function() {

expect(() => result.insertedIds).to.not.throw();
});

describe('#mergeBatchResults', function() {
let opTime;
let lastOp;
const bulkResult = {
ok: 1,
writeErrors: [],
writeConcernErrors: [],
insertedIds: [],
nInserted: 0,
nUpserted: 0,
nMatched: 0,
nModified: 0,
nRemoved: 1,
upserted: []
};
const result = {
n: 8,
nModified: 8,
electionId: '7fffffff0000000000000028',
ok: 1,
$clusterTime: {
clusterTime: '7020546605669417498',
signature: {
hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=',
keyId: 0
}
},
operationTime: '7020546605669417498'
};
const batch = [];

context('when lastOp is an object', function() {
context('when the opTime is a Timestamp', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = Timestamp.fromNumber(8020546605669417496);
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the properly formatted object', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal({ ts: opTime, t: Long.ZERO });
});
});

context('when the opTime is an object', function() {
context('when the ts is greater', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417497, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(opTime);
});
});

context('when the ts is equal', function() {
context('when the t is greater', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 20 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('replaces the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(opTime);
});
});

context('when the t is equal', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});

context('when the t is less', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417496, t: 5 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});
});

context('when the ts is less', function() {
before(function() {
lastOp = { ts: 7020546605669417496, t: 10 };
opTime = { ts: 7020546605669417495, t: 10 };
bulkResult.lastOp = lastOp;
result.opTime = opTime;
});

it('does not replace the lastOp with the new opTime', function() {
mergeBatchResults(batch, bulkResult, null, result);
expect(bulkResult.lastOp).to.deep.equal(lastOp);
});
});
});
});
});
});

0 comments on commit 428e6d3

Please sign in to comment.