Skip to content

Commit 286ba16

Browse files
committed
Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events.
Introduced a maximum timeout that the binlog processing queue can be paused before auto-resuming. This is to prevent the replication connection timing out.
1 parent 59afb33 commit 286ba16

File tree

3 files changed

+17
-7
lines changed

3 files changed

+17
-7
lines changed

modules/module-mysql/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"@powersync/service-sync-rules": "workspace:*",
3434
"@powersync/service-types": "workspace:*",
3535
"@powersync/service-jsonbig": "workspace:*",
36-
"@powersync/mysql-zongji": "0.0.0-dev-20250528105319",
36+
"@powersync/mysql-zongji": "0.2.0",
3737
"async": "^3.2.4",
3838
"mysql2": "^3.11.0",
3939
"semver": "^7.5.4",

modules/module-mysql/src/replication/zongji/BinLogListener.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import * as zongji_utils from './zongji-utils.js';
55
import { logger } from '@powersync/lib-services-framework';
66
import { MySQLConnectionManager } from '../MySQLConnectionManager.js';
77

8+
// Maximum time the processing queue can be paused before resuming automatically
9+
// MySQL server will automatically terminate replication connections after 60 seconds of inactivity, so this guards against that.
10+
const MAX_QUEUE_PAUSE_TIME_MS = 45_000;
11+
812
export type Row = Record<string, any>;
913

1014
export interface BinLogEventHandler {
@@ -133,7 +137,12 @@ export class BinLogListener {
133137
`Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.`
134138
);
135139
zongji.pause();
136-
await this.processingQueue.empty();
140+
const resumeTimeoutPromise = new Promise((resolve) => {
141+
setTimeout(() => resolve('timeout'), MAX_QUEUE_PAUSE_TIME_MS);
142+
});
143+
144+
await Promise.race([this.processingQueue.empty(), resumeTimeoutPromise]);
145+
137146
logger.info(`Binlog processing queue backlog cleared. Resuming Binlog listener.`);
138147
zongji.resume();
139148
}
@@ -162,6 +171,7 @@ export class BinLogListener {
162171
// The timeout here must be greater than the master_heartbeat_period.
163172
const socket = this.zongji.connection._socket!;
164173
socket.setTimeout(60_000, () => {
174+
logger.info('Destroying socket due to replication connection timeout.');
165175
socket.destroy(new Error('Replication connection timeout.'));
166176
});
167177
logger.info(

pnpm-lock.yaml

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)