Skip to content

Commit 666b8fa

Browse files
author
Sophie Saskin
authoredJul 27, 2018
refactor(bulk): Unify bulk operations
Factors out common code in ordered.js and unordered.js into common.js. Fixes NODE-1486
1 parent a0d84f6 commit 666b8fa

File tree

3 files changed

+1007
-1252
lines changed

3 files changed

+1007
-1252
lines changed
 

‎lib/bulk/common.js

+816-168
Large diffs are not rendered by default.

‎lib/bulk/ordered.js

+87-539
Original file line numberDiff line numberDiff line change
@@ -1,442 +1,143 @@
11
'use strict';
22

33
const common = require('./common');
4+
const BulkOperationBase = common.BulkOperationBase;
45
const utils = require('../utils');
5-
const toError = require('../utils').toError;
6-
const handleCallback = require('../utils').handleCallback;
7-
const shallowClone = utils.shallowClone;
6+
const toError = utils.toError;
7+
const handleCallback = utils.handleCallback;
88
const BulkWriteResult = common.BulkWriteResult;
9-
const ObjectID = require('mongodb-core').BSON.ObjectID;
10-
const BSON = require('mongodb-core').BSON;
119
const Batch = common.Batch;
1210
const mergeBatchResults = common.mergeBatchResults;
1311
const executeOperation = utils.executeOperation;
14-
const BulkWriteError = require('./common').BulkWriteError;
15-
const applyWriteConcern = utils.applyWriteConcern;
1612
const MongoWriteConcernError = require('mongodb-core').MongoWriteConcernError;
1713
const handleMongoWriteConcernError = require('./common').handleMongoWriteConcernError;
18-
19-
var bson = new BSON([
20-
BSON.Binary,
21-
BSON.Code,
22-
BSON.DBRef,
23-
BSON.Decimal128,
24-
BSON.Double,
25-
BSON.Int32,
26-
BSON.Long,
27-
BSON.Map,
28-
BSON.MaxKey,
29-
BSON.MinKey,
30-
BSON.ObjectId,
31-
BSON.BSONRegExp,
32-
BSON.Symbol,
33-
BSON.Timestamp
34-
]);
35-
36-
/**
37-
* Create a FindOperatorsOrdered instance (INTERNAL TYPE, do not instantiate directly)
38-
* @class
39-
* @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
40-
*/
41-
var FindOperatorsOrdered = function(self) {
42-
this.s = self.s;
43-
};
44-
45-
/**
46-
* Add a single update document to the bulk operation
47-
*
48-
* @method
49-
* @param {object} doc update operations
50-
* @throws {MongoError}
51-
* @return {OrderedBulkOperation}
52-
*/
53-
FindOperatorsOrdered.prototype.update = function(updateDocument) {
54-
// Perform upsert
55-
var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
56-
57-
// Establish the update command
58-
var document = {
59-
q: this.s.currentOp.selector,
60-
u: updateDocument,
61-
multi: true,
62-
upsert: upsert
63-
};
64-
65-
// Clear out current Op
66-
this.s.currentOp = null;
67-
// Add the update document to the list
68-
return addToOperationsList(this, common.UPDATE, document);
69-
};
70-
71-
/**
72-
* Add a single update one document to the bulk operation
73-
*
74-
* @method
75-
* @param {object} doc update operations
76-
* @throws {MongoError}
77-
* @return {OrderedBulkOperation}
78-
*/
79-
FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
80-
// Perform upsert
81-
var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
82-
83-
// Establish the update command
84-
var document = {
85-
q: this.s.currentOp.selector,
86-
u: updateDocument,
87-
multi: false,
88-
upsert: upsert
89-
};
90-
91-
// Clear out current Op
92-
this.s.currentOp = null;
93-
// Add the update document to the list
94-
return addToOperationsList(this, common.UPDATE, document);
95-
};
14+
const bson = common.bson;
9615

9716
/**
98-
* Add a replace one operation to the bulk operation
17+
* Add to internal list of Operations
9918
*
100-
* @method
101-
* @param {object} doc the new document to replace the existing one with
102-
* @throws {MongoError}
19+
* @param {OrderedBulkOperation} bulkOperation
20+
* @param {number} docType number indicating the document type
21+
* @param {object} document
10322
* @return {OrderedBulkOperation}
10423
*/
105-
FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
106-
this.updateOne(updateDocument);
107-
};
108-
109-
/**
110-
* Upsert modifier for update bulk operation
111-
*
112-
* @method
113-
* @throws {MongoError}
114-
* @return {FindOperatorsOrdered}
115-
*/
116-
FindOperatorsOrdered.prototype.upsert = function() {
117-
this.s.currentOp.upsert = true;
118-
return this;
119-
};
120-
121-
/**
122-
* Add a remove one operation to the bulk operation
123-
*
124-
* @method
125-
* @throws {MongoError}
126-
* @return {OrderedBulkOperation}
127-
*/
128-
FindOperatorsOrdered.prototype.deleteOne = function() {
129-
// Establish the update command
130-
var document = {
131-
q: this.s.currentOp.selector,
132-
limit: 1
133-
};
134-
135-
// Clear out current Op
136-
this.s.currentOp = null;
137-
// Add the remove document to the list
138-
return addToOperationsList(this, common.REMOVE, document);
139-
};
140-
141-
// Backward compatibility
142-
FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
143-
144-
/**
145-
* Add a remove operation to the bulk operation
146-
*
147-
* @method
148-
* @throws {MongoError}
149-
* @return {OrderedBulkOperation}
150-
*/
151-
FindOperatorsOrdered.prototype.delete = function() {
152-
// Establish the update command
153-
var document = {
154-
q: this.s.currentOp.selector,
155-
limit: 0
156-
};
157-
158-
// Clear out current Op
159-
this.s.currentOp = null;
160-
// Add the remove document to the list
161-
return addToOperationsList(this, common.REMOVE, document);
162-
};
163-
164-
// Backward compatibility
165-
FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
166-
167-
// Add to internal list of documents
168-
var addToOperationsList = function(_self, docType, document) {
24+
function addToOperationsList(bulkOperation, docType, document) {
16925
// Get the bsonSize
170-
var bsonSize = bson.calculateObjectSize(document, {
26+
const bsonSize = bson.calculateObjectSize(document, {
17127
checkKeys: false
17228
});
17329

17430
// Throw error if the doc is bigger than the max BSON size
175-
if (bsonSize >= _self.s.maxBatchSizeBytes) {
176-
throw toError('document is larger than the maximum size ' + _self.s.maxBatchSizeBytes);
177-
}
31+
if (bsonSize >= bulkOperation.s.maxBatchSizeBytes)
32+
throw toError('document is larger than the maximum size ' + bulkOperation.s.maxBatchSizeBytes);
17833

17934
// Create a new batch object if we don't have a current one
180-
if (_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
35+
if (bulkOperation.s.currentBatch == null)
36+
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
18137

18238
// Check if we need to create a new batch
18339
if (
184-
_self.s.currentBatchSize + 1 >= _self.s.maxWriteBatchSize ||
185-
_self.s.currentBatchSizeBytes + _self.s.currentBatchSizeBytes >= _self.s.maxBatchSizeBytes ||
186-
_self.s.currentBatch.batchType !== docType
40+
bulkOperation.s.currentBatchSize + 1 >= bulkOperation.s.maxWriteBatchSize ||
41+
bulkOperation.s.currentBatchSizeBytes + bulkOperation.s.currentBatchSizeBytes >=
42+
bulkOperation.s.maxBatchSizeBytes ||
43+
bulkOperation.s.currentBatch.batchType !== docType
18744
) {
18845
// Save the batch to the execution stack
189-
_self.s.batches.push(_self.s.currentBatch);
46+
bulkOperation.s.batches.push(bulkOperation.s.currentBatch);
19047

19148
// Create a new batch
192-
_self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
49+
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
19350

19451
// Reset the current size trackers
195-
_self.s.currentBatchSize = 0;
196-
_self.s.currentBatchSizeBytes = 0;
52+
bulkOperation.s.currentBatchSize = 0;
53+
bulkOperation.s.currentBatchSizeBytes = 0;
19754
} else {
19855
// Update current batch size
199-
_self.s.currentBatchSize = _self.s.currentBatchSize + 1;
200-
_self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
56+
bulkOperation.s.currentBatchSize = bulkOperation.s.currentBatchSize + 1;
57+
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
20158
}
20259

20360
if (docType === common.INSERT) {
204-
_self.s.bulkResult.insertedIds.push({ index: _self.s.currentIndex, _id: document._id });
61+
bulkOperation.s.bulkResult.insertedIds.push({
62+
index: bulkOperation.s.currentIndex,
63+
_id: document._id
64+
});
20565
}
20666

20767
// We have an array of documents
20868
if (Array.isArray(document)) {
20969
throw toError('operation passed in cannot be an Array');
21070
} else {
211-
_self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
212-
_self.s.currentBatch.operations.push(document);
213-
_self.s.currentBatchSizeBytes = _self.s.currentBatchSizeBytes + bsonSize;
214-
_self.s.currentIndex = _self.s.currentIndex + 1;
71+
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
72+
bulkOperation.s.currentBatch.operations.push(document);
73+
bulkOperation.s.currentBatchSizeBytes = bulkOperation.s.currentBatchSizeBytes + bsonSize;
74+
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
21575
}
21676

217-
// Return self
218-
return _self;
219-
};
77+
// Return bulkOperation
78+
return bulkOperation;
79+
}
22080

22181
/**
22282
* Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
22383
* @class
22484
* @property {number} length Get the number of operations in the bulk.
22585
* @return {OrderedBulkOperation} a OrderedBulkOperation instance.
22686
*/
227-
function OrderedBulkOperation(topology, collection, options) {
228-
options = options == null ? {} : options;
229-
// TODO Bring from driver information in isMaster
230-
var executed = false;
231-
232-
// Current item
233-
var currentOp = null;
234-
235-
// Handle to the bson serializer, used to calculate running sizes
236-
var bson = topology.bson;
237-
238-
// Namespace for the operation
239-
var namespace = collection.collectionName;
240-
241-
// Set max byte size
242-
var maxBatchSizeBytes =
243-
topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
244-
? topology.isMasterDoc.maxBsonObjectSize
245-
: 1024 * 1025 * 16;
246-
var maxWriteBatchSize =
247-
topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
248-
? topology.isMasterDoc.maxWriteBatchSize
249-
: 1000;
250-
251-
// Get the write concern
252-
var writeConcern = applyWriteConcern(shallowClone(options), { collection: collection }, options);
253-
writeConcern = writeConcern.writeConcern;
25487

255-
// Get the promiseLibrary
256-
var promiseLibrary = options.promiseLibrary || Promise;
257-
258-
// Final results
259-
var bulkResult = {
260-
ok: 1,
261-
writeErrors: [],
262-
writeConcernErrors: [],
263-
insertedIds: [],
264-
nInserted: 0,
265-
nUpserted: 0,
266-
nMatched: 0,
267-
nModified: 0,
268-
nRemoved: 0,
269-
upserted: []
270-
};
271-
272-
// Internal state
273-
this.s = {
274-
// Final result
275-
bulkResult: bulkResult,
276-
// Current batch state
277-
currentBatch: null,
278-
currentIndex: 0,
279-
currentBatchSize: 0,
280-
currentBatchSizeBytes: 0,
281-
batches: [],
282-
// Write concern
283-
writeConcern: writeConcern,
284-
// Max batch size options
285-
maxBatchSizeBytes: maxBatchSizeBytes,
286-
maxWriteBatchSize: maxWriteBatchSize,
287-
// Namespace
288-
namespace: namespace,
289-
// BSON
290-
bson: bson,
291-
// Topology
292-
topology: topology,
293-
// Options
294-
options: options,
295-
// Current operation
296-
currentOp: currentOp,
297-
// Executed
298-
executed: executed,
299-
// Collection
300-
collection: collection,
301-
// Promise Library
302-
promiseLibrary: promiseLibrary,
303-
// Fundamental error
304-
err: null,
305-
// check keys
306-
checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : true
307-
};
308-
// bypass Validation
309-
if (options.bypassDocumentValidation === true) {
310-
this.s.bypassDocumentValidation = true;
88+
class OrderedBulkOperation extends BulkOperationBase {
89+
constructor(topology, collection, options) {
90+
options = options || {};
91+
options = Object.assign(options, { addToOperationsList });
92+
93+
super(topology, collection, options, true);
94+
}
95+
96+
/**
97+
* The callback format for results
98+
* @callback OrderedBulkOperation~resultCallback
99+
* @param {MongoError} error An error instance representing the error during the execution.
100+
* @param {BulkWriteResult} result The bulk write result.
101+
*/
102+
103+
/**
104+
* Execute the ordered bulk operation
105+
*
106+
* @method
107+
* @param {object} [options] Optional settings.
108+
* @param {(number|string)} [options.w] The write concern.
109+
* @param {number} [options.wtimeout] The write concern timeout.
110+
* @param {boolean} [options.j=false] Specify a journal write concern.
111+
* @param {boolean} [options.fsync=false] Specify a file sync write concern.
112+
* @param {OrderedBulkOperation~resultCallback} [callback] The result callback
113+
* @throws {MongoError}
114+
* @return {Promise} returns Promise if no callback passed
115+
*/
116+
execute(_writeConcern, options, callback) {
117+
const ret = this.bulkExecute(_writeConcern, options, callback);
118+
options = ret.options;
119+
callback = ret.callback;
120+
121+
return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
311122
}
312123
}
313124

314-
OrderedBulkOperation.prototype.raw = function(op) {
315-
var key = Object.keys(op)[0];
316-
317-
// Set up the force server object id
318-
var forceServerObjectId =
319-
typeof this.s.options.forceServerObjectId === 'boolean'
320-
? this.s.options.forceServerObjectId
321-
: this.s.collection.s.db.options.forceServerObjectId;
322-
323-
// Update operations
324-
if (
325-
(op.updateOne && op.updateOne.q) ||
326-
(op.updateMany && op.updateMany.q) ||
327-
(op.replaceOne && op.replaceOne.q)
328-
) {
329-
op[key].multi = op.updateOne || op.replaceOne ? false : true;
330-
return addToOperationsList(this, common.UPDATE, op[key]);
331-
}
332-
333-
// Crud spec update format
334-
if (op.updateOne || op.updateMany || op.replaceOne) {
335-
var multi = op.updateOne || op.replaceOne ? false : true;
336-
var operation = { q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi };
337-
operation.upsert = op[key].upsert ? true : false;
338-
if (op.collation) operation.collation = op.collation;
339-
if (op[key].arrayFilters) operation.arrayFilters = op[key].arrayFilters;
340-
return addToOperationsList(this, common.UPDATE, operation);
341-
}
342-
343-
// Remove operations
344-
if (
345-
op.removeOne ||
346-
op.removeMany ||
347-
(op.deleteOne && op.deleteOne.q) ||
348-
(op.deleteMany && op.deleteMany.q)
349-
) {
350-
op[key].limit = op.removeOne ? 1 : 0;
351-
return addToOperationsList(this, common.REMOVE, op[key]);
352-
}
353-
354-
// Crud spec delete operations, less efficient
355-
if (op.deleteOne || op.deleteMany) {
356-
var limit = op.deleteOne ? 1 : 0;
357-
operation = { q: op[key].filter, limit: limit };
358-
if (op.collation) operation.collation = op.collation;
359-
return addToOperationsList(this, common.REMOVE, operation);
360-
}
361-
362-
// Insert operations
363-
if (op.insertOne && op.insertOne.document == null) {
364-
if (forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
365-
return addToOperationsList(this, common.INSERT, op.insertOne);
366-
} else if (op.insertOne && op.insertOne.document) {
367-
if (forceServerObjectId !== true && op.insertOne.document._id == null)
368-
op.insertOne.document._id = new ObjectID();
369-
return addToOperationsList(this, common.INSERT, op.insertOne.document);
370-
}
371-
372-
if (op.insertMany) {
373-
for (var i = 0; i < op.insertMany.length; i++) {
374-
if (forceServerObjectId !== true && op.insertMany[i]._id == null)
375-
op.insertMany[i]._id = new ObjectID();
376-
addToOperationsList(this, common.INSERT, op.insertMany[i]);
377-
}
378-
379-
return;
380-
}
381-
382-
// No valid type of operation
383-
throw toError(
384-
'bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany'
385-
);
386-
};
387-
388125
/**
389-
* Add a single insert document to the bulk operation
126+
* Execute next write command in a chain
390127
*
391-
* @param {object} doc the document to insert
392-
* @throws {MongoError}
393-
* @return {OrderedBulkOperation}
128+
* @param {OrderedBulkOperation} bulkOperation
129+
* @param {object} options
130+
* @param {function} callback
394131
*/
395-
OrderedBulkOperation.prototype.insert = function(document) {
396-
if (this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null)
397-
document._id = new ObjectID();
398-
return addToOperationsList(this, common.INSERT, document);
399-
};
400-
401-
/**
402-
* Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
403-
*
404-
* @method
405-
* @param {object} selector The selector for the bulk operation.
406-
* @throws {MongoError}
407-
* @return {FindOperatorsOrdered}
408-
*/
409-
OrderedBulkOperation.prototype.find = function(selector) {
410-
if (!selector) {
411-
throw toError('Bulk find operation must specify a selector');
412-
}
413-
414-
// Save a current selector
415-
this.s.currentOp = {
416-
selector: selector
417-
};
418-
419-
return new FindOperatorsOrdered(this);
420-
};
421-
422-
Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
423-
enumerable: true,
424-
get: function() {
425-
return this.s.currentIndex;
426-
}
427-
});
428-
429-
//
430-
// Execute next write command in a chain
431-
var executeCommands = function(self, options, callback) {
432-
if (self.s.batches.length === 0) {
433-
return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
132+
function executeCommands(bulkOperation, options, callback) {
133+
if (bulkOperation.s.batches.length === 0) {
134+
return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult));
434135
}
435136

436137
// Ordered execution of the command
437-
var batch = self.s.batches.shift();
138+
const batch = bulkOperation.s.batches.shift();
438139

439-
var resultHandler = function(err, result) {
140+
function resultHandler(err, result) {
440141
// Error is a driver related error not a bulk op error, terminate
441142
if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
442143
return handleCallback(callback, err);
@@ -445,185 +146,32 @@ var executeCommands = function(self, options, callback) {
445146
// If we have and error
446147
if (err) err.ok = 0;
447148
if (err instanceof MongoWriteConcernError) {
448-
return handleMongoWriteConcernError(batch, self.s.bulkResult, true, err, callback);
149+
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, true, err, callback);
449150
}
450151

451152
// Merge the results together
452-
const writeResult = new BulkWriteResult(self.s.bulkResult);
453-
const mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
153+
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
154+
const mergeResult = mergeBatchResults(true, batch, bulkOperation.s.bulkResult, err, result);
454155
if (mergeResult != null) {
455156
return handleCallback(callback, null, writeResult);
456157
}
457158

458-
// If we are ordered and have errors and they are
459-
// not all replication errors terminate the operation
460-
if (self.s.bulkResult.writeErrors.length > 0) {
461-
if (self.s.bulkResult.writeErrors.length === 1) {
462-
return handleCallback(
463-
callback,
464-
new BulkWriteError(toError(self.s.bulkResult.writeErrors[0]), writeResult),
465-
null
466-
);
467-
}
468-
469-
return handleCallback(
470-
callback,
471-
new BulkWriteError(
472-
toError({
473-
message: 'write operation failed',
474-
code: self.s.bulkResult.writeErrors[0].code,
475-
writeErrors: self.s.bulkResult.writeErrors
476-
}),
477-
writeResult
478-
),
479-
null
480-
);
481-
} else if (writeResult.getWriteConcernError()) {
482-
return handleCallback(
483-
callback,
484-
new BulkWriteError(toError(writeResult.getWriteConcernError()), writeResult),
485-
null
486-
);
487-
}
159+
if (bulkOperation.handleWriteError(callback, writeResult)) return;
488160

489161
// Execute the next command in line
490-
executeCommands(self, options, callback);
491-
};
492-
493-
var finalOptions = Object.assign({ ordered: true }, options);
494-
if (self.s.writeConcern != null) {
495-
finalOptions.writeConcern = self.s.writeConcern;
162+
executeCommands(bulkOperation, options, callback);
496163
}
497164

498-
if (finalOptions.bypassDocumentValidation !== true) {
499-
delete finalOptions.bypassDocumentValidation;
500-
}
501-
502-
// Set an operationIf if provided
503-
if (self.operationId) {
504-
resultHandler.operationId = self.operationId;
505-
}
506-
507-
// Serialize functions
508-
if (self.s.options.serializeFunctions) {
509-
finalOptions.serializeFunctions = true;
510-
}
511-
512-
// Ignore undefined
513-
if (self.s.options.ignoreUndefined) {
514-
finalOptions.ignoreUndefined = true;
515-
}
516-
517-
// Is the bypassDocumentValidation options specific
518-
if (self.s.bypassDocumentValidation === true) {
519-
finalOptions.bypassDocumentValidation = true;
520-
}
521-
522-
// Is the checkKeys option disabled
523-
if (self.s.checkKeys === false) {
524-
finalOptions.checkKeys = false;
525-
}
526-
527-
if (finalOptions.retryWrites) {
528-
if (batch.batchType === common.UPDATE) {
529-
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
530-
}
531-
532-
if (batch.batchType === common.REMOVE) {
533-
finalOptions.retryWrites =
534-
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
535-
}
536-
}
537-
538-
try {
539-
if (batch.batchType === common.INSERT) {
540-
self.s.topology.insert(
541-
self.s.collection.namespace,
542-
batch.operations,
543-
finalOptions,
544-
resultHandler
545-
);
546-
} else if (batch.batchType === common.UPDATE) {
547-
self.s.topology.update(
548-
self.s.collection.namespace,
549-
batch.operations,
550-
finalOptions,
551-
resultHandler
552-
);
553-
} else if (batch.batchType === common.REMOVE) {
554-
self.s.topology.remove(
555-
self.s.collection.namespace,
556-
batch.operations,
557-
finalOptions,
558-
resultHandler
559-
);
560-
}
561-
} catch (err) {
562-
// Force top level error
563-
err.ok = 0;
564-
// Merge top level error and return
565-
handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
566-
}
567-
};
568-
569-
/**
570-
* The callback format for results
571-
* @callback OrderedBulkOperation~resultCallback
572-
* @param {MongoError} error An error instance representing the error during the execution.
573-
* @param {BulkWriteResult} result The bulk write result.
574-
*/
575-
576-
/**
577-
* Execute the ordered bulk operation
578-
*
579-
* @method
580-
* @param {object} [options] Optional settings.
581-
* @param {(number|string)} [options.w] The write concern.
582-
* @param {number} [options.wtimeout] The write concern timeout.
583-
* @param {boolean} [options.j=false] Specify a journal write concern.
584-
* @param {boolean} [options.fsync=false] Specify a file sync write concern.
585-
* @param {OrderedBulkOperation~resultCallback} [callback] The result callback
586-
* @throws {MongoError}
587-
* @return {Promise} returns Promise if no callback passed
588-
*/
589-
OrderedBulkOperation.prototype.execute = function(_writeConcern, options, callback) {
590-
if (typeof options === 'function') (callback = options), (options = {});
591-
options = options || {};
592-
593-
if (this.s.executed) {
594-
var executedError = toError('batch cannot be re-executed');
595-
return typeof callback === 'function'
596-
? callback(executedError, null)
597-
: this.s.promiseLibrary.reject(executedError);
598-
}
599-
600-
if (typeof _writeConcern === 'function') {
601-
callback = _writeConcern;
602-
} else if (_writeConcern && typeof _writeConcern === 'object') {
603-
this.s.writeConcern = _writeConcern;
604-
}
605-
606-
// If we have current batch
607-
if (this.s.currentBatch) this.s.batches.push(this.s.currentBatch);
608-
609-
// If we have no operations in the bulk raise an error
610-
if (this.s.batches.length === 0) {
611-
var emptyBatchError = toError('Invalid Operation, no operations specified');
612-
return typeof callback === 'function'
613-
? callback(emptyBatchError, null)
614-
: this.s.promiseLibrary.reject(emptyBatchError);
615-
}
616-
617-
return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
618-
};
165+
bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
166+
}
619167

620168
/**
621169
* Returns an unordered batch object
622170
* @ignore
623171
*/
624-
var initializeOrderedBulkOp = function(topology, collection, options) {
172+
function initializeOrderedBulkOp(topology, collection, options) {
625173
return new OrderedBulkOperation(topology, collection, options);
626-
};
174+
}
627175

628176
initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
629177
module.exports = initializeOrderedBulkOp;

‎lib/bulk/unordered.js

+104-545
Original file line numberDiff line numberDiff line change
@@ -1,451 +1,145 @@
11
'use strict';
22

33
const common = require('./common');
4+
const BulkOperationBase = common.BulkOperationBase;
45
const utils = require('../utils');
5-
const toError = require('../utils').toError;
6-
const handleCallback = require('../utils').handleCallback;
7-
const shallowClone = utils.shallowClone;
6+
const toError = utils.toError;
7+
const handleCallback = utils.handleCallback;
88
const BulkWriteResult = common.BulkWriteResult;
9-
const ObjectID = require('mongodb-core').BSON.ObjectID;
10-
const BSON = require('mongodb-core').BSON;
119
const Batch = common.Batch;
1210
const mergeBatchResults = common.mergeBatchResults;
1311
const executeOperation = utils.executeOperation;
14-
const BulkWriteError = require('./common').BulkWriteError;
15-
const applyWriteConcern = utils.applyWriteConcern;
1612
const MongoWriteConcernError = require('mongodb-core').MongoWriteConcernError;
1713
const handleMongoWriteConcernError = require('./common').handleMongoWriteConcernError;
18-
19-
var bson = new BSON([
20-
BSON.Binary,
21-
BSON.Code,
22-
BSON.DBRef,
23-
BSON.Decimal128,
24-
BSON.Double,
25-
BSON.Int32,
26-
BSON.Long,
27-
BSON.Map,
28-
BSON.MaxKey,
29-
BSON.MinKey,
30-
BSON.ObjectId,
31-
BSON.BSONRegExp,
32-
BSON.Symbol,
33-
BSON.Timestamp
34-
]);
35-
36-
/**
37-
* Create a FindOperatorsUnordered instance (INTERNAL TYPE, do not instantiate directly)
38-
* @class
39-
* @property {number} length Get the number of operations in the bulk.
40-
* @return {FindOperatorsUnordered} a FindOperatorsUnordered instance.
41-
*/
42-
var FindOperatorsUnordered = function(self) {
43-
this.s = self.s;
44-
};
14+
const bson = common.bson;
4515

4616
/**
47-
* Add a single update document to the bulk operation
17+
* Add to internal list of Operations
4818
*
49-
* @method
50-
* @param {object} updateDocument update operations
51-
* @throws {MongoError}
52-
* @return {FindOperatorsUnordered}
53-
*/
54-
FindOperatorsUnordered.prototype.update = function(updateDocument) {
55-
// Perform upsert
56-
var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
57-
58-
// Establish the update command
59-
var document = {
60-
q: this.s.currentOp.selector,
61-
u: updateDocument,
62-
multi: true,
63-
upsert: upsert
64-
};
65-
66-
// Clear out current Op
67-
this.s.currentOp = null;
68-
// Add the update document to the list
69-
return addToOperationsList(this, common.UPDATE, document);
70-
};
71-
72-
/**
73-
* Add a single update one document to the bulk operation
74-
*
75-
* @method
76-
* @param {object} updateDocument update operations
77-
* @throws {MongoError}
78-
* @return {FindOperatorsUnordered}
79-
*/
80-
FindOperatorsUnordered.prototype.updateOne = function(updateDocument) {
81-
// Perform upsert
82-
var upsert = typeof this.s.currentOp.upsert === 'boolean' ? this.s.currentOp.upsert : false;
83-
84-
// Establish the update command
85-
var document = {
86-
q: this.s.currentOp.selector,
87-
u: updateDocument,
88-
multi: false,
89-
upsert: upsert
90-
};
91-
92-
// Clear out current Op
93-
this.s.currentOp = null;
94-
// Add the update document to the list
95-
return addToOperationsList(this, common.UPDATE, document);
96-
};
97-
98-
/**
99-
* Add a replace one operation to the bulk operation
100-
*
101-
* @method
102-
* @param {object} updateDocument the new document to replace the existing one with
103-
* @throws {MongoError}
104-
* @return {FindOperatorsUnordered}
105-
*/
106-
FindOperatorsUnordered.prototype.replaceOne = function(updateDocument) {
107-
this.updateOne(updateDocument);
108-
};
109-
110-
/**
111-
* Upsert modifier for update bulk operation
112-
*
113-
* @method
114-
* @throws {MongoError}
115-
* @return {FindOperatorsUnordered}
116-
*/
117-
FindOperatorsUnordered.prototype.upsert = function() {
118-
this.s.currentOp.upsert = true;
119-
return this;
120-
};
121-
122-
/**
123-
* Add a remove one operation to the bulk operation
124-
*
125-
* @method
126-
* @throws {MongoError}
127-
* @return {FindOperatorsUnordered}
128-
*/
129-
FindOperatorsUnordered.prototype.removeOne = function() {
130-
// Establish the update command
131-
var document = {
132-
q: this.s.currentOp.selector,
133-
limit: 1
134-
};
135-
136-
// Clear out current Op
137-
this.s.currentOp = null;
138-
// Add the remove document to the list
139-
return addToOperationsList(this, common.REMOVE, document);
140-
};
141-
142-
/**
143-
* Add a remove operation to the bulk operation
144-
*
145-
* @method
146-
* @throws {MongoError}
147-
* @return {FindOperatorsUnordered}
19+
* @param {UnorderedBulkOperation} bulkOperation
20+
* @param {number} docType number indicating the document type
21+
* @param {object} document
22+
* @return {UnorderedBulkOperation}
14823
*/
149-
FindOperatorsUnordered.prototype.remove = function() {
150-
// Establish the update command
151-
var document = {
152-
q: this.s.currentOp.selector,
153-
limit: 0
154-
};
155-
156-
// Clear out current Op
157-
this.s.currentOp = null;
158-
// Add the remove document to the list
159-
return addToOperationsList(this, common.REMOVE, document);
160-
};
161-
162-
//
163-
// Add to the operations list
164-
//
165-
var addToOperationsList = function(_self, docType, document) {
24+
function addToOperationsList(bulkOperation, docType, document) {
16625
// Get the bsonSize
167-
var bsonSize = bson.calculateObjectSize(document, {
26+
const bsonSize = bson.calculateObjectSize(document, {
16827
checkKeys: false
16928
});
17029
// Throw error if the doc is bigger than the max BSON size
171-
if (bsonSize >= _self.s.maxBatchSizeBytes)
172-
throw toError('document is larger than the maximum size ' + _self.s.maxBatchSizeBytes);
30+
if (bsonSize >= bulkOperation.s.maxBatchSizeBytes)
31+
throw toError('document is larger than the maximum size ' + bulkOperation.s.maxBatchSizeBytes);
17332
// Holds the current batch
174-
_self.s.currentBatch = null;
33+
bulkOperation.s.currentBatch = null;
17534
// Get the right type of batch
17635
if (docType === common.INSERT) {
177-
_self.s.currentBatch = _self.s.currentInsertBatch;
36+
bulkOperation.s.currentBatch = bulkOperation.s.currentInsertBatch;
17837
} else if (docType === common.UPDATE) {
179-
_self.s.currentBatch = _self.s.currentUpdateBatch;
38+
bulkOperation.s.currentBatch = bulkOperation.s.currentUpdateBatch;
18039
} else if (docType === common.REMOVE) {
181-
_self.s.currentBatch = _self.s.currentRemoveBatch;
40+
bulkOperation.s.currentBatch = bulkOperation.s.currentRemoveBatch;
18241
}
18342

18443
// Create a new batch object if we don't have a current one
185-
if (_self.s.currentBatch == null) _self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
44+
if (bulkOperation.s.currentBatch == null)
45+
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
18646

18747
// Check if we need to create a new batch
18848
if (
189-
_self.s.currentBatch.size + 1 >= _self.s.maxWriteBatchSize ||
190-
_self.s.currentBatch.sizeBytes + bsonSize >= _self.s.maxBatchSizeBytes ||
191-
_self.s.currentBatch.batchType !== docType
49+
bulkOperation.s.currentBatch.size + 1 >= bulkOperation.s.maxWriteBatchSize ||
50+
bulkOperation.s.currentBatch.sizeBytes + bsonSize >= bulkOperation.s.maxBatchSizeBytes ||
51+
bulkOperation.s.currentBatch.batchType !== docType
19252
) {
19353
// Save the batch to the execution stack
194-
_self.s.batches.push(_self.s.currentBatch);
54+
bulkOperation.s.batches.push(bulkOperation.s.currentBatch);
19555

19656
// Create a new batch
197-
_self.s.currentBatch = new Batch(docType, _self.s.currentIndex);
57+
bulkOperation.s.currentBatch = new Batch(docType, bulkOperation.s.currentIndex);
19858
}
19959

20060
// We have an array of documents
20161
if (Array.isArray(document)) {
20262
throw toError('operation passed in cannot be an Array');
20363
} else {
204-
_self.s.currentBatch.operations.push(document);
205-
_self.s.currentBatch.originalIndexes.push(_self.s.currentIndex);
206-
_self.s.currentIndex = _self.s.currentIndex + 1;
64+
bulkOperation.s.currentBatch.operations.push(document);
65+
bulkOperation.s.currentBatch.originalIndexes.push(bulkOperation.s.currentIndex);
66+
bulkOperation.s.currentIndex = bulkOperation.s.currentIndex + 1;
20767
}
20868

20969
// Save back the current Batch to the right type
21070
if (docType === common.INSERT) {
211-
_self.s.currentInsertBatch = _self.s.currentBatch;
212-
_self.s.bulkResult.insertedIds.push({
213-
index: _self.s.bulkResult.insertedIds.length,
71+
bulkOperation.s.currentInsertBatch = bulkOperation.s.currentBatch;
72+
bulkOperation.s.bulkResult.insertedIds.push({
73+
index: bulkOperation.s.bulkResult.insertedIds.length,
21474
_id: document._id
21575
});
21676
} else if (docType === common.UPDATE) {
217-
_self.s.currentUpdateBatch = _self.s.currentBatch;
77+
bulkOperation.s.currentUpdateBatch = bulkOperation.s.currentBatch;
21878
} else if (docType === common.REMOVE) {
219-
_self.s.currentRemoveBatch = _self.s.currentBatch;
79+
bulkOperation.s.currentRemoveBatch = bulkOperation.s.currentBatch;
22080
}
22181

22282
// Update current batch size
223-
_self.s.currentBatch.size = _self.s.currentBatch.size + 1;
224-
_self.s.currentBatch.sizeBytes = _self.s.currentBatch.sizeBytes + bsonSize;
83+
bulkOperation.s.currentBatch.size = bulkOperation.s.currentBatch.size + 1;
84+
bulkOperation.s.currentBatch.sizeBytes = bulkOperation.s.currentBatch.sizeBytes + bsonSize;
22585

226-
// Return self
227-
return _self;
228-
};
86+
// Return bulkOperation
87+
return bulkOperation;
88+
}
22989

23090
/**
23191
* Create a new UnorderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
23292
* @class
23393
* @property {number} length Get the number of operations in the bulk.
23494
* @return {UnorderedBulkOperation} a UnorderedBulkOperation instance.
23595
*/
236-
var UnorderedBulkOperation = function(topology, collection, options) {
237-
options = options == null ? {} : options;
238-
239-
// Get the namesspace for the write operations
240-
var namespace = collection.collectionName;
241-
// Used to mark operation as executed
242-
var executed = false;
243-
244-
// Current item
245-
// var currentBatch = null;
246-
var currentOp = null;
247-
248-
// Handle to the bson serializer, used to calculate running sizes
249-
var bson = topology.bson;
250-
251-
// Set max byte size
252-
var maxBatchSizeBytes =
253-
topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
254-
? topology.isMasterDoc.maxBsonObjectSize
255-
: 1024 * 1025 * 16;
256-
var maxWriteBatchSize =
257-
topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
258-
? topology.isMasterDoc.maxWriteBatchSize
259-
: 1000;
260-
261-
// Get the write concern
262-
var writeConcern = applyWriteConcern(shallowClone(options), { collection: collection }, options);
263-
writeConcern = writeConcern.writeConcern;
264-
265-
// Get the promiseLibrary
266-
var promiseLibrary = options.promiseLibrary || Promise;
267-
268-
// Final results
269-
var bulkResult = {
270-
ok: 1,
271-
writeErrors: [],
272-
writeConcernErrors: [],
273-
insertedIds: [],
274-
nInserted: 0,
275-
nUpserted: 0,
276-
nMatched: 0,
277-
nModified: 0,
278-
nRemoved: 0,
279-
upserted: []
280-
};
281-
282-
// Internal state
283-
this.s = {
284-
// Final result
285-
bulkResult: bulkResult,
286-
// Current batch state
287-
currentInsertBatch: null,
288-
currentUpdateBatch: null,
289-
currentRemoveBatch: null,
290-
currentBatch: null,
291-
currentIndex: 0,
292-
batches: [],
293-
// Write concern
294-
writeConcern: writeConcern,
295-
// Max batch size options
296-
maxBatchSizeBytes: maxBatchSizeBytes,
297-
maxWriteBatchSize: maxWriteBatchSize,
298-
// Namespace
299-
namespace: namespace,
300-
// BSON
301-
bson: bson,
302-
// Topology
303-
topology: topology,
304-
// Options
305-
options: options,
306-
// Current operation
307-
currentOp: currentOp,
308-
// Executed
309-
executed: executed,
310-
// Collection
311-
collection: collection,
312-
// Promise Library
313-
promiseLibrary: promiseLibrary,
314-
// check keys
315-
checkKeys: typeof options.checkKeys === 'boolean' ? options.checkKeys : true
316-
};
317-
// bypass Validation
318-
if (options.bypassDocumentValidation === true) {
319-
this.s.bypassDocumentValidation = true;
320-
}
321-
};
322-
323-
/**
324-
* Add a single insert document to the bulk operation
325-
*
326-
* @param {object} document the document to insert
327-
* @throws {MongoError}
328-
* @return {UnorderedBulkOperation}
329-
*/
330-
UnorderedBulkOperation.prototype.insert = function(document) {
331-
if (this.s.collection.s.db.options.forceServerObjectId !== true && document._id == null)
332-
document._id = new ObjectID();
333-
return addToOperationsList(this, common.INSERT, document);
334-
};
96+
class UnorderedBulkOperation extends BulkOperationBase {
97+
constructor(topology, collection, options) {
98+
options = options || {};
99+
options = Object.assign(options, { addToOperationsList });
100+
101+
super(topology, collection, options, false);
102+
}
103+
104+
/**
105+
* The callback format for results
106+
* @callback UnorderedBulkOperation~resultCallback
107+
* @param {MongoError} error An error instance representing the error during the execution.
108+
* @param {BulkWriteResult} result The bulk write result.
109+
*/
110+
111+
/**
112+
* Execute the ordered bulk operation
113+
*
114+
* @method
115+
* @param {object} [options] Optional settings.
116+
* @param {(number|string)} [options.w] The write concern.
117+
* @param {number} [options.wtimeout] The write concern timeout.
118+
* @param {boolean} [options.j=false] Specify a journal write concern.
119+
* @param {boolean} [options.fsync=false] Specify a file sync write concern.
120+
* @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
121+
* @throws {MongoError}
122+
* @return {Promise} returns Promise if no callback passed
123+
*/
124+
execute(_writeConcern, options, callback) {
125+
const ret = this.bulkExecute(_writeConcern, options, callback);
126+
options = ret.options;
127+
callback = ret.callback;
128+
129+
return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
130+
}
131+
}
335132

336133
/**
337-
* Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
134+
* Execute the command
338135
*
339-
* @method
340-
* @param {object} selector The selector for the bulk operation.
341-
* @throws {MongoError}
342-
* @return {FindOperatorsUnordered}
136+
* @param {UnorderedBulkOperation} bulkOperation
137+
* @param {object} batch
138+
* @param {object} options
139+
* @param {function} callback
343140
*/
344-
UnorderedBulkOperation.prototype.find = function(selector) {
345-
if (!selector) {
346-
throw toError('Bulk find operation must specify a selector');
347-
}
348-
349-
// Save a current selector
350-
this.s.currentOp = {
351-
selector: selector
352-
};
353-
354-
return new FindOperatorsUnordered(this);
355-
};
356-
357-
Object.defineProperty(UnorderedBulkOperation.prototype, 'length', {
358-
enumerable: true,
359-
get: function() {
360-
return this.s.currentIndex;
361-
}
362-
});
363-
364-
UnorderedBulkOperation.prototype.raw = function(op) {
365-
var key = Object.keys(op)[0];
366-
367-
// Set up the force server object id
368-
var forceServerObjectId =
369-
typeof this.s.options.forceServerObjectId === 'boolean'
370-
? this.s.options.forceServerObjectId
371-
: this.s.collection.s.db.options.forceServerObjectId;
372-
373-
// Update operations
374-
if (
375-
(op.updateOne && op.updateOne.q) ||
376-
(op.updateMany && op.updateMany.q) ||
377-
(op.replaceOne && op.replaceOne.q)
378-
) {
379-
op[key].multi = op.updateOne || op.replaceOne ? false : true;
380-
return addToOperationsList(this, common.UPDATE, op[key]);
381-
}
382-
383-
// Crud spec update format
384-
if (op.updateOne || op.updateMany || op.replaceOne) {
385-
var multi = op.updateOne || op.replaceOne ? false : true;
386-
var operation = { q: op[key].filter, u: op[key].update || op[key].replacement, multi: multi };
387-
if (op[key].upsert) operation.upsert = true;
388-
if (op[key].arrayFilters) operation.arrayFilters = op[key].arrayFilters;
389-
return addToOperationsList(this, common.UPDATE, operation);
390-
}
391-
392-
// Remove operations
393-
if (
394-
op.removeOne ||
395-
op.removeMany ||
396-
(op.deleteOne && op.deleteOne.q) ||
397-
(op.deleteMany && op.deleteMany.q)
398-
) {
399-
op[key].limit = op.removeOne ? 1 : 0;
400-
return addToOperationsList(this, common.REMOVE, op[key]);
401-
}
402-
403-
// Crud spec delete operations, less efficient
404-
if (op.deleteOne || op.deleteMany) {
405-
var limit = op.deleteOne ? 1 : 0;
406-
operation = { q: op[key].filter, limit: limit };
407-
return addToOperationsList(this, common.REMOVE, operation);
408-
}
409-
410-
// Insert operations
411-
if (op.insertOne && op.insertOne.document == null) {
412-
if (forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
413-
return addToOperationsList(this, common.INSERT, op.insertOne);
414-
} else if (op.insertOne && op.insertOne.document) {
415-
if (forceServerObjectId !== true && op.insertOne.document._id == null)
416-
op.insertOne.document._id = new ObjectID();
417-
return addToOperationsList(this, common.INSERT, op.insertOne.document);
418-
}
419-
420-
if (op.insertMany) {
421-
for (var i = 0; i < op.insertMany.length; i++) {
422-
if (forceServerObjectId !== true && op.insertMany[i]._id == null)
423-
op.insertMany[i]._id = new ObjectID();
424-
addToOperationsList(this, common.INSERT, op.insertMany[i]);
425-
}
426-
427-
return;
428-
}
429-
430-
// No valid type of operation
431-
throw toError(
432-
'bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany'
433-
);
434-
};
435-
436-
//
437-
// Execute the command
438-
var executeBatch = function(self, batch, options, callback) {
439-
var finalOptions = Object.assign({ ordered: false }, options);
440-
if (self.s.writeConcern != null) {
441-
finalOptions.writeConcern = self.s.writeConcern;
442-
}
443-
444-
if (finalOptions.bypassDocumentValidation !== true) {
445-
delete finalOptions.bypassDocumentValidation;
446-
}
447-
448-
var resultHandler = function(err, result) {
141+
function executeBatch(bulkOperation, batch, options, callback) {
142+
function resultHandler(err, result) {
449143
// Error is a driver related error not a bulk op error, terminate
450144
if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
451145
return handleCallback(callback, err);
@@ -454,85 +148,30 @@ var executeBatch = function(self, batch, options, callback) {
454148
// If we have and error
455149
if (err) err.ok = 0;
456150
if (err instanceof MongoWriteConcernError) {
457-
return handleMongoWriteConcernError(batch, self.s.bulkResult, false, err, callback);
151+
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, false, err, callback);
458152
}
459-
handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, result));
460-
};
461-
462-
// Set an operationIf if provided
463-
if (self.operationId) {
464-
resultHandler.operationId = self.operationId;
465-
}
466-
467-
// Serialize functions
468-
if (self.s.options.serializeFunctions) {
469-
finalOptions.serializeFunctions = true;
153+
handleCallback(
154+
callback,
155+
null,
156+
mergeBatchResults(false, batch, bulkOperation.s.bulkResult, err, result)
157+
);
470158
}
471159

472-
// Ignore undefined
473-
if (self.s.options.ignoreUndefined) {
474-
finalOptions.ignoreUndefined = true;
475-
}
160+
bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
161+
}
476162

477-
// Is the bypassDocumentValidation options specific
478-
if (self.s.bypassDocumentValidation === true) {
479-
finalOptions.bypassDocumentValidation = true;
480-
}
481-
482-
// Is the checkKeys option disabled
483-
if (self.s.checkKeys === false) {
484-
finalOptions.checkKeys = false;
485-
}
486-
487-
if (finalOptions.retryWrites) {
488-
if (batch.batchType === common.UPDATE) {
489-
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
490-
}
491-
492-
if (batch.batchType === common.REMOVE) {
493-
finalOptions.retryWrites =
494-
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
495-
}
496-
}
497-
498-
try {
499-
if (batch.batchType === common.INSERT) {
500-
self.s.topology.insert(
501-
self.s.collection.namespace,
502-
batch.operations,
503-
finalOptions,
504-
resultHandler
505-
);
506-
} else if (batch.batchType === common.UPDATE) {
507-
self.s.topology.update(
508-
self.s.collection.namespace,
509-
batch.operations,
510-
finalOptions,
511-
resultHandler
512-
);
513-
} else if (batch.batchType === common.REMOVE) {
514-
self.s.topology.remove(
515-
self.s.collection.namespace,
516-
batch.operations,
517-
finalOptions,
518-
resultHandler
519-
);
520-
}
521-
} catch (err) {
522-
// Force top level error
523-
err.ok = 0;
524-
// Merge top level error and return
525-
handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
526-
}
527-
};
528-
529-
//
530-
// Execute all the commands
531-
var executeBatches = function(self, options, callback) {
532-
var numberOfCommandsToExecute = self.s.batches.length;
163+
/**
164+
* Execute all the commands
165+
*
166+
* @param {UnorderedBulkOperation} bulkOperation
167+
* @param {object} options
168+
* @param {function} callback
169+
*/
170+
function executeBatches(bulkOperation, options, callback) {
171+
let numberOfCommandsToExecute = bulkOperation.s.batches.length;
533172
// Execute over all the batches
534-
for (var i = 0; i < self.s.batches.length; i++) {
535-
executeBatch(self, self.s.batches[i], options, function(err) {
173+
for (let i = 0; i < bulkOperation.s.batches.length; i++) {
174+
executeBatch(bulkOperation, bulkOperation.s.batches[i], options, function(err) {
536175
// Count down the number of commands left to execute
537176
numberOfCommandsToExecute = numberOfCommandsToExecute - 1;
538177

@@ -541,102 +180,22 @@ var executeBatches = function(self, options, callback) {
541180
// Driver level error
542181
if (err) return handleCallback(callback, err);
543182

544-
const writeResult = new BulkWriteResult(self.s.bulkResult);
545-
if (self.s.bulkResult.writeErrors.length > 0) {
546-
if (self.s.bulkResult.writeErrors.length === 1) {
547-
return handleCallback(
548-
callback,
549-
new BulkWriteError(toError(self.s.bulkResult.writeErrors[0]), writeResult),
550-
null
551-
);
552-
}
553-
554-
return handleCallback(
555-
callback,
556-
new BulkWriteError(
557-
toError({
558-
message: 'write operation failed',
559-
code: self.s.bulkResult.writeErrors[0].code,
560-
writeErrors: self.s.bulkResult.writeErrors
561-
}),
562-
writeResult
563-
),
564-
null
565-
);
566-
} else if (writeResult.getWriteConcernError()) {
567-
return handleCallback(
568-
callback,
569-
new BulkWriteError(toError(writeResult.getWriteConcernError()), writeResult),
570-
null
571-
);
572-
}
183+
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
184+
if (bulkOperation.handleWriteError(callback, writeResult)) return;
573185

574186
return handleCallback(callback, null, writeResult);
575187
}
576188
});
577189
}
578-
};
579-
580-
/**
581-
* The callback format for results
582-
* @callback UnorderedBulkOperation~resultCallback
583-
* @param {MongoError} error An error instance representing the error during the execution.
584-
* @param {BulkWriteResult} result The bulk write result.
585-
*/
586-
587-
/**
588-
* Execute the ordered bulk operation
589-
*
590-
* @method
591-
* @param {object} [options] Optional settings.
592-
* @param {(number|string)} [options.w] The write concern.
593-
* @param {number} [options.wtimeout] The write concern timeout.
594-
* @param {boolean} [options.j=false] Specify a journal write concern.
595-
* @param {boolean} [options.fsync=false] Specify a file sync write concern.
596-
* @param {UnorderedBulkOperation~resultCallback} [callback] The result callback
597-
* @throws {MongoError}
598-
* @return {Promise} returns Promise if no callback passed
599-
*/
600-
UnorderedBulkOperation.prototype.execute = function(_writeConcern, options, callback) {
601-
if (typeof options === 'function') (callback = options), (options = {});
602-
options = options || {};
603-
604-
if (this.s.executed) {
605-
var executedError = toError('batch cannot be re-executed');
606-
return typeof callback === 'function'
607-
? callback(executedError, null)
608-
: this.s.promiseLibrary.reject(executedError);
609-
}
610-
611-
if (typeof _writeConcern === 'function') {
612-
callback = _writeConcern;
613-
} else if (_writeConcern && typeof _writeConcern === 'object') {
614-
this.s.writeConcern = _writeConcern;
615-
}
616-
617-
// If we have current batch
618-
if (this.s.currentInsertBatch) this.s.batches.push(this.s.currentInsertBatch);
619-
if (this.s.currentUpdateBatch) this.s.batches.push(this.s.currentUpdateBatch);
620-
if (this.s.currentRemoveBatch) this.s.batches.push(this.s.currentRemoveBatch);
621-
622-
// If we have no operations in the bulk raise an error
623-
if (this.s.batches.length === 0) {
624-
var emptyBatchError = toError('Invalid Operation, no operations specified');
625-
return typeof callback === 'function'
626-
? callback(emptyBatchError, null)
627-
: this.s.promiseLibrary.reject(emptyBatchError);
628-
}
629-
630-
return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
631-
};
190+
}
632191

633192
/**
634193
* Returns an unordered batch object
635194
* @ignore
636195
*/
637-
var initializeUnorderedBulkOp = function(topology, collection, options) {
196+
function initializeUnorderedBulkOp(topology, collection, options) {
638197
return new UnorderedBulkOperation(topology, collection, options);
639-
};
198+
}
640199

641200
initializeUnorderedBulkOp.UnorderedBulkOperation = UnorderedBulkOperation;
642201
module.exports = initializeUnorderedBulkOp;

0 commit comments

Comments
 (0)
Please sign in to comment.