Skip to content

Commit 0d248fb

Browse files
committed
WIP: Record replication progress for Postgres.
1 parent 95b2095 commit 0d248fb

File tree

14 files changed

+230
-38
lines changed

14 files changed

+230
-38
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,32 @@ export class MongoBucketBatch
859859
return last_op!;
860860
}
861861

862+
async updateTableProgress(
863+
table: storage.SourceTable,
864+
progress: Partial<storage.TableSnapshotStatus>
865+
): Promise<storage.SourceTable> {
866+
const copy = table.clone();
867+
copy.snapshotStatus = {
868+
totalEstimatedCount: progress.totalEstimatedCount ?? copy.snapshotStatus?.totalEstimatedCount ?? 0,
869+
replicatedCount: progress.replicatedCount ?? copy.snapshotStatus?.replicatedCount ?? 0,
870+
lastKey: progress.lastKey ?? copy.snapshotStatus?.lastKey ?? null
871+
};
872+
873+
await this.withTransaction(async () => {
874+
await this.db.source_tables.updateOne(
875+
{ _id: table.id },
876+
{
877+
$set: {
878+
snapshot_status: copy.snapshotStatus
879+
}
880+
},
881+
{ session: this.session }
882+
);
883+
});
884+
885+
return copy;
886+
}
887+
862888
async markSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn: string) {
863889
const session = this.session;
864890
const ids = tables.map((table) => table.id);
@@ -869,6 +895,9 @@ export class MongoBucketBatch
869895
{
870896
$set: {
871897
snapshot_done: true
898+
},
899+
$unset: {
900+
snapshot_status: 1
872901
}
873902
},
874903
{ session }
@@ -892,17 +921,8 @@ export class MongoBucketBatch
892921
}
893922
});
894923
return tables.map((table) => {
895-
const copy = new storage.SourceTable(
896-
table.id,
897-
table.connectionTag,
898-
table.objectId,
899-
table.schema,
900-
table.table,
901-
table.replicaIdColumns,
902-
table.snapshotComplete
903-
);
904-
copy.syncData = table.syncData;
905-
copy.syncParameters = table.syncParameters;
924+
const copy = table.clone();
925+
copy.snapshotComplete = true;
906926
return copy;
907927
});
908928
}

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ export class MongoSyncBucketStorage
194194
table_name: table,
195195
replica_id_columns: null,
196196
replica_id_columns2: columns,
197-
snapshot_done: false
197+
snapshot_done: false,
198+
snapshot_status: undefined
198199
};
199200

200201
await col.insertOne(doc, { session });
@@ -211,6 +212,8 @@ export class MongoSyncBucketStorage
211212
sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
212213
sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
213214
sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
215+
// TODO: standardize serialization format
216+
sourceTable.snapshotStatus = doc.snapshot_status;
214217

215218
let dropTables: storage.SourceTable[] = [];
216219
// Detect tables that are either renamed, or have different replica_id_columns

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { storage } from '@powersync/service-core';
1+
import { storage, TableSnapshotStatus } from '@powersync/service-core';
22
import { SqliteJsonValue } from '@powersync/service-sync-rules';
33
import * as bson from 'bson';
44

@@ -73,6 +73,7 @@ export interface SourceTableDocument {
7373
replica_id_columns: string[] | null;
7474
replica_id_columns2: { name: string; type_oid?: number; type?: string }[] | undefined;
7575
snapshot_done: boolean | undefined;
76+
snapshot_status: TableSnapshotStatus | undefined;
7677
}
7778

7879
/**

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ import {
88
ReplicationAssertionError,
99
ServiceError
1010
} from '@powersync/lib-services-framework';
11-
import { MetricsEngine, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
11+
import {
12+
MetricsEngine,
13+
RelationCache,
14+
SaveOperationTag,
15+
SourceEntityDescriptor,
16+
SourceTable,
17+
storage
18+
} from '@powersync/service-core';
1219
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
1320
import { ReplicationMetric } from '@powersync/service-types';
1421
import { MongoLSN } from '../common/MongoLSN.js';
@@ -73,7 +80,7 @@ export class ChangeStream {
7380

7481
private abort_signal: AbortSignal;
7582

76-
private relation_cache = new Map<string | number, storage.SourceTable>();
83+
private relationCache = new RelationCache(getCacheIdentifier);
7784

7885
private checkpointStreamId = new mongo.ObjectId();
7986

@@ -178,11 +185,14 @@ export class ChangeStream {
178185
}
179186

180187
/**
 * Human-readable estimated row count for a table, e.g. "~1000".
 */
async estimatedCount(table: storage.SourceTable): Promise<string> {
  return `~${await this.estimatedCountNumber(table)}`;
}

/**
 * Approximate document count for the collection backing `table`,
 * as reported by the MongoDB driver's estimatedDocumentCount().
 */
async estimatedCountNumber(table: storage.SourceTable): Promise<number> {
  // table.schema maps to the Mongo database, table.table to the collection.
  const collection = this.client.db(table.schema).collection(table.table);
  return await collection.estimatedDocumentCount();
}
186196
/**
187197
* Start initial replication.
188198
*
@@ -228,7 +238,17 @@ export class ChangeStream {
228238
allSourceTables.push(...tables);
229239
}
230240

241+
let tablesWithStatus: SourceTable[] = [];
231242
for (let table of allSourceTables) {
243+
let count = await this.estimatedCountNumber(table);
244+
const updated = await batch.updateTableProgress(table, {
245+
totalEstimatedCount: count
246+
});
247+
tablesWithStatus.push(updated);
248+
this.relationCache.update(updated);
249+
}
250+
251+
for (let table of tablesWithStatus) {
232252
await this.snapshotTable(batch, table);
233253
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
234254

@@ -358,8 +378,7 @@ export class ChangeStream {
358378
descriptor: SourceEntityDescriptor,
359379
options: { snapshot: boolean }
360380
): Promise<SourceTable> {
361-
const cacheId = getCacheIdentifier(descriptor);
362-
const existing = this.relation_cache.get(cacheId);
381+
const existing = this.relationCache.get(descriptor);
363382
if (existing != null) {
364383
return existing;
365384
}
@@ -425,7 +444,7 @@ export class ChangeStream {
425444
entity_descriptor: descriptor,
426445
sync_rules: this.sync_rules
427446
});
428-
this.relation_cache.set(getCacheIdentifier(descriptor), result.table);
447+
this.relationCache.update(result.table);
429448

430449
// Drop conflicting collections.
431450
// This is generally not expected for MongoDB source dbs, so we log an error.
@@ -800,7 +819,7 @@ export class ChangeStream {
800819
});
801820
if (table.syncAny) {
802821
await batch.drop([table]);
803-
this.relation_cache.delete(getCacheIdentifier(rel));
822+
this.relationCache.delete(table);
804823
}
805824
} else if (changeDocument.operationType == 'rename') {
806825
const relFrom = getMongoRelation(changeDocument.ns);
@@ -811,7 +830,7 @@ export class ChangeStream {
811830
});
812831
if (tableFrom.syncAny) {
813832
await batch.drop([tableFrom]);
814-
this.relation_cache.delete(getCacheIdentifier(relFrom));
833+
this.relationCache.delete(relFrom);
815834
}
816835
// Here we do need to snapshot the new table
817836
const collection = await this.getCollectionInfo(relTo.schema, relTo.name);

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
2020
/**
2121
* For in-memory cache only.
2222
*/
23-
export function getCacheIdentifier(source: storage.SourceEntityDescriptor): string {
23+
export function getCacheIdentifier(source: storage.SourceEntityDescriptor | storage.SourceTable): string {
24+
if (source instanceof storage.SourceTable) {
25+
return `${source.schema}.${source.table}`;
26+
}
2427
return `${source.schema}.${source.name}`;
2528
}
2629

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,14 @@ export class PostgresBucketBatch
448448
});
449449
}
450450

451+
async updateTableProgress(
452+
table: storage.SourceTable,
453+
progress: Partial<storage.TableSnapshotStatus>
454+
): Promise<storage.SourceTable> {
455+
// TODO: implement
456+
return table;
457+
}
458+
451459
addCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): void {
452460
this.write_checkpoint_batch.push({
453461
...checkpoint,

modules/module-postgres/src/replication/SnapshotQuery.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
import { ColumnDescriptor, SourceTable } from '@powersync/service-core';
1+
import { ColumnDescriptor, SourceTable, bson } from '@powersync/service-core';
22
import { PgChunk, PgConnection, PgType, StatementParam, PgTypeOid } from '@powersync/service-jpgwire';
33
import { escapeIdentifier } from '@powersync/lib-service-postgres';
44
import { SqliteValue } from '@powersync/service-sync-rules';
5+
import { ServiceAssertionError } from '@powersync/lib-services-framework';
56

67
export interface SnapshotQuery {
78
initialize(): Promise<void>;
9+
/**
10+
* Returns an async iterable iterator that yields chunks of data.
11+
*
12+
* If the last chunk has 0 rows, it indicates that there are no more rows to fetch.
13+
*/
814
nextChunk(): AsyncIterableIterator<PgChunk>;
915
}
1016

@@ -74,7 +80,7 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
7480
}
7581

7682
private readonly key: ColumnDescriptor;
77-
private lastKey: string | bigint | null = null;
83+
lastKey: string | bigint | null = null;
7884

7985
public constructor(
8086
private readonly connection: PgConnection,
@@ -88,6 +94,23 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
8894
// No-op
8995
}
9096

97+
/**
 * Restores the chunking cursor from a serialized key, as produced by
 * getLastKeySerialized().
 *
 * @throws ServiceAssertionError if the decoded document does not contain
 *   exactly one key, or the key name does not match the configured key column.
 */
public setLastKeySerialized(key: Uint8Array) {
  // useBigInt64 keeps int64 values as bigint, matching the lastKey type.
  const decoded = bson.deserialize(key, { useBigInt64: true });
  const keys = Object.keys(decoded);
  if (keys.length != 1) {
    // Fix: the condition also matches ZERO keys, so the message must not
    // claim "multiple" — state the actual contract instead.
    throw new ServiceAssertionError(`Expected exactly one key, got: [${keys.join(', ')}]`);
  }
  if (keys[0] != this.key.name) {
    throw new ServiceAssertionError(`Key name mismatch: expected ${this.key.name}, got ${keys[0]}`);
  }
  // NOTE(review): the decoded value is assumed to be string | bigint to
  // match lastKey's type — TODO confirm against the serialization format.
  const value = decoded[this.key.name];
  this.lastKey = value;
}

/**
 * Serializes the current chunking cursor (last key-column value seen)
 * into a BSON document keyed by the key column name.
 */
public getLastKeySerialized(): Uint8Array {
  return bson.serialize({ [this.key.name]: this.lastKey });
}
113+
91114
public async *nextChunk(): AsyncIterableIterator<PgChunk> {
92115
let stream: AsyncIterableIterator<PgChunk>;
93116
if (this.lastKey == null) {

0 commit comments

Comments
 (0)