
Commit 1f624bd

authored Jun 15, 2022
WIP Feature/stream replication (#3835)
* ADD stream replication step1
* ADD Stream replication step2
* FIX storage replication tests
* FIX tests
* FIX types
* FIX types
* TRY fix timeout
* FIX condition
* REMOVE logs
* FIX lint
* REMOVE logs
* FIX test lwt
* FIX test
* FIX test
* FIX randomly failing test
* FIX stream replication has wrong conflict handling
* FUX stream replication rename and fix stuff
* IMPROVE logs
* FIX bulkWrite must never be called with empty array
* FIX empty write
* IMPROVE logs
* ADD logs
* FIX missing update of lwt
* IMPROVE logs
* IMPROVE logs
* FIX stuff
* FIX timeouts
* ADD log write amount
* FIX PouchDB stream replication
* FIX remove exit
* FIX timeout
* FIX improve write performance
* FIX fast tests
* REMOVE logs
1 parent 0f7c29f commit 1f624bd

17 files changed: 1,514 additions, 108 deletions

‎CHANGELOG.md

(1 addition)

@@ -7,6 +7,7 @@
 - FIX: RxStorage should never emit an eventBulk with an empty events array.
 - Update PouchDB to `7.3.0` Thanks [@cetsupport](https://github.com/cetsupport).
 - CHANGE (RxStorage) revision hash must not include the `_meta` field.
+- Added new Stream replication for internal usage in plugins.
 
 <!-- ADD new changes here! -->

‎docs-src/rx-storage-dexie.md

(4 additions, 5 deletions)

@@ -1,9 +1,9 @@
 # RxStorage Dexie.js
 
-Instead of using PouchDB as underlying storage engine, you can also use [Dexie.js](https://github.com/dexie/Dexie.js).
-Dexie.js is a minimal wrapper around IndexedDB that has a good performance.
+To store the data inside of IndexedDB in the browser, you can also use the [Dexie.js](https://github.com/dexie/Dexie.js) [RxStorage](./rx-storage.md).
 
-For the Dexie based `RxStorage`, we use the [mingo](https://github.com/kofrasa/mingo) query handler. And a copy of the query planner from the [PouchDB-find](https://github.com/pouchdb/pouchdb/tree/master/packages/node_modules/pouchdb-find) plugin.
+Dexie.js is a minimal wrapper around IndexedDB that has a good performance.
+For the Dexie based `RxStorage`, we use the [mingo](https://github.com/kofrasa/mingo) query handler.
 
 **IMPORTANT:** The Dexie.js `RxStorage` is in **beta** mode. It may get breaking changes in any minor new RxDB version. Use at your own risk.
 
@@ -15,8 +15,7 @@ For the Dexie based `RxStorage`, we use the [mingo](https://github.com/kofrasa/m
 ## Cons
 - Does not support CouchDB replication.
 - It does not support attachments. (Make a pull request)
-- Running many operations can be slow because the underlying [IndexedDB is slow](./slow-indexeddb.md).
-
+- Does not use a [Batched Cursor](./slow-indexeddb.md) which makes it slower then the [IndexedDB RxStorage](./rx-storage-indexeddb.md).
 
 ## Usage

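As a usage reference for the page edited above: a minimal sketch of how the Dexie-based storage is typically plugged into RxDB. The import path and the `getRxStorageDexie` name follow the RxDB Dexie plugin and are not part of this commit's diff.

```ts
import { createRxDatabase } from 'rxdb';
// Import path assumed from the RxDB Dexie plugin, not shown in this diff.
import { getRxStorageDexie } from 'rxdb/plugins/dexie';

async function initDatabase() {
    // Data ends up in IndexedDB via Dexie.js; queries are matched
    // in memory by the mingo query handler mentioned above.
    return createRxDatabase({
        name: 'exampledb',
        storage: getRxStorageDexie()
    });
}
```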
‎package.json

(1 addition)

@@ -53,6 +53,7 @@
     "test:node:dexie": "npm run transpile && cross-env DEFAULT_STORAGE=dexie mocha --expose-gc --config ./config/.mocharc.js ./test_tmp/unit.test.js",
     "test:node:pouchdb:loop": "npm run test:node:pouchdb && npm run test:node:pouchdb:loop",
     "test:node:lokijs:loop": "npm run test:node:lokijs && npm run test:node:lokijs:loop",
+    "test:node:dexie:loop": "npm run test:node:dexie && npm run test:node:dexie:loop",
     "test:node:custom": "npm run transpile && cross-env DEFAULT_STORAGE=custom mocha --expose-gc --config ./config/.mocharc.js ./test_tmp/unit.test.js",
     "// test:node:loop": "runs tests in node in a loop. Use this to debug tests that only fail sometimes",
     "test:node:loop": "npm run test:node && npm run test:node:loop",

‎src/index.ts

(10 additions)

@@ -79,6 +79,8 @@ export {
 
 export * from './rx-storage-helper';
 
+export * from './rx-storage-replication';
+
 export * from './custom-index';
 export * from './query-planner';
 
@@ -155,6 +157,14 @@ export type {
     RxStorageChangeEvent,
     RxStorageInstance,
 
+    // stuff from the RxStorage replication
+    RxStorageInstanceReplicationInput,
+    RxStorageInstanceReplicationState,
+    RxConflictHandler,
+    RxConflictHandlerInput,
+    RxStorageReplicationDirection,
+
+    // other stuff
     RxDumpCollectionBase,
     RxDumpDatabaseAny,
     RxDumpDatabaseBase,

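The replication types exported above are defined in `src/rx-storage-replication`, which is not part of this diff view, so only their role can be sketched here: a conflict handler is the hook the stream replication calls when both sides changed the same document. The shapes below are hypothetical illustrations, not the actual `RxConflictHandler` signature.

```ts
// Hypothetical input shape; the real RxConflictHandlerInput lives in
// src/rx-storage-replication and may differ.
type ConflictInputSketch<DocType> = {
    assumedMasterState: DocType;
    newDocumentState: DocType;
    realMasterState: DocType;
};

// Sketch of a handler that lets the master state win,
// a common default strategy for this kind of hook.
const conflictHandlerSketch = async <DocType>(
    input: ConflictInputSketch<DocType>
): Promise<DocType> => input.realMasterState;
```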
‎src/plugins/memory/rx-storage-instance-memory.ts

(1 deletion)

@@ -146,7 +146,6 @@ export class RxStorageInstanceMemory<RxDocType> implements RxStorageInstance<
         if (categorized.eventBulk.events.length > 0) {
             this.changes$.next(categorized.eventBulk);
         }
-
         return Promise.resolve(ret);
     }

‎src/plugins/pouchdb/rx-storage-instance-pouch.ts

(92 additions, 83 deletions)

@@ -59,6 +59,13 @@ export class RxStorageInstancePouch<RxDocType> implements RxStorageInstance<
     private subs: Subscription[] = [];
     private primaryPath: StringKeys<RxDocumentData<RxDocType>>;
 
+
+    /**
+     * Some PouchDB operations give wrong results when they run in parallel.
+     * So we have to ensure they are queued up.
+     */
+    private nonParallelQueue: Promise<any> = PROMISE_RESOLVE_VOID;
+
     constructor(
         public readonly storage: RxStorage<PouchStorageInternals, PouchSettings>,
         public readonly databaseName: string,
@@ -173,52 +180,54 @@
         });
 
         const previousDocsInDb: Map<string, RxDocumentData<any>> = new Map();
-        const pouchResult = await this.internals.pouch.bulkDocs(writeDocs, {
-            new_edits: false,
-            custom: {
-                primaryPath: this.primaryPath,
-                writeRowById,
-                insertDocsById,
-                previousDocsInDb
-            }
-        } as any);
-
         const ret: RxStorageBulkWriteResponse<RxDocType> = {
             success: {},
             error: {}
         };
-        await Promise.all(
-            pouchResult.map(async (resultRow) => {
-                const writeRow = getFromMapOrThrow(writeRowById, resultRow.id);
-                if ((resultRow as PouchWriteError).error) {
-                    const previousDoc = getFromMapOrThrow(previousDocsInDb, resultRow.id);
-                    const err: RxStorageBulkWriteError<RxDocType> = {
-                        isError: true,
-                        status: 409,
-                        documentId: resultRow.id,
-                        writeRow,
-                        documentInDb: pouchDocumentDataToRxDocumentData(
-                            this.primaryPath,
-                            previousDoc
-                        )
-                    };
-                    ret.error[resultRow.id] = err;
-                } else {
-                    let pushObj: RxDocumentData<RxDocType> = flatClone(writeRow.document) as any;
-                    pushObj = pouchSwapIdToPrimary(this.primaryPath, pushObj);
-                    pushObj._rev = (resultRow as PouchBulkDocResultRow).rev;
-
-                    // replace the inserted attachments with their diggest
-                    pushObj._attachments = {};
-                    if (!writeRow.document._attachments) {
-                        writeRow.document._attachments = {};
-                    } else {
-                        pushObj._attachments = await writeAttachmentsToAttachments(writeRow.document._attachments);
-                    }
-                    ret.success[resultRow.id] = pushObj;
-                }
-            })
-        );
+        this.nonParallelQueue = this.nonParallelQueue.then(async () => {
+            const pouchResult = await this.internals.pouch.bulkDocs(writeDocs, {
+                new_edits: false,
+                custom: {
+                    primaryPath: this.primaryPath,
+                    writeRowById,
+                    insertDocsById,
+                    previousDocsInDb
+                }
+            } as any);
+            return Promise.all(
+                pouchResult.map(async (resultRow) => {
+                    const writeRow = getFromMapOrThrow(writeRowById, resultRow.id);
+                    if ((resultRow as PouchWriteError).error) {
+                        const previousDoc = getFromMapOrThrow(previousDocsInDb, resultRow.id);
+                        const err: RxStorageBulkWriteError<RxDocType> = {
+                            isError: true,
+                            status: 409,
+                            documentId: resultRow.id,
+                            writeRow,
+                            documentInDb: pouchDocumentDataToRxDocumentData(
+                                this.primaryPath,
+                                previousDoc
+                            )
+                        };
+                        ret.error[resultRow.id] = err;
+                    } else {
+                        let pushObj: RxDocumentData<RxDocType> = flatClone(writeRow.document) as any;
+                        pushObj = pouchSwapIdToPrimary(this.primaryPath, pushObj);
+                        pushObj._rev = (resultRow as PouchBulkDocResultRow).rev;
+
+                        // replace the inserted attachments with their diggest
+                        pushObj._attachments = {};
+                        if (!writeRow.document._attachments) {
+                            writeRow.document._attachments = {};
+                        } else {
+                            pushObj._attachments = await writeAttachmentsToAttachments(writeRow.document._attachments);
+                        }
+                        ret.success[resultRow.id] = pushObj;
+                    }
+                })
+            );
+        });
+        await this.nonParallelQueue;
         return ret;
     }
@@ -261,53 +270,53 @@
          * @link https://stackoverflow.com/a/63516761/3443137
          */
         if (deleted) {
-            const viaChanges = await this.internals.pouch.changes({
-                live: false,
-                since: 0,
-                doc_ids: ids,
-                style: 'all_docs'
-            });
-
-            const retDocs: { [documentId: string]: RxDocumentData<RxDocType> } = {};
-            await Promise.all(
-                viaChanges.results.map(async (result) => {
-                    const firstDoc = await this.internals.pouch.get(
-                        result.id,
-                        {
-                            rev: result.changes[0].rev,
-                            deleted: 'ok',
-                            style: 'all_docs'
-                        }
-                    );
-                    const useFirstDoc = pouchDocumentDataToRxDocumentData(
-                        this.primaryPath,
-                        firstDoc
-                    );
-                    retDocs[result.id] = useFirstDoc;
-                })
-            );
-            return retDocs;
-        }
-
-
-        const pouchResult = await this.internals.pouch.allDocs({
-            include_docs: true,
-            keys: ids
-        });
-
-        const ret: { [documentId: string]: RxDocumentData<RxDocType> } = {};
-        pouchResult.rows
-            .filter(row => !!row.doc)
-            .forEach(row => {
-                let docData = row.doc;
-                docData = pouchDocumentDataToRxDocumentData(
-                    this.primaryPath,
-                    docData
-                );
-                ret[row.id] = docData;
-            });
-
-        return ret;
+            const retDocs: { [documentId: string]: RxDocumentData<RxDocType> } = {};
+            this.nonParallelQueue = this.nonParallelQueue.then(async () => {
+                const viaChanges = await this.internals.pouch.changes({
+                    live: false,
+                    since: 0,
+                    doc_ids: ids,
+                    style: 'all_docs'
+                });
+                await Promise.all(
+                    viaChanges.results.map(async (result) => {
+                        const firstDoc = await this.internals.pouch.get(
+                            result.id,
+                            {
+                                rev: result.changes[0].rev,
+                                deleted: 'ok',
+                                style: 'all_docs'
+                            }
+                        );
+                        const useFirstDoc = pouchDocumentDataToRxDocumentData(
+                            this.primaryPath,
+                            firstDoc
+                        );
+                        retDocs[result.id] = useFirstDoc;
+                    })
+                );
+            });
+            await this.nonParallelQueue;
+            return retDocs;
+        } else {
+            const pouchResult = await this.internals.pouch.allDocs({
+                include_docs: true,
+                keys: ids
+            });
+            const ret: { [documentId: string]: RxDocumentData<RxDocType> } = {};
+            pouchResult.rows
+                .filter(row => !!row.doc)
+                .forEach(row => {
+                    let docData = row.doc;
+                    docData = pouchDocumentDataToRxDocumentData(
+                        this.primaryPath,
+                        docData
+                    );
+                    ret[row.id] = docData;
+                });
+
+            return ret;
+        }
     }
 
     changeStream(): Observable<EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>>> {

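The `nonParallelQueue` added above is just a promise chain: each PouchDB call is appended with `.then()`, so no two queued operations ever overlap. Below is a standalone sketch of that queuing pattern; the class and function names are illustrative, not RxDB APIs, and unlike the code above this sketch also catches errors so one failed task cannot poison the chain.

```ts
// Illustrative promise-chain queue; not an RxDB API.
class SerialQueue {
    private chain: Promise<unknown> = Promise.resolve();

    run<T>(task: () => Promise<T>): Promise<T> {
        // The task starts only after every previously queued task has settled.
        const result = this.chain.then(() => task());
        // Keep the chain alive even if this task rejects.
        this.chain = result.catch(() => undefined);
        return result;
    }
}

// Usage: two writes that must never run in parallel.
declare function writeBatch(ids: string[]): Promise<void>;

const queue = new SerialQueue();
queue.run(() => writeBatch(['a', 'b']));
queue.run(() => writeBatch(['c']));
```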
‎src/rx-document.ts

(9 deletions)

@@ -16,7 +16,6 @@ import {
     PROMISE_RESOLVE_NULL,
     PROMISE_RESOLVE_VOID,
     ensureNotFalsy,
-    parseRevision,
     createRevision,
     promiseWait
 } from './util';
@@ -418,14 +417,6 @@
         this.collection.schema.validate(newData);
 
 
-        // TODO REMOVE THIS CHECK
-        const p1 = parseRevision(oldData._rev);
-        const p2 = parseRevision(newData._rev);
-        newData._rev = createRevision(newData, oldData);
-        if ((p1.height + 1 !== p2.height)) {
-            // throw new Error('REVISION NOT INCREMENTED! ' + p1.height + ' ' + p2.height);
-        }
-
         const writeResult = await this.collection.storageInstance.bulkWrite([{
             previous: oldData,
             document: newData

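The debug check removed above compared revision heights via `parseRevision()`. RxDB revisions follow the CouchDB-style `height-hash` format, so the check asserted that an update increments the height by exactly one. A sketch of what that parsing conceptually does (illustrative re-implementation, not the actual util from `src/util`):

```ts
// Illustrative only; the real parseRevision lives in src/util.
function parseRevisionSketch(revision: string): { height: number; hash: string } {
    const splitPos = revision.indexOf('-');
    return {
        height: parseInt(revision.substring(0, splitPos), 10),
        hash: revision.substring(splitPos + 1)
    };
}

const prev = parseRevisionSketch('1-9f3a');
const next = parseRevisionSketch('2-77bc');
console.log(next.height === prev.height + 1); // true -> the revision was incremented
```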
‎src/rx-query.ts

(1 addition)

@@ -145,6 +145,7 @@ export class RxQueryBase<
                     return false;
                 }
             }),
+            filter(result => !!result),
             /**
              * Map the result set to a single RxDocument or an array,
              * depending on query type

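For readers less familiar with RxJS: `filter(result => !!result)` drops falsy emissions from the query's observable pipeline before they are mapped to documents. A tiny standalone sketch of that operator (values are made up):

```ts
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// null/undefined results are dropped before the mapping step,
// mirroring the line added to rx-query.ts above.
of(null, { id: 'doc1' }, undefined, { id: 'doc2' })
    .pipe(
        filter(result => !!result),
        map(result => result!.id)
    )
    .subscribe(id => console.log(id)); // logs: doc1, doc2
```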
‎src/rx-storage-helper.ts

(22 additions, 6 deletions)

@@ -92,12 +92,21 @@ export async function writeSingle<RxDocType>(
     }
 }
 
+
 export function storageChangeEventToRxChangeEvent<DocType>(
     isLocal: boolean,
     rxStorageChangeEvent: RxStorageChangeEvent<DocType>,
     rxCollection?: RxCollection,
 ): RxChangeEvent<DocType> {
     let documentData;
+
+    /**
+     * TODO
+     * this data design is shit,
+     * instead of having the documentData depending on the operation,
+     * we should always have a current doc data, that might or might not
+     * have set _deleted to true.
+     */
     if (rxStorageChangeEvent.change.operation !== 'DELETE') {
         documentData = rxStorageChangeEvent.change.doc;
     }
@@ -460,7 +469,19 @@ export function stripAttachmentsDataFromDocument<RxDocType>(doc: RxDocumentWrite
     return useDoc;
 }
 
-
+/**
+ * Flat clone the document data
+ * and also the _meta field.
+ * Used many times when we want to change the meta
+ * during replication etc.
+ */
+export function flatCloneDocWithMeta<RxDocType>(
+    doc: RxDocumentData<RxDocType>
+): RxDocumentData<RxDocType> {
+    const ret = flatClone(doc);
+    ret._meta = flatClone(doc._meta);
+    return ret;
+}
 
 /**
  * Each event is labeled with the id
@@ -579,11 +600,6 @@ export function getWrappedStorageInstance<RxDocType, Internals, InstanceCreation
         runPluginHooks('preWriteToStorageInstance', hookParams);
         data = hookParams.doc;
 
-
-        // console.log('----------------------');
-        // console.dir(writeRow.previous);
-        // console.dir(data);
-
         /**
          * Update the revision after the hooks have run.
         * Do not update the revision if no previous is given,

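The new `flatCloneDocWithMeta()` helper shallow-clones a document and additionally shallow-clones its `_meta` object, so replication code can adjust meta fields (for example the last-write-time) without mutating the original document. A small usage sketch; the document shape and the import from the package root are assumptions for illustration:

```ts
// Import path assumed: src/index.ts re-exports './rx-storage-helper'.
import { flatCloneDocWithMeta } from 'rxdb';

const original = {
    id: 'doc1',
    name: 'alice',
    _deleted: false,
    _rev: '1-abc',
    _attachments: {},
    _meta: { lwt: 1655000000000 }
};

const cloned = flatCloneDocWithMeta(original as any);
cloned._meta.lwt = 1655000099999;

// The clone's meta was changed, the original stays untouched.
console.log(original._meta.lwt); // 1655000000000
```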
0 commit comments
