Skip to content

Commit f45677b

Browse files
committed
Post-merge updates for MySQL.
1 parent e1dc7cc commit f45677b

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,19 @@ AND table_type = 'BASE TABLE';`,
484484
},
485485
onCommit: async (lsn: string) => {
486486
this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
487-
await batch.commit(lsn);
487+
const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange });
488+
if (didCommit) {
489+
this.oldestUncommittedChange = null;
490+
this.isStartingReplication = false;
491+
}
492+
},
493+
onTransactionStart: async (options) => {
494+
if (this.oldestUncommittedChange == null) {
495+
this.oldestUncommittedChange = options.timestamp;
496+
}
497+
},
498+
onRotate: async () => {
499+
this.isStartingReplication = false;
488500
}
489501
};
490502
}

modules/module-mysql/src/replication/zongji/BinLogListener.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const MAX_QUEUE_PAUSE_TIME_MS = 45_000;
1212
export type Row = Record<string, any>;
1313

1414
export interface BinLogEventHandler {
15+
onTransactionStart: (options: { timestamp: Date }) => Promise<void>;
16+
onRotate: () => Promise<void>;
1517
onWrite: (rows: Row[], tableMap: TableMapEntry) => Promise<void>;
1618
onUpdate: (rowsAfter: Row[], rowsBefore: Row[], tableMap: TableMapEntry) => Promise<void>;
1719
onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise<void>;
@@ -196,10 +198,12 @@ export class BinLogListener {
196198
offset: evt.nextPosition
197199
}
198200
});
201+
await this.eventHandler.onTransactionStart({ timestamp: new Date(evt.timestamp) });
199202
break;
200203
case zongji_utils.eventIsRotation(evt):
201204
this.binLogPosition.filename = evt.binlogName;
202205
this.binLogPosition.offset = evt.position;
206+
await this.eventHandler.onRotate();
203207
break;
204208
case zongji_utils.eventIsWriteMutation(evt):
205209
await this.eventHandler.onWrite(evt.rows, evt.tableMap[evt.tableId]);

0 commit comments

Comments
 (0)