Skip to content

Commit 59afb33

Browse files
committed
Merge branch 'main' into mysql-binlog-backpressure-handling
2 parents 079a2f5 + cc44995 commit 59afb33

File tree

34 files changed

+216
-59
lines changed

34 files changed

+216
-59
lines changed

.changeset/beige-clocks-mix.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

.changeset/little-beans-carry.md

Lines changed: 0 additions & 9 deletions
This file was deleted.

modules/module-core/CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# @powersync/service-module-core
22

3+
## 0.1.1
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [100ccec]
8+
- Updated dependencies [b57f938]
9+
- Updated dependencies [5b39039]
10+
- @powersync/service-core@1.12.1
11+
312
## 0.1.0
413

514
### Minor Changes

modules/module-core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-core",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.1.0",
5+
"version": "0.1.1",
66
"main": "dist/index.js",
77
"license": "FSL-1.1-Apache-2.0",
88
"type": "module",

modules/module-mongodb-storage/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# @powersync/service-module-mongodb-storage
22

3+
## 0.9.5
4+
5+
### Patch Changes
6+
7+
- 5b39039: Cleanup on internal sync rules implementation and APIs.
8+
- Updated dependencies [100ccec]
9+
- Updated dependencies [b57f938]
10+
- Updated dependencies [5b39039]
11+
- @powersync/service-core@1.12.1
12+
- @powersync/service-sync-rules@0.27.0
13+
314
## 0.9.4
415

516
### Patch Changes

modules/module-mongodb-storage/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mongodb-storage",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.9.4",
5+
"version": "0.9.5",
66
"main": "dist/index.js",
77
"license": "FSL-1.1-Apache-2.0",
88
"type": "module",

modules/module-mongodb/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# @powersync/service-module-mongodb
22

3+
## 0.9.1
4+
5+
### Patch Changes
6+
7+
- b57f938: [MongoDB] Fix replication batching
8+
- Updated dependencies [100ccec]
9+
- Updated dependencies [b57f938]
10+
- Updated dependencies [5b39039]
11+
- @powersync/service-core@1.12.1
12+
- @powersync/service-sync-rules@0.27.0
13+
314
## 0.9.0
415

516
### Minor Changes

modules/module-mongodb/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mongodb",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.9.0",
5+
"version": "0.9.1",
66
"main": "dist/index.js",
77
"license": "FSL-1.1-Apache-2.0",
88
"type": "module",

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
55
import * as service_types from '@powersync/service-types';
66

77
import { MongoManager } from '../replication/MongoManager.js';
8-
import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js';
8+
import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
99
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
1010
import * as types from '../types/types.js';
1111
import { escapeRegExp } from '../utils.js';
@@ -206,10 +206,6 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
206206
return undefined;
207207
}
208208

209-
async getReplicationHead(): Promise<string> {
210-
return createCheckpoint(this.client, this.db);
211-
}
212-
213209
async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
214210
const session = this.client.startSession();
215211
try {
@@ -224,7 +220,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
224220
// Trigger a change on the changestream.
225221
await this.db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
226222
{
227-
_id: 'checkpoint' as any
223+
_id: STANDALONE_CHECKPOINT_ID as any
228224
},
229225
{
230226
$inc: { i: 1 }

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@ import { MongoLSN } from '../common/MongoLSN.js';
1515
import { PostImagesOption } from '../types/types.js';
1616
import { escapeRegExp } from '../utils.js';
1717
import { MongoManager } from './MongoManager.js';
18-
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
18+
import {
19+
constructAfterRecord,
20+
createCheckpoint,
21+
getCacheIdentifier,
22+
getMongoRelation,
23+
STANDALONE_CHECKPOINT_ID
24+
} from './MongoRelation.js';
1925
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
2026

2127
export interface ChangeStreamOptions {
@@ -69,6 +75,8 @@ export class ChangeStream {
6975

7076
private relation_cache = new Map<string | number, storage.SourceTable>();
7177

78+
private checkpointStreamId = new mongo.ObjectId();
79+
7280
constructor(options: ChangeStreamOptions) {
7381
this.storage = options.storage;
7482
this.metrics = options.metrics;
@@ -247,6 +255,11 @@ export class ChangeStream {
247255
await this.defaultDb.createCollection(CHECKPOINTS_COLLECTION, {
248256
changeStreamPreAndPostImages: { enabled: true }
249257
});
258+
} else {
259+
// Clear the collection on startup, to keep it clean
260+
// We never query this collection directly, and don't want to keep the data around.
261+
// We only use this to get data into the oplog/changestream.
262+
await this.defaultDb.collection(CHECKPOINTS_COLLECTION).deleteMany({});
250263
}
251264
}
252265

@@ -434,7 +447,7 @@ export class ChangeStream {
434447
await batch.truncate([result.table]);
435448

436449
await this.snapshotTable(batch, result.table);
437-
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb);
450+
const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID);
438451

439452
const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn);
440453
return table;
@@ -601,7 +614,11 @@ export class ChangeStream {
601614
// Always start with a checkpoint.
602615
// This helps us to clear errors when restarting, even if there is
603616
// no data to replicate.
604-
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
617+
let waitForCheckpointLsn: string | null = await createCheckpoint(
618+
this.client,
619+
this.defaultDb,
620+
this.checkpointStreamId
621+
);
605622

606623
let splitDocument: mongo.ChangeStreamDocument | null = null;
607624

@@ -700,13 +717,9 @@ export class ChangeStream {
700717
}
701718
}
702719

703-
if (
704-
(changeDocument.operationType == 'insert' ||
705-
changeDocument.operationType == 'update' ||
706-
changeDocument.operationType == 'replace' ||
707-
changeDocument.operationType == 'drop') &&
708-
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
709-
) {
720+
const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined;
721+
722+
if (ns?.coll == CHECKPOINTS_COLLECTION) {
710723
/**
711724
* Dropping the database does not provide an `invalidate` event.
712725
* We typically would receive `drop` events for the collection which we
@@ -727,6 +740,29 @@ export class ChangeStream {
727740
);
728741
}
729742

743+
if (
744+
!(
745+
changeDocument.operationType == 'insert' ||
746+
changeDocument.operationType == 'update' ||
747+
changeDocument.operationType == 'replace'
748+
)
749+
) {
750+
continue;
751+
}
752+
753+
// We handle two types of checkpoint events:
754+
// 1. "Standalone" checkpoints, typically write checkpoints. We want to process these
755+
// immediately, regardless of where they were created.
756+
// 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate
757+
// limiting of commits, so we specifically want to exclude checkpoints from other streams.
758+
//
759+
// It may be useful to also throttle commits due to standalone checkpoints in the future.
760+
// However, these typically have a much lower rate than batch checkpoints, so we don't do that for now.
761+
762+
const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId;
763+
if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) {
764+
continue;
765+
}
730766
const { comparable: lsn } = new MongoLSN({
731767
timestamp: changeDocument.clusterTime!,
732768
resume_token: changeDocument._id
@@ -743,7 +779,7 @@ export class ChangeStream {
743779
changeDocument.operationType == 'delete'
744780
) {
745781
if (waitForCheckpointLsn == null) {
746-
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
782+
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId);
747783
}
748784
const rel = getMongoRelation(changeDocument.ns);
749785
const table = await this.getRelation(batch, rel, {

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,27 @@ function filterJsonData(data: any, depth = 0): any {
147147
}
148148
}
149149

150-
export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
150+
/**
151+
* Id for checkpoints not associated with any specific replication stream.
152+
*
153+
* Use this for write checkpoints, or any other case where we want to process
154+
* the checkpoint immediately, and not wait for batching.
155+
*/
156+
export const STANDALONE_CHECKPOINT_ID = '_standalone_checkpoint';
157+
158+
export async function createCheckpoint(
159+
client: mongo.MongoClient,
160+
db: mongo.Db,
161+
id: mongo.ObjectId | string
162+
): Promise<string> {
151163
const session = client.startSession();
152164
try {
153-
// Note: If multiple PowerSync instances are replicating the same source database,
154-
// they'll modify the same checkpoint document. This is fine - it could create
155-
// more replication load than required, but won't break anything.
165+
// We use an unique id per process, and clear documents on startup.
166+
// This is so that we can filter events for our own process only, and ignore
167+
// events from other processes.
156168
await db.collection(CHECKPOINTS_COLLECTION).findOneAndUpdate(
157169
{
158-
_id: 'checkpoint' as any
170+
_id: id as any
159171
},
160172
{
161173
$inc: { i: 1 }

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { METRICS_HELPER, test_utils } from '@powersync/service-core-tests';
1212

1313
import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
1414
import { MongoManager } from '@module/replication/MongoManager.js';
15-
import { createCheckpoint } from '@module/replication/MongoRelation.js';
15+
import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js';
1616
import { NormalizedMongoConnectionConfig } from '@module/types/types.js';
1717

1818
import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js';
@@ -160,7 +160,7 @@ export async function getClientCheckpoint(
160160
options?: { timeout?: number }
161161
): Promise<InternalOpId> {
162162
const start = Date.now();
163-
const lsn = await createCheckpoint(client, db);
163+
const lsn = await createCheckpoint(client, db, STANDALONE_CHECKPOINT_ID);
164164
// This old API needs a persisted checkpoint id.
165165
// Since we don't use LSNs anymore, the only way to get that is to wait.
166166

modules/module-mysql/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# @powersync/service-module-mysql
22

3+
## 0.6.5
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [100ccec]
8+
- Updated dependencies [b57f938]
9+
- Updated dependencies [5b39039]
10+
- @powersync/service-core@1.12.1
11+
- @powersync/service-sync-rules@0.27.0
12+
313
## 0.6.4
414

515
### Patch Changes

modules/module-mysql/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-mysql",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/index.d.ts",
5-
"version": "0.6.4",
5+
"version": "0.6.5",
66
"license": "FSL-1.1-Apache-2.0",
77
"main": "dist/index.js",
88
"type": "module",

modules/module-postgres-storage/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# @powersync/service-module-postgres-storage
22

3+
## 0.7.5
4+
5+
### Patch Changes
6+
7+
- 5b39039: Cleanup on internal sync rules implementation and APIs.
8+
- Updated dependencies [100ccec]
9+
- Updated dependencies [b57f938]
10+
- Updated dependencies [5b39039]
11+
- @powersync/service-core@1.12.1
12+
- @powersync/service-sync-rules@0.27.0
13+
- @powersync/service-core-tests@0.9.5
14+
315
## 0.7.4
416

517
### Patch Changes

modules/module-postgres-storage/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@powersync/service-module-postgres-storage",
33
"repository": "https://github.com/powersync-ja/powersync-service",
44
"types": "dist/@types/index.d.ts",
5-
"version": "0.7.4",
5+
"version": "0.7.5",
66
"main": "dist/index.js",
77
"type": "module",
88
"publishConfig": {

modules/module-postgres/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# @powersync/service-module-postgres
22

3+
## 0.13.1
4+
5+
### Patch Changes
6+
7+
- Updated dependencies [100ccec]
8+
- Updated dependencies [b57f938]
9+
- Updated dependencies [5b39039]
10+
- @powersync/service-core@1.12.1
11+
- @powersync/service-sync-rules@0.27.0
12+
313
## 0.13.0
414

515
### Minor Changes

modules/module-postgres/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"publishConfig": {
66
"access": "public"
77
},
8-
"version": "0.13.0",
8+
"version": "0.13.1",
99
"main": "dist/index.js",
1010
"license": "FSL-1.1-Apache-2.0",
1111
"type": "module",

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
252252

253253
const r = await callback(currentLsn);
254254

255+
// Note: This may not reliably trigger a new replication message on Postgres 11 or 12,
256+
// in which case there could be a delay in the client receiving the write checkpoint acknowledgement.
257+
// Postgres 12 already reached EOL, and this is not a critical issue, so we're not fixing it.
258+
// On postgres 13+, this works reliably.
255259
await lib_postgres.retriedQuery(this.pool, KEEPALIVE_STATEMENT);
256260

257261
return r;

0 commit comments

Comments
 (0)