@@ -15,12 +15,15 @@ import {
15
15
flatClone ,
16
16
PROMISE_RESOLVE_NULL ,
17
17
PROMISE_RESOLVE_VOID ,
18
- ensureNotFalsy
18
+ ensureNotFalsy ,
19
+ parseRevision ,
20
+ createRevision ,
21
+ promiseWait
19
22
} from './util' ;
20
23
import {
21
24
newRxError ,
22
25
newRxTypeError ,
23
- isPouchdbConflictError
26
+ isBulkWriteConflictError
24
27
} from './rx-error' ;
25
28
import {
26
29
runPluginHooks
@@ -321,34 +324,49 @@ export const basePrototype = {
321
324
*/
322
325
atomicUpdate ( this : RxDocument , mutationFunction : Function ) : Promise < RxDocument > {
323
326
return new Promise ( ( res , rej ) => {
324
- this . _atomicQueue = this . _atomicQueue
327
+ this . _atomicQueue = this
328
+ . _atomicQueue
325
329
. then ( async ( ) => {
326
330
let done = false ;
327
331
// we need a hacky while loop to stay incide the chain-link of _atomicQueue
328
332
// while still having the option to run a retry on conflicts
329
333
while ( ! done ) {
330
334
const oldData = this . _dataSync$ . getValue ( ) ;
335
+ // always await because mutationFunction might be async
336
+ let newData ;
337
+
331
338
try {
332
- // always await because mutationFunction might be async
333
- let newData = await mutationFunction ( clone ( this . _dataSync$ . getValue ( ) ) , this ) ;
339
+ newData = await mutationFunction (
340
+ clone ( oldData ) ,
341
+ this
342
+ ) ;
334
343
if ( this . collection ) {
335
344
newData = this . collection . schema . fillObjectWithDefaults ( newData ) ;
336
345
}
346
+ } catch ( err ) {
347
+ rej ( err ) ;
348
+ return ;
349
+ }
337
350
351
+ try {
338
352
await this . _saveData ( newData , oldData ) ;
339
353
done = true ;
340
- } catch ( err ) {
354
+ } catch ( err : any ) {
355
+ const useError = err . parameters && err . parameters . error ? err . parameters . error : err ;
341
356
/**
342
357
* conflicts cannot happen by just using RxDB in one process
343
358
* There are two ways they still can appear which is
344
359
* replication and multi-tab usage
345
360
* Because atomicUpdate has a mutation function,
346
361
* we can just re-run the mutation until there is no conflict
347
362
*/
348
- if ( isPouchdbConflictError ( err as any ) ) {
349
- // pouchdb conflict error -> retrying
363
+ const isConflict = isBulkWriteConflictError ( useError as any ) ;
364
+ if ( isConflict ) {
365
+ // conflict error -> retrying
366
+ newData . _rev = createRevision ( newData , isConflict . documentInDb ) ;
367
+ await promiseWait ( 300 ) ;
350
368
} else {
351
- rej ( err ) ;
369
+ rej ( useError ) ;
352
370
return ;
353
371
}
354
372
}
@@ -385,7 +403,6 @@ export const basePrototype = {
385
403
newData : RxDocumentWriteData < RxDocumentType > ,
386
404
oldData : RxDocumentData < RxDocumentType >
387
405
) : Promise < void > {
388
- newData = newData ;
389
406
390
407
// deleted documents cannot be changed
391
408
if ( this . _isDeleted$ . getValue ( ) ) {
@@ -400,15 +417,22 @@ export const basePrototype = {
400
417
await this . collection . _runHooks ( 'pre' , 'save' , newData , this ) ;
401
418
this . collection . schema . validate ( newData ) ;
402
419
420
+
421
+ // TODO REMOVE THIS CHECK
422
+ const p1 = parseRevision ( oldData . _rev ) ;
423
+ const p2 = parseRevision ( newData . _rev ) ;
424
+ newData . _rev = createRevision ( newData , oldData ) ;
425
+ if ( ( p1 . height + 1 !== p2 . height ) ) {
426
+ // throw new Error('REVISION NOT INCREMENTED! ' + p1.height + ' ' + p2.height);
427
+ }
428
+
403
429
const writeResult = await this . collection . storageInstance . bulkWrite ( [ {
404
430
previous : oldData ,
405
431
document : newData
406
432
} ] ) ;
407
433
408
434
const isError = writeResult . error [ this . primary ] ;
409
435
throwIfIsStorageWriteError ( this . collection , this . primary , newData , isError ) ;
410
- ensureNotFalsy ( writeResult . success [ this . primary ] ) ;
411
-
412
436
413
437
return this . collection . _runHooks ( 'post' , 'save' , newData , this ) ;
414
438
} ,
0 commit comments