
Commit a369964

Replication logging improvements.

1 parent: dc02713

4 files changed: +56 −26 lines

modules/module-mongodb/src/replication/ChangeStream.ts
Lines changed: 9 additions & 3 deletions

@@ -290,6 +290,9 @@ export class ChangeStream {
         });
         tablesWithStatus.push(updated);
         this.relationCache.update(updated);
+        logger.info(
+          `${this.logPrefix} To replicate: ${table.qualifiedName}: ${updated.snapshotStatus?.replicatedCount}/~${updated.snapshotStatus?.totalEstimatedCount}`
+        );
       }
 
       for (let table of tablesWithStatus) {
@@ -366,14 +369,17 @@
   }
 
   private async snapshotTable(batch: storage.BucketStorageBatch, table: storage.SourceTable) {
-    logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
     const totalEstimatedCount = await this.estimatedCountNumber(table);
     let at = table.snapshotStatus?.replicatedCount ?? 0;
     const db = this.client.db(table.schema);
     const collection = db.collection(table.table);
     await using query = new ChunkedSnapshotQuery({ collection, key: table.snapshotStatus?.lastKey });
     if (query.lastKey != null) {
-      logger.info(`${this.logPrefix} Resuming snapshot for ${table.qualifiedName} at ${query.lastKey}`);
+      logger.info(
+        `${this.logPrefix} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming at _id > ${query.lastKey}`
+      );
+    } else {
+      logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
     }
 
     let lastBatch = performance.now();
@@ -418,7 +424,7 @@
       const duration = performance.now() - lastBatch;
       lastBatch = performance.now();
       logger.info(
-        `${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${totalEstimatedCount} in ${duration.toFixed(0)}ms`
+        `${this.logPrefix} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} in ${duration.toFixed(0)}ms`
      );
       await touch();
     }
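
The MongoDB snapshot above chunks on _id, which is why the resume log line reads `resuming at _id > ${query.lastKey}`. For context, a minimal sketch of that resumable-chunk pattern against the stock mongodb driver might look like the following; the helper name chunkedSnapshot, the chunk size, and the yielded shape are illustrative, not the module's actual API:

    import { MongoClient, ObjectId } from 'mongodb';

    // Hypothetical helper: stream one collection in _id-ordered chunks so a crashed
    // snapshot can resume from the last _id it persisted (the idea behind
    // ChunkedSnapshotQuery's key/lastKey above).
    async function* chunkedSnapshot(
      client: MongoClient,
      dbName: string,
      collName: string,
      lastKey?: ObjectId,
      chunkSize = 10_000
    ) {
      const collection = client.db(dbName).collection(collName);
      let resumeAfter = lastKey;
      while (true) {
        // Only fetch documents after the last persisted key, in _id order.
        const filter = resumeAfter != null ? { _id: { $gt: resumeAfter } } : {};
        const docs = await collection.find(filter).sort({ _id: 1 }).limit(chunkSize).toArray();
        if (docs.length == 0) {
          break;
        }
        resumeAfter = docs[docs.length - 1]._id;
        yield { docs, lastKey: resumeAfter };
      }
    }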

modules/module-postgres/src/replication/SnapshotQuery.ts
Lines changed: 11 additions & 5 deletions

@@ -85,16 +85,21 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
   public constructor(
     private readonly connection: PgConnection,
     private readonly table: SourceTable,
-    private readonly chunkSize: number = 10_000
+    private readonly chunkSize: number = 10_000,
+    lastKeySerialized: Uint8Array | null
   ) {
     this.key = table.replicaIdColumns[0];
+
+    if (lastKeySerialized != null) {
+      this.lastKey = this.deserializeKey(lastKeySerialized);
+    }
   }
 
   public async initialize(): Promise<void> {
     // No-op
   }
 
-  public setLastKeySerialized(key: Uint8Array) {
+  private deserializeKey(key: Uint8Array) {
     const decoded = bson.deserialize(key, { useBigInt64: true });
     const keys = Object.keys(decoded);
     if (keys.length != 1) {
@@ -104,7 +109,7 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
       throw new ServiceAssertionError(`Key name mismatch: expected ${this.key.name}, got ${keys[0]}`);
     }
     const value = decoded[this.key.name];
-    this.lastKey = value;
+    return value;
   }
 
   public getLastKeySerialized(): Uint8Array {
@@ -113,17 +118,18 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
 
   public async *nextChunk(): AsyncIterableIterator<PgChunk> {
     let stream: AsyncIterableIterator<PgChunk>;
+    const escapedKeyName = escapeIdentifier(this.key.name);
     if (this.lastKey == null) {
       stream = this.connection.stream(
-        `SELECT * FROM ${this.table.escapedIdentifier} ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}`
+        `SELECT * FROM ${this.table.escapedIdentifier} ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`
       );
     } else {
       if (this.key.typeId == null) {
         throw new Error(`typeId required for primary key ${this.key.name}`);
       }
       let type: StatementParam['type'] = Number(this.key.typeId);
       stream = this.connection.stream({
-        statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapeIdentifier(this.key.name)} > $1 ORDER BY ${escapeIdentifier(this.key.name)} LIMIT ${this.chunkSize}`,
+        statement: `SELECT * FROM ${this.table.escapedIdentifier} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`,
         params: [{ value: this.lastKey, type }]
       });
     }
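
With the constructor change, the resume key is decoded from its stored BSON form at construction time rather than through a separate setLastKeySerialized() call. A self-contained sketch of that round-trip, using the same bson package but a made-up key name 'id' and value:

    import * as bson from 'bson';

    // Serialize the key column's last value as a single-field document (what
    // getLastKeySerialized() stores), then decode it again the way the new
    // constructor path does. Key name and value are illustrative.
    const serialized: Uint8Array = bson.serialize({ id: 42 });

    const decoded = bson.deserialize(serialized, { useBigInt64: true });
    const keys = Object.keys(decoded);
    if (keys.length != 1 || keys[0] != 'id') {
      throw new Error(`Unexpected key shape: ${keys}`);
    }
    // Resume the chunked query with WHERE "id" > $1 using this value.
    const lastKey = decoded['id'];
    console.log(lastKey); // 42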

modules/module-postgres/src/replication/WalStream.ts
Lines changed: 25 additions & 18 deletions

@@ -434,8 +434,10 @@ WHERE oid = $1::regclass`,
     await this.storage.startBatch(
       { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: true },
       async (batch) => {
+        let tablesWithStatus: SourceTable[] = [];
         for (let tablePattern of sourceTables) {
           const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
+          // Pre-get counts
           for (let table of tables) {
             if (table.snapshotComplete) {
               logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
@@ -444,11 +446,17 @@ WHERE oid = $1::regclass`,
             const count = await this.estimatedCountNumber(db, table);
             table = await batch.updateTableProgress(table, { totalEstimatedCount: count });
             this.relationCache.update(table);
-            await this.snapshotTableInTx(batch, db, table);
-            await touch();
+            tablesWithStatus.push(table);
+
+            logger.info(`${this.slot_name} To replicate: ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
           }
         }
 
+        for (let table of tablesWithStatus) {
+          await this.snapshotTableInTx(batch, db, table);
+          await touch();
+        }
+
         // Always commit the initial snapshot at zero.
         // This makes sure we don't skip any changes applied before starting this snapshot,
         // in the case of snapshot retries.
@@ -505,11 +513,9 @@ WHERE oid = $1::regclass`,
     table: storage.SourceTable,
     limited?: PrimaryKeyValue[]
   ) {
-    logger.info(`${this.slot_name} Replicating ${table.qualifiedName}`);
     let totalEstimatedCount = table.snapshotStatus?.totalEstimatedCount;
     let at = table.snapshotStatus?.replicatedCount ?? 0;
     let lastCountTime = 0;
-    let lastLogIndex = 0;
     let q: SnapshotQuery;
     // We do streaming on two levels:
     // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
@@ -519,15 +525,21 @@ WHERE oid = $1::regclass`,
     } else if (ChunkedSnapshotQuery.supports(table)) {
       // Single primary key - we can use the primary key for chunking
       const orderByKey = table.replicaIdColumns[0];
-      logger.info(`Chunking ${table.qualifiedName} by ${orderByKey.name}`);
-      q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkSize);
+      q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkSize, table.snapshotStatus?.lastKey ?? null);
       if (table.snapshotStatus?.lastKey != null) {
-        (q as ChunkedSnapshotQuery).setLastKeySerialized(table.snapshotStatus!.lastKey);
-        logger.info(`Resuming from ${(q as ChunkedSnapshotQuery).lastKey}`);
+        logger.info(
+          `${this.slot_name} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming from ${orderByKey.name} > ${(q as ChunkedSnapshotQuery).lastKey}`
+        );
+      } else {
+        logger.info(
+          `${this.slot_name} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resumable`
+        );
       }
     } else {
       // Fallback case - query the entire table
-      logger.info(`Snapshot ${table.qualifiedName} without chunking`);
+      logger.info(
+        `${this.slot_name} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - not resumable`
+      );
       q = new SimpleSnapshotQuery(db, table, this.snapshotChunkSize);
       at = 0;
     }
@@ -566,12 +578,6 @@ WHERE oid = $1::regclass`,
       if (rows.length > 0) {
         hasRemainingData = true;
       }
-      if (rows.length > 0 && at - lastLogIndex >= 5000) {
-        logger.info(
-          `${this.slot_name} Replicating ${table.qualifiedName} ${at}/${limited ? `${limited.length}L` : this.formatCount(totalEstimatedCount)}`
-        );
-        lastLogIndex = at;
-      }
       if (this.abort_signal.aborted) {
         throw new Error(`Aborted initial replication of ${this.slot_name}`);
       }
@@ -593,9 +599,6 @@ WHERE oid = $1::regclass`,
 
       await touch();
     }
-    logger.info(
-      `${this.slot_name} Replicated ${table.qualifiedName} ${at}/${limited ? `${limited.length}L` : this.formatCount(totalEstimatedCount)}`
-    );
 
     if (limited == null) {
       // Important: flush before marking progress
@@ -614,6 +617,10 @@ WHERE oid = $1::regclass`,
         totalEstimatedCount: totalEstimatedCount
       });
       this.relationCache.update(table);
+
+      logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`);
+    } else {
+      logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${limited.length} for resnapshot`);
     }
   }
   await batch.flush();
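
Structurally, the WalStream change splits the old single loop into two passes: one that estimates counts and logs the full "To replicate" list up front, and one that then snapshots each table. Reduced to a sketch with hypothetical stand-ins (SnapshotTable, estimateCount, snapshotTable) for the real storage/batch APIs:

    // Two-phase initial snapshot, mirroring the restructured loop above.
    interface SnapshotTable {
      qualifiedName: string;
      replicatedCount: number;
      totalEstimatedCount: number;
    }

    async function runInitialSnapshot(
      tables: SnapshotTable[],
      estimateCount: (t: SnapshotTable) => Promise<number>,
      snapshotTable: (t: SnapshotTable) => Promise<void>
    ) {
      // Phase 1: pre-fetch counts so every "To replicate" line is logged before any data moves.
      for (const table of tables) {
        table.totalEstimatedCount = await estimateCount(table);
        console.info(`To replicate: ${table.qualifiedName} ${table.replicatedCount}/~${table.totalEstimatedCount}`);
      }
      // Phase 2: replicate tables one by one; per-chunk progress logging happens inside snapshotTable.
      for (const table of tables) {
        await snapshotTable(table);
      }
    }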

packages/service-core/src/storage/SourceTable.ts
Lines changed: 11 additions & 0 deletions

@@ -100,4 +100,15 @@ export class SourceTable {
     copy.snapshotStatus = this.snapshotStatus;
     return copy;
   }
+
+  formatSnapshotProgress() {
+    if (this.snapshotComplete || this.snapshotStatus == null) {
+      // Should not happen
+      return '-';
+    } else if (this.snapshotStatus.totalEstimatedCount < 0) {
+      return `${this.snapshotStatus.replicatedCount}/unknown`;
+    } else {
+      return `${this.snapshotStatus.replicatedCount}/~${this.snapshotStatus.totalEstimatedCount}`;
+    }
+  }
 }
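
The new helper centralizes the replicated/~estimated formatting used by all the log lines above. A standalone sketch of its behavior, with a simplified status shape and made-up sample values:

    // Mirrors formatSnapshotProgress() added above; SnapshotStatus is a simplified stand-in.
    type SnapshotStatus = { replicatedCount: number; totalEstimatedCount: number };

    function formatSnapshotProgress(status: SnapshotStatus | null): string {
      if (status == null) {
        return '-';
      } else if (status.totalEstimatedCount < 0) {
        // Negative count means no estimate was available from the source.
        return `${status.replicatedCount}/unknown`;
      } else {
        return `${status.replicatedCount}/~${status.totalEstimatedCount}`;
      }
    }

    console.log(formatSnapshotProgress({ replicatedCount: 15000, totalEstimatedCount: 250000 })); // "15000/~250000"
    console.log(formatSnapshotProgress({ replicatedCount: 0, totalEstimatedCount: -1 }));         // "0/unknown"
    console.log(formatSnapshotProgress(null));                                                     // "-"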
