Skip to content

Commit 079a2f5

Browse files
committed
Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events
1 parent 07201e8 commit 079a2f5

File tree

5 files changed

+72
-31
lines changed

5 files changed

+72
-31
lines changed

modules/module-mysql/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"@powersync/service-sync-rules": "workspace:*",
3434
"@powersync/service-types": "workspace:*",
3535
"@powersync/service-jsonbig": "workspace:*",
36-
"@powersync/mysql-zongji": "0.0.0-dev-20250527085137",
36+
"@powersync/mysql-zongji": "0.0.0-dev-20250528105319",
3737
"async": "^3.2.4",
3838
"mysql2": "^3.11.0",
3939
"semver": "^7.5.4",

modules/module-mysql/src/replication/zongji/BinLogListener.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ export class BinLogListener {
3434

3535
zongji: ZongJi;
3636
processingQueue: async.QueueObject<BinLogEvent>;
37+
/**
38+
* The combined size in bytes of all the binlog events currently in the processing queue.
39+
*/
40+
queueMemoryUsage: number;
3741

3842
constructor(public options: BinLogListenerOptions) {
3943
this.connectionManager = options.connectionManager;
@@ -42,9 +46,18 @@ export class BinLogListener {
4246
this.currentGTID = null;
4347

4448
this.processingQueue = async.queue(this.createQueueWorker(), 1);
49+
this.queueMemoryUsage = 0;
4550
this.zongji = this.createZongjiListener();
4651
}
4752

53+
/**
54+
* The queue memory limit in bytes as defined in the connection options.
55+
* @private
56+
*/
57+
private get queueMemoryLimit(): number {
58+
return this.connectionManager.options.binlog_queue_memory_limit * 1024 * 1024;
59+
}
60+
4861
public async start(): Promise<void> {
4962
if (this.isStopped) {
5063
return;
@@ -62,6 +75,12 @@ export class BinLogListener {
6275
} satisfies StartOptions);
6376

6477
return new Promise<void>((resolve, reject) => {
78+
// Handle an edge case where the listener has already been stopped before completing startup
79+
if (this.isStopped) {
80+
logger.info('BinLog listener was stopped before startup completed.');
81+
resolve();
82+
}
83+
6584
this.zongji.on('error', (error) => {
6685
if (!this.isStopped) {
6786
logger.error('Binlog listener error:', error);
@@ -86,12 +105,6 @@ export class BinLogListener {
86105
resolve();
87106
logger.info('BinLog listener stopped. Replication ended.');
88107
});
89-
90-
// Handle the edge case where the listener has already been stopped before completing startup
91-
if (this.isStopped) {
92-
logger.info('BinLog listener was stopped before startup completed.');
93-
resolve();
94-
}
95108
});
96109
}
97110

@@ -112,11 +125,12 @@ export class BinLogListener {
112125
zongji.on('binlog', async (evt) => {
113126
logger.info(`Received Binlog event:${evt.getEventName()}`);
114127
this.processingQueue.push(evt);
128+
this.queueMemoryUsage += evt.size;
115129

116130
// When the processing queue grows past the threshold, we pause the binlog listener
117-
if (this.processingQueue.length() > this.connectionManager.options.max_binlog_queue_size) {
131+
if (this.isQueueOverCapacity()) {
118132
logger.info(
119-
`Max Binlog processing queue length [${this.connectionManager.options.max_binlog_queue_size}] reached. Pausing Binlog listener.`
133+
`Binlog processing queue has reached its memory limit of [${this.connectionManager.options.binlog_queue_memory_limit}MB]. Pausing Binlog listener.`
120134
);
121135
zongji.pause();
122136
await this.processingQueue.empty();
@@ -201,6 +215,12 @@ export class BinLogListener {
201215
await this.eventHandler.onCommit(LSN);
202216
break;
203217
}
218+
219+
this.queueMemoryUsage -= evt.size;
204220
};
205221
}
222+
223+
isQueueOverCapacity(): boolean {
224+
return this.queueMemoryUsage >= this.queueMemoryLimit;
225+
}
206226
}

modules/module-mysql/src/types/types.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export interface NormalizedMySQLConnectionConfig {
2424

2525
lookup?: LookupFunction;
2626

27-
max_binlog_queue_size: number;
27+
binlog_queue_memory_limit: number;
2828
}
2929

3030
export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.and(
@@ -43,8 +43,8 @@ export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.a
4343
client_private_key: t.string.optional(),
4444

4545
reject_ip_ranges: t.array(t.string).optional(),
46-
// The maximum number of binlog events that can be queued in memory before throttling is applied.
47-
max_binlog_queue_size: t.number.optional()
46+
// The combined size of binlog events that can be queued in memory before throttling is applied.
47+
binlog_queue_memory_limit: t.number.optional()
4848
})
4949
);
5050

@@ -118,8 +118,8 @@ export function normalizeConnectionConfig(options: MySQLConnectionConfig): Norma
118118

119119
server_id: options.server_id ?? 1,
120120

121-
// Based on profiling, a queue size of 1000 uses about 50MB of memory.
122-
max_binlog_queue_size: options.max_binlog_queue_size ?? 1000,
121+
// Binlog processing queue memory limit before throttling is applied.
122+
binlog_queue_memory_limit: options.binlog_queue_memory_limit ?? 50,
123123

124124
lookup
125125
};

modules/module-mysql/test/src/BinLogListener.test.ts

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import { v4 as uuid } from 'uuid';
66
import * as common from '@module/common/common-index.js';
77
import { createRandomServerId } from '@module/utils/mysql-utils.js';
88
import { TableMapEntry } from '@powersync/mysql-zongji';
9+
import crypto from 'crypto';
910

1011
describe('BinlogListener tests', () => {
11-
const MAX_QUEUE_SIZE = 10;
12+
const MAX_QUEUE_CAPACITY_MB = 1;
1213
const BINLOG_LISTENER_CONNECTION_OPTIONS = {
1314
...TEST_CONNECTION_OPTIONS,
14-
max_binlog_queue_size: MAX_QUEUE_SIZE
15+
binlog_queue_memory_limit: MAX_QUEUE_CAPACITY_MB
1516
};
1617

1718
let connectionManager: MySQLConnectionManager;
@@ -22,7 +23,7 @@ describe('BinlogListener tests', () => {
2223
connectionManager = new MySQLConnectionManager(BINLOG_LISTENER_CONNECTION_OPTIONS, {});
2324
const connection = await connectionManager.getConnection();
2425
await clearTestDb(connection);
25-
await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);
26+
await connection.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description MEDIUMTEXT)`);
2627
connection.release();
2728
const fromGTID = await getFromGTID(connectionManager);
2829

@@ -52,24 +53,30 @@ describe('BinlogListener tests', () => {
5253
expect(queueStopSpy).toHaveBeenCalled();
5354
});
5455

55-
test('Pause Zongji binlog listener when processing queue reaches max size', async () => {
56+
test('Pause Zongji binlog listener when processing queue reaches maximum memory size', async () => {
5657
const pauseSpy = vi.spyOn(binLogListener.zongji, 'pause');
5758
const resumeSpy = vi.spyOn(binLogListener.zongji, 'resume');
58-
const queueSpy = vi.spyOn(binLogListener.processingQueue, 'length');
5959

60-
const ROW_COUNT = 100;
60+
// Pause the event handler to force a backlog on the processing queue
61+
eventHandler.pause();
62+
63+
const ROW_COUNT = 10;
6164
await insertRows(connectionManager, ROW_COUNT);
6265

6366
const startPromise = binLogListener.start();
6467

68+
// Wait for listener to pause due to queue reaching capacity
69+
await vi.waitFor(() => expect(pauseSpy).toHaveBeenCalled(), { timeout: 5000 });
70+
71+
expect(binLogListener.isQueueOverCapacity()).toBeTruthy();
72+
// Resume event processing
73+
eventHandler.unpause!();
74+
6575
await vi.waitFor(() => expect(eventHandler.rowsWritten).equals(ROW_COUNT), { timeout: 5000 });
6676
binLogListener.stop();
6777
await expect(startPromise).resolves.toBeUndefined();
68-
69-
// Count how many times the queue reached the max size. Consequently, we expect the listener to have paused and resumed that many times.
70-
const overThresholdCount = queueSpy.mock.results.map((r) => r.value).filter((v) => v === MAX_QUEUE_SIZE).length;
71-
expect(pauseSpy).toHaveBeenCalledTimes(overThresholdCount);
72-
expect(resumeSpy).toHaveBeenCalledTimes(overThresholdCount);
78+
// Confirm resume was called after unpausing
79+
expect(resumeSpy).toHaveBeenCalled();
7380
});
7481

7582
test('Binlog events are correctly forwarded to provided binlog events handler', async () => {
@@ -101,7 +108,9 @@ async function getFromGTID(connectionManager: MySQLConnectionManager) {
101108

102109
async function insertRows(connectionManager: MySQLConnectionManager, count: number) {
103110
for (let i = 0; i < count; i++) {
104-
await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i}')`);
111+
await connectionManager.query(
112+
`INSERT INTO test_DATA(id, description) VALUES('${uuid()}','test${i} ${crypto.randomBytes(100_000).toString('hex')}')`
113+
);
105114
}
106115
}
107116

@@ -119,7 +128,19 @@ class TestBinLogEventHandler implements BinLogEventHandler {
119128
rowsDeleted = 0;
120129
commitCount = 0;
121130

131+
unpause: ((value: void | PromiseLike<void>) => void) | undefined;
132+
private pausedPromise: Promise<void> | undefined;
133+
134+
pause() {
135+
this.pausedPromise = new Promise((resolve) => {
136+
this.unpause = resolve;
137+
});
138+
}
139+
122140
async onWrite(rows: Row[], tableMap: TableMapEntry) {
141+
if (this.pausedPromise) {
142+
await this.pausedPromise;
143+
}
123144
this.rowsWritten = this.rowsWritten + rows.length;
124145
}
125146

pnpm-lock.yaml

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)