Skip to content

Commit dfa24ce

Browse files
committed
Merge remote-tracking branch 'origin/main' into configurable-service-limits
2 parents 7f8d223 + cf023a7 commit dfa24ce

File tree

22 files changed

+140
-72
lines changed

22 files changed

+140
-72
lines changed

modules/module-mongodb-storage/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# @powersync/service-module-mongodb-storage
22

3+
## 0.6.2
4+
5+
### Patch Changes
6+
7+
- 0dd746a: Improve intial replication performance for MongoDB by avoiding sessions.
8+
- @powersync/service-core@1.7.2
9+
310
## 0.6.1
411

512
### Patch Changes

modules/module-mongodb-storage/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mongodb-storage",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.6.1",
5+
"version": "0.6.2",
66
"main": "dist/index.js",
77
"license": "FSL-1.1-Apache-2.0",
88
"type": "module",

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ export class MongoBucketBatch
270270
}
271271
}
272272

273-
return resumeBatch;
273+
return resumeBatch?.hasData() ? resumeBatch : null;
274274
}
275275

276276
private saveOperation(

modules/module-mongodb-storage/src/storage/implementation/OperationBatch.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ export class OperationBatch {
4141
return this.batch.length >= MAX_BATCH_COUNT || this.currentSize > MAX_RECORD_BATCH_SIZE;
4242
}
4343

44+
hasData() {
45+
return this.length > 0;
46+
}
47+
4448
/**
4549
*
4650
* @param sizes Map of source key to estimated size of the current_data document, or undefined if current_data is not persisted.

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ export class PersistedBatch {
245245
}
246246

247247
async flush(db: PowerSyncMongo, session: mongo.ClientSession) {
248+
const startAt = performance.now();
248249
if (this.bucketData.length > 0) {
249250
await db.bucket_data.bulkWrite(this.bucketData, {
250251
session,
@@ -267,10 +268,11 @@ export class PersistedBatch {
267268
});
268269
}
269270

271+
const duration = performance.now() - startAt;
270272
logger.info(
271273
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
272274
this.currentData.length
273-
} updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
275+
} updates, ${Math.round(this.currentSize / 1024)}kb in ${duration.toFixed(0)}ms. Last op_id: ${this.debugLastOpId}`
274276
);
275277

276278
this.bucketData = [];

modules/module-mongodb/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# @powersync/service-module-mongodb
22

3+
## 0.6.2
4+
5+
### Patch Changes
6+
7+
- 0dd746a: Improve intial replication performance for MongoDB by avoiding sessions.
8+
- @powersync/service-core@1.7.2
9+
310
## 0.6.1
411

512
### Patch Changes

modules/module-mongodb/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mongodb",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.6.1",
5+
"version": "0.6.2",
66
"main": "dist/index.js",
77
"license": "FSL-1.1-Apache-2.0",
88
"type": "module",

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ import {
88
ReplicationAssertionError,
99
ServiceError
1010
} from '@powersync/lib-services-framework';
11-
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
11+
import {
12+
BSON_DESERIALIZE_DATA_OPTIONS,
13+
Metrics,
14+
SaveOperationTag,
15+
SourceEntityDescriptor,
16+
SourceTable,
17+
storage
18+
} from '@powersync/service-core';
1219
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
1320
import { MongoLSN } from '../common/MongoLSN.js';
1421
import { PostImagesOption } from '../types/types.js';
@@ -193,39 +200,31 @@ export class ChangeStream {
193200
// Not known where this would happen apart from the above cases
194201
throw new ReplicationAssertionError('MongoDB lastWrite timestamp not found.');
195202
}
196-
// We previously used {snapshot: true} for the snapshot session.
197-
// While it gives nice consistency guarantees, it fails when the
198-
// snapshot takes longer than 5 minutes, due to minSnapshotHistoryWindowInSeconds
199-
// expiring the snapshot.
200-
const session = await this.client.startSession();
201-
try {
202-
await this.storage.startBatch(
203-
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
204-
async (batch) => {
205-
// Start by resolving all tables.
206-
// This checks postImage configuration, and that should fail as
207-
// earlier as possible.
208-
let allSourceTables: SourceTable[] = [];
209-
for (let tablePattern of sourceTables) {
210-
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
211-
allSourceTables.push(...tables);
212-
}
213203

214-
for (let table of allSourceTables) {
215-
await this.snapshotTable(batch, table, session);
216-
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
204+
await this.storage.startBatch(
205+
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
206+
async (batch) => {
207+
// Start by resolving all tables.
208+
// This checks postImage configuration, and that should fail as
209+
// earlier as possible.
210+
let allSourceTables: SourceTable[] = [];
211+
for (let tablePattern of sourceTables) {
212+
const tables = await this.resolveQualifiedTableNames(batch, tablePattern);
213+
allSourceTables.push(...tables);
214+
}
217215

218-
await touch();
219-
}
216+
for (let table of allSourceTables) {
217+
await this.snapshotTable(batch, table);
218+
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
220219

221-
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
222-
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
223-
await batch.commit(lsn);
220+
await touch();
224221
}
225-
);
226-
} finally {
227-
session.endSession();
228-
}
222+
223+
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
224+
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
225+
await batch.commit(lsn);
226+
}
227+
);
229228
}
230229

231230
private async setupCheckpointsCollection() {
@@ -283,48 +282,52 @@ export class ChangeStream {
283282
}
284283
}
285284

286-
private async snapshotTable(
287-
batch: storage.BucketStorageBatch,
288-
table: storage.SourceTable,
289-
session?: mongo.ClientSession
290-
) {
285+
private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
291286
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
292287
const estimatedCount = await this.estimatedCount(table);
293288
let at = 0;
294-
let lastLogIndex = 0;
295-
296289
const db = this.client.db(table.schema);
297290
const collection = db.collection(table.table);
298-
const query = collection.find({}, { session, readConcern: { level: 'majority' } });
299-
300-
const cursor = query.stream();
301-
302-
for await (let document of cursor) {
303-
if (this.abort_signal.aborted) {
304-
throw new ReplicationAbortedError(`Aborted initial replication`);
305-
}
306-
307-
const record = constructAfterRecord(document);
291+
const cursor = collection.find({}, { batchSize: 6_000, readConcern: 'majority' });
292+
293+
let lastBatch = performance.now();
294+
// hasNext() is the call that triggers fetching of the next batch,
295+
// then we read it with readBufferedDocuments(). This gives us semi-explicit
296+
// control over the fetching of each batch, and avoids a separate promise per document
297+
let hasNextPromise = cursor.hasNext();
298+
while (await hasNextPromise) {
299+
const docBatch = cursor.readBufferedDocuments();
300+
// Pre-fetch next batch, so that we can read and write concurrently
301+
hasNextPromise = cursor.hasNext();
302+
for (let document of docBatch) {
303+
if (this.abort_signal.aborted) {
304+
throw new ReplicationAbortedError(`Aborted initial replication`);
305+
}
308306

309-
// This auto-flushes when the batch reaches its size limit
310-
await batch.save({
311-
tag: SaveOperationTag.INSERT,
312-
sourceTable: table,
313-
before: undefined,
314-
beforeReplicaId: undefined,
315-
after: record,
316-
afterReplicaId: document._id
317-
});
307+
const record = constructAfterRecord(document);
318308

319-
at += 1;
320-
if (at - lastLogIndex >= 5000) {
321-
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
322-
lastLogIndex = at;
309+
// This auto-flushes when the batch reaches its size limit
310+
await batch.save({
311+
tag: SaveOperationTag.INSERT,
312+
sourceTable: table,
313+
before: undefined,
314+
beforeReplicaId: undefined,
315+
after: record,
316+
afterReplicaId: document._id
317+
});
323318
}
324-
Metrics.getInstance().rows_replicated_total.add(1);
325319

320+
at += docBatch.length;
321+
Metrics.getInstance().rows_replicated_total.add(docBatch.length);
322+
const duration = performance.now() - lastBatch;
323+
lastBatch = performance.now();
324+
logger.info(
325+
`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount} in ${duration.toFixed(0)}ms`
326+
);
326327
await touch();
327328
}
329+
// In case the loop was interrupted, make sure we await the last promise.
330+
await hasNextPromise;
328331

329332
await batch.flush();
330333
logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`);

modules/module-mysql/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @powersync/service-module-mysql
22

3+
## 0.4.2
4+
5+
### Patch Changes
6+
7+
- @powersync/service-core@1.7.2
8+
39
## 0.4.1
410

511
### Patch Changes

modules/module-mysql/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mysql",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.4.1",
5+
"version": "0.4.2",
66
"license": "FSL-1.1-Apache-2.0",
77
"main": "dist/index.js",
88
"type": "module",

modules/module-postgres-storage/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# @powersync/service-module-postgres-storage
22

3+
## 0.4.2
4+
5+
### Patch Changes
6+
7+
- @powersync/service-core@1.7.2
8+
- @powersync/service-core-tests@0.6.1
9+
310
## 0.4.1
411

512
### Patch Changes

modules/module-postgres-storage/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-postgres-storage",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/@types/index.d.ts",
5-
"version": "0.4.1",
5+
"version": "0.4.2",
66
"main": "dist/index.js",
77
"type": "module",
88
"publishConfig": {

modules/module-postgres/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @powersync/service-module-postgres
22

3+
## 0.8.2
4+
5+
### Patch Changes
6+
7+
- @powersync/service-core@1.7.2
8+
39
## 0.8.1
410

511
### Patch Changes

modules/module-postgres/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"publishConfig": {
66
"access": "public"
77
},
8-
"version": "0.8.1",
8+
"version": "0.8.2",
99
"main": "dist/index.js",
1010
"license": "FSL-1.1-Apache-2.0",
1111
"type": "module",

packages/service-core-tests/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @powersync/service-core-tests
22

3+
## 0.6.1
4+
5+
### Patch Changes
6+
7+
- @powersync/service-core@1.7.2
8+
39
## 0.6.0
410

511
### Minor Changes

packages/service-core-tests/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"publishConfig": {
66
"access": "public"
77
},
8-
"version": "0.6.0",
8+
"version": "0.6.1",
99
"main": "dist/index.js",
1010
"license": "FSL-1.1-Apache-2.0",
1111
"type": "module",

packages/service-core/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# @powersync/service-core
22

3+
## 1.7.2
4+
35
## 1.7.1
46

57
### Patch Changes

packages/service-core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"publishConfig": {
66
"access": "public"
77
},
8-
"version": "1.7.1",
8+
"version": "1.7.2",
99
"main": "dist/index.js",
1010
"license": "FSL-1.1-Apache-2.0",
1111
"type": "module",

service/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# @powersync/service-image
22

3+
## 1.7.2
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [0dd746a]
8+
- @powersync/service-module-mongodb-storage@0.6.2
9+
- @powersync/service-module-mongodb@0.6.2
10+
- @powersync/service-module-mysql@0.4.2
11+
- @powersync/service-module-postgres@0.8.2
12+
- @powersync/service-core@1.7.2
13+
- @powersync/service-module-postgres-storage@0.4.2
14+
315
## 1.7.1
416

517
### Patch Changes

service/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@powersync/service-image",
3-
"version": "1.7.1",
3+
"version": "1.7.2",
44
"private": true,
55
"license": "FSL-1.1-Apache-2.0",
66
"type": "module",

test-client/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# test-client
22

3+
## 0.1.28
4+
5+
### Patch Changes
6+
7+
- @powersync/service-core@1.7.2
8+
39
## 0.1.27
410

511
### Patch Changes

test-client/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "test-client",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"private": true,
5-
"version": "0.1.27",
5+
"version": "0.1.28",
66
"main": "dist/index.js",
77
"bin": "dist/bin.js",
88
"license": "Apache-2.0",

0 commit comments

Comments
 (0)