Skip to content

Commit ba7baeb

Browse files
authored
Configurable service limits (#218)
* Configurable limits for `max_concurrent_connections` and `max_pool_size`.
* Use `SyncContext` for configurable parameters for sync requests.
* Cleanup.
* Add changeset.
1 parent cf023a7 commit ba7baeb

File tree

23 files changed

+258
-64
lines changed

23 files changed

+258
-64
lines changed

.changeset/tough-buses-ring.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-core': minor
6+
'@powersync/lib-service-postgres': minor
7+
'@powersync/lib-service-mongodb': minor
8+
'@powersync/service-types': minor
9+
'@powersync/service-image': minor
10+
---
11+
12+
Make some service limits configurable.

libs/lib-mongodb/src/db/mongo.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ export const MONGO_OPERATION_TIMEOUT_MS = 30_000;
2929
*/
3030
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000;
3131

32-
export function createMongoClient(config: BaseMongoConfigDecoded) {
32+
export interface MongoConnectionOptions {
33+
maxPoolSize: number;
34+
}
35+
36+
export function createMongoClient(config: BaseMongoConfigDecoded, options?: MongoConnectionOptions) {
3337
const normalized = normalizeMongoConfig(config);
3438
return new mongo.MongoClient(normalized.uri, {
3539
auth: {
@@ -48,7 +52,7 @@ export function createMongoClient(config: BaseMongoConfigDecoded) {
4852
// Avoid too many connections:
4953
// 1. It can overwhelm the source database.
5054
// 2. Processing too many queries in parallel can cause the process to run out of memory.
51-
maxPoolSize: 8,
55+
maxPoolSize: options?.maxPoolSize ?? 8,
5256

5357
maxConnecting: 3,
5458
maxIdleTimeMS: 60_000

libs/lib-postgres/src/db/connection/DatabaseClient.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ export class DatabaseClient extends AbstractPostgresConnection<DatabaseClientLis
4242
constructor(protected options: DatabaseClientOptions) {
4343
super();
4444
this.closed = false;
45-
this.pool = pgwire.connectPgWirePool(options.config);
45+
this.pool = pgwire.connectPgWirePool(options.config, {
46+
maxSize: options.config.max_pool_size
47+
});
4648
this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (v, index) => {
4749
// Only listen to notifications on a single (the first) connection
4850
const notificationChannels = index == 0 ? options.notificationChannels : [];

libs/lib-postgres/src/types/types.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import * as service_types from '@powersync/service-types';
44
import * as t from 'ts-codec';
55
import * as urijs from 'uri-js';
66

7-
export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig {}
7+
export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig {
8+
max_pool_size: number;
9+
}
810

911
export const POSTGRES_CONNECTION_TYPE = 'postgresql' as const;
1012

@@ -42,7 +44,9 @@ export const BasePostgresConnectionConfig = t.object({
4244
/**
4345
* Prefix for the slot name. Defaults to "powersync_"
4446
*/
45-
slot_name_prefix: t.string.optional()
47+
slot_name_prefix: t.string.optional(),
48+
49+
max_pool_size: t.number.optional()
4650
});
4751

4852
export type BasePostgresConnectionConfig = t.Encoded<typeof BasePostgresConnectionConfig>;
@@ -125,7 +129,9 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD
125129
lookup,
126130

127131
client_certificate: options.client_certificate ?? undefined,
128-
client_private_key: options.client_private_key ?? undefined
132+
client_private_key: options.client_private_key ?? undefined,
133+
134+
max_pool_size: options.max_pool_size ?? 8
129135
} satisfies NormalizedBasePostgresConnectionConfig;
130136
}
131137

modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ export class MongoStorageProvider implements storage.BucketStorageProvider {
2222
}
2323

2424
const decodedConfig = MongoStorageConfig.decode(storage as any);
25-
const client = lib_mongo.db.createMongoClient(decodedConfig);
25+
const client = lib_mongo.db.createMongoClient(decodedConfig, {
26+
maxPoolSize: resolvedConfig.storage.max_pool_size ?? 8
27+
});
2628

2729
const database = new PowerSyncMongo(client, { database: resolvedConfig.storage.database });
2830
const factory = new MongoBucketStorage(database, {

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,6 @@ export class PowerSyncMongo {
8282
}
8383
}
8484

85-
export function createPowerSyncMongo(config: MongoStorageConfig) {
86-
return new PowerSyncMongo(lib_mongo.createMongoClient(config), { database: config.database });
85+
export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {
86+
return new PowerSyncMongo(lib_mongo.createMongoClient(config, options), { database: config.database });
8787
}

modules/module-postgres-storage/src/types/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export type RequiredOperationBatchLimits = Required<OperationBatchLimits>;
5353

5454
export type NormalizedPostgresStorageConfig = pg_wire.NormalizedConnectionConfig & {
5555
batch_limits: RequiredOperationBatchLimits;
56+
max_pool_size: number;
5657
};
5758

5859
export const normalizePostgresStorageConfig = (

packages/service-core-tests/src/tests/register-sync-tests.ts

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ export const SYNC_SNAPSHOT_PATH = path.resolve(__dirname, '../__snapshots/sync.t
3131
*/
3232
export function registerSyncTests(factory: storage.TestStorageFactory) {
3333
const tracker = new sync.RequestTracker();
34+
const syncContext = new sync.SyncContext({
35+
maxBuckets: 10,
36+
maxParameterQueryResults: 10,
37+
maxDataFetchConcurrency: 2
38+
});
3439

3540
test('sync global data', async () => {
3641
await using f = await factory();
@@ -67,6 +72,7 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
6772
});
6873

6974
const stream = sync.streamResponse({
75+
syncContext,
7076
bucketStorage: bucketStorage,
7177
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
7278
params: {
@@ -128,7 +134,8 @@ bucket_definitions:
128134
});
129135

130136
const stream = sync.streamResponse({
131-
bucketStorage: bucketStorage,
137+
syncContext,
138+
bucketStorage,
132139
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
133140
params: {
134141
buckets: [],
@@ -191,7 +198,8 @@ bucket_definitions:
191198
});
192199

193200
const stream = sync.streamResponse({
194-
bucketStorage: bucketStorage,
201+
syncContext,
202+
bucketStorage,
195203
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
196204
params: {
197205
buckets: [],
@@ -276,7 +284,8 @@ bucket_definitions:
276284
});
277285

278286
const stream = sync.streamResponse({
279-
bucketStorage: bucketStorage,
287+
syncContext,
288+
bucketStorage,
280289
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
281290
params: {
282291
buckets: [],
@@ -302,7 +311,7 @@ bucket_definitions:
302311
receivedCompletions++;
303312
if (receivedCompletions == 1) {
304313
// Trigger an empty bucket update.
305-
await bucketStorage.createManagedWriteCheckpoint({user_id: '', heads: {'1': '1/0'}});
314+
await bucketStorage.createManagedWriteCheckpoint({ user_id: '', heads: { '1': '1/0' } });
306315
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
307316
await batch.commit('1/0');
308317
});
@@ -342,7 +351,8 @@ bucket_definitions:
342351
});
343352

344353
const stream = sync.streamResponse({
345-
bucketStorage: bucketStorage,
354+
syncContext,
355+
bucketStorage,
346356
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
347357
params: {
348358
buckets: [],
@@ -371,7 +381,8 @@ bucket_definitions:
371381
await bucketStorage.autoActivate();
372382

373383
const stream = sync.streamResponse({
374-
bucketStorage: bucketStorage,
384+
syncContext,
385+
bucketStorage,
375386
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
376387
params: {
377388
buckets: [],
@@ -398,7 +409,8 @@ bucket_definitions:
398409
await bucketStorage.autoActivate();
399410

400411
const stream = sync.streamResponse({
401-
bucketStorage: bucketStorage,
412+
syncContext,
413+
bucketStorage,
402414
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
403415
params: {
404416
buckets: [],
@@ -461,7 +473,8 @@ bucket_definitions:
461473
const exp = Date.now() / 1000 + 0.1;
462474

463475
const stream = sync.streamResponse({
464-
bucketStorage: bucketStorage,
476+
syncContext,
477+
bucketStorage,
465478
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
466479
params: {
467480
buckets: [],
@@ -521,7 +534,8 @@ bucket_definitions:
521534
});
522535

523536
const stream = sync.streamResponse({
524-
bucketStorage: bucketStorage,
537+
syncContext,
538+
bucketStorage,
525539
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
526540
params: {
527541
buckets: [],
@@ -644,7 +658,8 @@ bucket_definitions:
644658
});
645659

646660
const params: sync.SyncStreamParameters = {
647-
bucketStorage: bucketStorage,
661+
syncContext,
662+
bucketStorage,
648663
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
649664
params: {
650665
buckets: [],

packages/service-core/src/routes/RouterEngine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js';
1010
import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js';
1111
import { SocketRouteGenerator } from './router-socket.js';
1212
import { RouteDefinition } from './router.js';
13+
import { SyncContext } from '../sync/SyncContext.js';
1314

1415
export type RouterSetupResponse = {
1516
onShutdown: () => Promise<void>;

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
1313
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
1414
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
1515
const { service_context } = context;
16-
const { routerEngine } = service_context;
16+
const { routerEngine, syncContext } = service_context;
1717

1818
// Create our own controller that we can abort directly
1919
const controller = new AbortController();
@@ -73,6 +73,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
7373
const tracker = new sync.RequestTracker();
7474
try {
7575
for await (const data of sync.streamResponse({
76+
syncContext: syncContext,
7677
bucketStorage: bucketStorage,
7778
syncRules: syncRules,
7879
params: {

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export const syncStreamed = routeDefinition({
2020
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
2121
handler: async (payload) => {
2222
const { service_context } = payload.context;
23-
const { routerEngine, storageEngine } = service_context;
23+
const { routerEngine, storageEngine, syncContext } = service_context;
2424
const headers = payload.request.headers;
2525
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
2626
const clientId = payload.params.client_id;
@@ -56,6 +56,7 @@ export const syncStreamed = routeDefinition({
5656
sync.transformToBytesTracked(
5757
sync.ndjson(
5858
sync.streamResponse({
59+
syncContext: syncContext,
5960
bucketStorage,
6061
syncRules: syncRules,
6162
params,

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import * as util from '../util/util-index.js';
66
import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework';
77
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
88
import { BucketSyncState } from './sync.js';
9+
import { SyncContext } from './SyncContext.js';
910

1011
export interface BucketChecksumStateOptions {
12+
syncContext: SyncContext;
1113
bucketStorage: BucketChecksumStateStorage;
1214
syncRules: SqlSyncRules;
1315
syncParams: RequestParameters;
@@ -20,6 +22,7 @@ export interface BucketChecksumStateOptions {
2022
* Handles incrementally re-computing checkpoints.
2123
*/
2224
export class BucketChecksumState {
25+
private readonly context: SyncContext;
2326
private readonly bucketStorage: BucketChecksumStateStorage;
2427

2528
/**
@@ -43,8 +46,14 @@ export class BucketChecksumState {
4346
private pendingBucketDownloads = new Set<string>();
4447

4548
constructor(options: BucketChecksumStateOptions) {
49+
this.context = options.syncContext;
4650
this.bucketStorage = options.bucketStorage;
47-
this.parameterState = new BucketParameterState(options.bucketStorage, options.syncRules, options.syncParams);
51+
this.parameterState = new BucketParameterState(
52+
options.syncContext,
53+
options.bucketStorage,
54+
options.syncRules,
55+
options.syncParams
56+
);
4857
this.bucketDataPositions = new Map();
4958

5059
for (let { name, after: start } of options.initialBucketPositions ?? []) {
@@ -73,6 +82,12 @@ export class BucketChecksumState {
7382
});
7483
}
7584
this.bucketDataPositions = dataBucketsNew;
85+
if (dataBucketsNew.size > this.context.maxBuckets) {
86+
throw new ServiceError(
87+
ErrorCode.PSYNC_S2305,
88+
`Too many buckets: ${dataBucketsNew.size} (limit of ${this.context.maxBuckets})`
89+
);
90+
}
7691

7792
let checksumMap: util.ChecksumMap;
7893
if (updatedBuckets != null) {
@@ -247,13 +262,20 @@ export interface CheckpointUpdate {
247262
}
248263

249264
export class BucketParameterState {
265+
private readonly context: SyncContext;
250266
public readonly bucketStorage: BucketChecksumStateStorage;
251267
public readonly syncRules: SqlSyncRules;
252268
public readonly syncParams: RequestParameters;
253269
private readonly querier: BucketParameterQuerier;
254270
private readonly staticBuckets: Map<string, BucketDescription>;
255271

256-
constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) {
272+
constructor(
273+
context: SyncContext,
274+
bucketStorage: BucketChecksumStateStorage,
275+
syncRules: SqlSyncRules,
276+
syncParams: RequestParameters
277+
) {
278+
this.context = context;
257279
this.bucketStorage = bucketStorage;
258280
this.syncRules = syncRules;
259281
this.syncParams = syncParams;
@@ -275,9 +297,13 @@ export class BucketParameterState {
275297
return null;
276298
}
277299

278-
if (update.buckets.length > 1000) {
279-
// TODO: Limit number of buckets even before we get to this point
280-
const error = new ServiceError(ErrorCode.PSYNC_S2305, `Too many buckets: ${update.buckets.length}`);
300+
if (update.buckets.length > this.context.maxParameterQueryResults) {
301+
// TODO: Limit number of results even before we get to this point
302+
// This limit applies _before_ we get the unique set
303+
const error = new ServiceError(
304+
ErrorCode.PSYNC_S2305,
305+
`Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})`
306+
);
281307
logger.error(error.message, {
282308
checkpoint: checkpoint,
283309
user_id: this.syncParams.user_id,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Semaphore, SemaphoreInterface, withTimeout } from 'async-mutex';
2+
3+
export interface SyncContextOptions {
4+
maxBuckets: number;
5+
maxParameterQueryResults: number;
6+
maxDataFetchConcurrency: number;
7+
}
8+
9+
/**
10+
* Maximum duration to wait for the mutex to become available.
11+
*
12+
* This gives an explicit error if there are mutex issues, rather than just hanging.
13+
*/
14+
const MUTEX_ACQUIRE_TIMEOUT = 30_000;
15+
16+
/**
17+
* Represents the context in which sync happens.
18+
*
19+
* This is global to all sync requests, not per request.
20+
*/
21+
export class SyncContext {
22+
readonly maxBuckets: number;
23+
readonly maxParameterQueryResults: number;
24+
25+
readonly syncSemaphore: SemaphoreInterface;
26+
27+
constructor(options: SyncContextOptions) {
28+
this.maxBuckets = options.maxBuckets;
29+
this.maxParameterQueryResults = options.maxParameterQueryResults;
30+
this.syncSemaphore = withTimeout(
31+
new Semaphore(options.maxDataFetchConcurrency),
32+
MUTEX_ACQUIRE_TIMEOUT,
33+
new Error(`Timeout while waiting for data`)
34+
);
35+
}
36+
}

packages/service-core/src/sync/sync-index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export * from './safeRace.js';
66
export * from './sync.js';
77
export * from './util.js';
88
export * from './BucketChecksumState.js';
9+
export * from './SyncContext.js';

0 commit comments

Comments (0)