Skip to content

Commit 7239b31

Browse files
committedJun 7, 2022
FIX RxStorage should never emit an eventBulk with an empty events array
1 parent 85e7273 commit 7239b31

6 files changed

+64
-11
lines changed
 

‎CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
<!-- CHANGELOG NEWEST -->
55

6+
- FIX: RxStorage should never emit an eventBulk with an empty events array.
7+
68
<!-- ADD new changes here! -->
79

810
<!-- /CHANGELOG NEWEST -->

‎src/plugins/dexie/rx-storage-instance-dexie.ts

+5-3
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,11 @@ export class RxStorageInstanceDexie<RxDocType> implements RxStorageInstance<
255255
]);
256256
});
257257

258-
const endTime = now();
259-
eventBulk.events.forEach(event => event.endTime = endTime);
260-
this.changes$.next(eventBulk);
258+
if (eventBulk.events.length > 0) {
259+
const endTime = now();
260+
eventBulk.events.forEach(event => event.endTime = endTime);
261+
this.changes$.next(eventBulk);
262+
}
261263

262264
return ret;
263265
}

‎src/plugins/lokijs/rx-storage-instance-loki.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ export class RxStorageInstanceLoki<RxDocType> implements RxStorageInstance<
146146
ret.error[err.documentId] = err;
147147
});
148148
localState.databaseState.saveQueue.addWrite();
149-
this.changes$.next(categorized.eventBulk);
149+
150+
if (categorized.eventBulk.events.length > 0) {
151+
this.changes$.next(categorized.eventBulk);
152+
}
150153

151154
return ret;
152155
}

‎src/plugins/memory/rx-storage-instance-memory.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ export class RxStorageInstanceMemory<RxDocType> implements RxStorageInstance<
143143
);
144144
});
145145

146-
this.changes$.next(categorized.eventBulk);
146+
if (categorized.eventBulk.events.length > 0) {
147+
this.changes$.next(categorized.eventBulk);
148+
}
149+
147150
return Promise.resolve(ret);
148151
}
149152

‎src/plugins/pouchdb/rx-storage-instance-pouch.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,17 @@ export class RxStorageInstancePouch<RxDocType> implements RxStorageInstance<
8585
*/
8686
const emittedEventBulkIds: ObliviousSet<string> = new ObliviousSet(60 * 1000);
8787

88-
const eventSub = emitter.subject.subscribe(async (ev) => {
88+
const eventSub = emitter.subject.subscribe(async (eventBulk) => {
8989
if (
90-
ev.events.length === 0 ||
91-
emittedEventBulkIds.has(ev.id)
90+
eventBulk.events.length === 0 ||
91+
emittedEventBulkIds.has(eventBulk.id)
9292
) {
9393
return;
9494
}
95-
emittedEventBulkIds.add(ev.id);
95+
emittedEventBulkIds.add(eventBulk.id);
9696

9797
// rewrite primaryPath of all events
98-
ev.events.forEach(event => {
98+
eventBulk.events.forEach(event => {
9999
if (event.change.doc) {
100100
event.change.doc = pouchSwapIdToPrimary(
101101
this.primaryPath,
@@ -110,7 +110,7 @@ export class RxStorageInstancePouch<RxDocType> implements RxStorageInstance<
110110
}
111111
});
112112

113-
this.changes$.next(ev);
113+
this.changes$.next(eventBulk);
114114
});
115115
this.subs.push(eventSub);
116116
}

‎test/unit/rx-storage-implementations.test.ts

+43
Original file line numberDiff line numberDiff line change
@@ -1687,6 +1687,49 @@ config.parallel('rx-storage-implementations.test.js (implementation: ' + config.
16871687
assert.strictEqual(lastEvent.change.operation, 'DELETE');
16881688
assert.ok(lastEvent.change.previous);
16891689

1690+
sub.unsubscribe();
1691+
storageInstance.close();
1692+
});
1693+
it('it should not emit an empty eventBulk when the write had only errors', async () => {
1694+
const storageInstance = await config.storage.getStorage().createStorageInstance<TestDocType>({
1695+
databaseName: randomCouchString(12),
1696+
collectionName: randomCouchString(12),
1697+
schema: getPseudoSchemaForVersion<TestDocType>(0, 'key'),
1698+
options: {},
1699+
multiInstance: false
1700+
});
1701+
1702+
const emitted: EventBulk<RxStorageChangeEvent<RxDocumentData<TestDocType>>>[] = [];
1703+
const sub = storageInstance.changeStream().subscribe(x => {
1704+
emitted.push(x);
1705+
});
1706+
1707+
const writeData = {
1708+
key: 'foobar',
1709+
value: 'one',
1710+
_rev: EXAMPLE_REVISION_1,
1711+
_deleted: false,
1712+
_attachments: {},
1713+
_meta: {
1714+
lwt: now()
1715+
}
1716+
};
1717+
1718+
// insert
1719+
const firstWriteResult = await storageInstance.bulkWrite([{
1720+
document: writeData
1721+
}]);
1722+
assert.deepStrictEqual(firstWriteResult.error, {});
1723+
1724+
// insert again to cause conflict error
1725+
const secondWriteResult = await storageInstance.bulkWrite([{
1726+
document: writeData
1727+
}]);
1728+
assert.deepStrictEqual(secondWriteResult.success, {});
1729+
1730+
assert.strictEqual(emitted.length, 1);
1731+
assert.strictEqual(emitted[0].events.length, 1);
1732+
16901733
sub.unsubscribe();
16911734
storageInstance.close();
16921735
});

0 commit comments

Comments
 (0)
Please sign in to comment.