Commit 5b39039

rkistner and Copilot authored

Cleanup sync rules implementation (#267)

* Refactor SqlDataQuery to be immutable.
* Immutable SqlParameterQuery.
* Immutable StaticSqlParameterQuery.
* Immutable TableValuedFunctionSqlParameterQuery.
* Cleanup.
* camelCase.
* More camelCase.
* Changeset.
* Cleanup and document SqlParameterQuery fields.
* Further cleanup and field documentation.
* Update packages/sync-rules/src/BaseSqlDataQuery.ts
  Remove redundant field initializer
  Co-authored-by: Copilot <[email protected]>
* Remove ruleId; rename id -> queryId.

Co-authored-by: Copilot <[email protected]>
1 parent f6cb3a1 commit 5b39039

27 files changed (+993, -676 lines)

.changeset/little-beans-carry.md

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+---
+'@powersync/service-sync-rules': minor
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+---
+
+Cleanup on internal sync rules implementation and APIs.
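
For consumers of these packages, the minor bump on '@powersync/service-sync-rules' mostly surfaces as renamed (snake_case to camelCase) accessors, as the file diffs below show. The following is a minimal TypeScript sketch of the adaptation, assuming `SqlSyncRules` is exported from the package root; the `syncRules` variable and the loop at the end are illustrative, not code from this commit.

import type { SqlSyncRules } from '@powersync/service-sync-rules';

// Stand-in for a parsed sync rules instance; how it is obtained is out of scope here.
declare const syncRules: SqlSyncRules;

// Renames applied in this commit (old -> new):
//   syncRules.event_descriptors         -> syncRules.eventDescriptors
//   syncRules.bucket_descriptors        -> syncRules.bucketDescriptors
//   descriptor.parameter_queries        -> descriptor.parameterQueries
//   descriptor.data_queries             -> descriptor.dataQueries
//   descriptor.global_parameter_queries -> descriptor.globalParameterQueries
//   descriptor.bucket_parameters        -> descriptor.bucketParameters
//   parameterQuery.input_parameters     -> parameterQuery.inputParameters
//   syncParams.user_id                  -> syncParams.userId
//   syncParams.token_parameters         -> syncParams.tokenParameters

for (const descriptor of syncRules.bucketDescriptors) {
  console.log(descriptor.name, descriptor.bucketParameters);
}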

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -895,7 +895,7 @@ export class MongoBucketBatch
    * Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable}
    */
   protected getTableEvents(table: storage.SourceTable): SqlEventDescriptor[] {
-    return this.sync_rules.event_descriptors.filter((evt) =>
+    return this.sync_rules.eventDescriptors.filter((evt) =>
       [...evt.getSourceTables()].some((sourceTable) => sourceTable.matches(table))
     );
   }

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ export class PersistedBatch {
           k: sourceKey
         },
         lookup: binLookup,
-        bucket_parameters: result.bucket_parameters
+        bucket_parameters: result.bucketParameters
       }
     }
   });

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -861,7 +861,7 @@ export class PostgresBucketBatch
    * TODO maybe share this with an abstract class
    */
   protected getTableEvents(table: storage.SourceTable): sync_rules.SqlEventDescriptor[] {
-    return this.sync_rules.event_descriptors.filter((evt) =>
+    return this.sync_rules.eventDescriptors.filter((evt) =>
       [...evt.getSourceTables()].some((sourceTable) => sourceTable.matches(table))
     );
   }

modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -152,7 +152,7 @@ export class PostgresPersistedBatch {
     const base64 = binLookup.toString('base64');
     remaining_lookups.delete(base64);
     const hexLookup = binLookup.toString('hex');
-    const serializedBucketParameters = JSONBig.stringify(result.bucket_parameters);
+    const serializedBucketParameters = JSONBig.stringify(result.bucketParameters);
     this.parameterDataInserts.push({
       group_id: this.group_id,
       source_table: table.id,

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 4 additions & 4 deletions
@@ -404,7 +404,7 @@ bucket_definitions:
 
     const parameters = new RequestParameters({ sub: 'u1' }, {});
 
-    const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
+    const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];
 
     const lookups = q1.getLookups(parameters);
     expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]);
@@ -474,7 +474,7 @@ bucket_definitions:
 
     const parameters = new RequestParameters({ sub: 'unknown' }, {});
 
-    const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
+    const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];
 
     const lookups = q1.getLookups(parameters);
     expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]);
@@ -564,15 +564,15 @@ bucket_definitions:
    const parameters = new RequestParameters({ sub: 'u1' }, {});
 
    // Test intermediate values - could be moved to sync_rules.test.ts
-    const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
+    const q1 = sync_rules.bucketDescriptors[0].parameterQueries[0];
    const lookups1 = q1.getLookups(parameters);
    expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]);
 
    const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1);
    parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
    expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]);
 
-    const q2 = sync_rules.bucket_descriptors[0].parameter_queries[1];
+    const q2 = sync_rules.bucketDescriptors[0].parameterQueries[1];
    const lookups2 = q2.getLookups(parameters);
    expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]);
 
packages/service-core/src/routes/endpoints/sync-rules.ts

Lines changed: 6 additions & 6 deletions
@@ -202,13 +202,13 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
 
   return {
     valid: true,
-    bucket_definitions: rules.bucket_descriptors.map((d) => {
-      let all_parameter_queries = [...d.parameter_queries.values()].flat();
-      let all_data_queries = [...d.data_queries.values()].flat();
+    bucket_definitions: rules.bucketDescriptors.map((d) => {
+      let all_parameter_queries = [...d.parameterQueries.values()].flat();
+      let all_data_queries = [...d.dataQueries.values()].flat();
       return {
         name: d.name,
-        bucket_parameters: d.bucket_parameters,
-        global_parameter_queries: d.global_parameter_queries.map((q) => {
+        bucket_parameters: d.bucketParameters,
+        global_parameter_queries: d.globalParameterQueries.map((q) => {
           return {
             sql: q.sql
           };
@@ -217,7 +217,7 @@ async function debugSyncRules(apiHandler: RouteAPI, sync_rules: string) {
           return {
             sql: q.sql,
             table: q.sourceTable,
-            input_parameters: q.input_parameters
+            input_parameters: q.inputParameters
           };
         }),
 
packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 2 additions & 2 deletions
@@ -92,7 +92,7 @@ export class BucketChecksumState {
    */
   async buildNextCheckpointLine(next: storage.StorageCheckpointUpdate): Promise<CheckpointLine | null> {
     const { writeCheckpoint, base } = next;
-    const user_id = this.parameterState.syncParams.user_id;
+    const user_id = this.parameterState.syncParams.userId;
 
     const storage = this.bucketStorage;
 
@@ -378,7 +378,7 @@ export class BucketParameterState {
     );
     this.logger.error(error.message, {
       checkpoint: checkpoint,
-      user_id: this.syncParams.user_id,
+      user_id: this.syncParams.userId,
       buckets: update.buckets.length
     });
 
packages/service-core/src/sync/sync.ts

Lines changed: 2 additions & 2 deletions
@@ -93,7 +93,7 @@ async function* streamResponseInner(
 ): AsyncGenerator<util.StreamingSyncLine | string | null> {
   const { raw_data, binary_data } = params;
 
-  const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id);
+  const checkpointUserId = util.checkpointUserId(syncParams.tokenParameters.user_id as string, params.client_id);
 
   const checksumState = new BucketChecksumState({
     syncContext,
@@ -228,7 +228,7 @@ async function* streamResponseInner(
     onRowsSent: markOperationsSent,
     abort_connection: signal,
     abort_batch: abortCheckpointSignal,
-    user_id: syncParams.user_id,
+    user_id: syncParams.userId,
     // Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
     // sync complete message instead.
     forPriority: !isLast ? priority : null,

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 83 additions & 26 deletions
@@ -12,31 +12,88 @@ export interface RowValueExtractor {
   getTypes(schema: QuerySchema, into: Record<string, ColumnDefinition>): void;
 }
 
-export class BaseSqlDataQuery {
-  sourceTable?: TablePattern;
-  table?: string;
-  sql?: string;
-  columns?: SelectedColumn[];
-  extractors: RowValueExtractor[] = [];
-  descriptor_name?: string;
-  bucket_parameters?: string[];
-  tools?: SqlTools;
-
-  ruleId?: string;
-
-  errors: SqlRuleError[] = [];
+export interface BaseSqlDataQueryOptions {
+  sourceTable: TablePattern;
+  table: string;
+  sql: string;
+  columns: SelectedColumn[];
+  extractors: RowValueExtractor[];
+  descriptorName: string;
+  bucketParameters: string[];
+  tools: SqlTools;
+
+  errors?: SqlRuleError[];
+}
 
-  constructor() {}
+export class BaseSqlDataQuery {
+  /**
+   * Source table or table pattern.
+   */
+  readonly sourceTable: TablePattern;
+
+  /**
+   * The table name or alias used in the query.
+   *
+   * This is used for the output table name.
+   */
+  readonly table: string;
+
+  /**
+   * The source SQL query, for debugging purposes.
+   */
+  readonly sql: string;
+
+  /**
+   * Query columns, for debugging purposes.
+   */
+  readonly columns: SelectedColumn[];
+
+  /**
+   * Extracts input row into output row. This is the column list in the SELECT part of the query.
+   *
+   * This may include plain column names, wildcards, and basic expressions.
+   */
+  readonly extractors: RowValueExtractor[];
+
+  /**
+   * Bucket definition name.
+   */
+  readonly descriptorName: string;
+  /**
+   * Bucket parameter names, without the `bucket.` prefix.
+   *
+   * These are received from the associated parameter query (if any), and must match the filters
+   * used in the data query.
+   */
+  readonly bucketParameters: string[];
+  /**
+   * Used to generate debugging info.
+   */
+  private readonly tools: SqlTools;
+
+  readonly errors: SqlRuleError[];
+
+  constructor(options: BaseSqlDataQueryOptions) {
+    this.sourceTable = options.sourceTable;
+    this.table = options.table;
+    this.sql = options.sql;
+    this.columns = options.columns;
+    this.extractors = options.extractors;
+    this.descriptorName = options.descriptorName;
+    this.bucketParameters = options.bucketParameters;
+    this.tools = options.tools;
+    this.errors = options.errors ?? [];
+  }
 
   applies(table: SourceTableInterface) {
-    return this.sourceTable?.matches(table);
+    return this.sourceTable.matches(table);
   }
 
   addSpecialParameters(table: SourceTableInterface, row: SqliteRow) {
-    if (this.sourceTable!.isWildcard) {
+    if (this.sourceTable.isWildcard) {
       return {
         ...row,
-        _table_suffix: this.sourceTable!.suffix(table.table)
+        _table_suffix: this.sourceTable.suffix(table.table)
       };
     } else {
       return row;
@@ -48,17 +105,17 @@ export class BaseSqlDataQuery {
       // Wildcard without alias - use source
       return sourceTable;
     } else {
-      return this.table!;
+      return this.table;
     }
   }
 
   isUnaliasedWildcard() {
-    return this.sourceTable!.isWildcard && this.table == this.sourceTable!.tablePattern;
+    return this.sourceTable.isWildcard && this.table == this.sourceTable.tablePattern;
   }
 
   columnOutputNames(): string[] {
-    return this.columns!.map((c) => {
-      return this.tools!.getOutputName(c);
+    return this.columns.map((c) => {
+      return this.tools.getOutputName(c);
     });
   }
 
@@ -67,7 +124,7 @@
 
     if (this.isUnaliasedWildcard()) {
       // Separate results
-      for (let schemaTable of schema.getTables(this.sourceTable!)) {
+      for (let schemaTable of schema.getTables(this.sourceTable)) {
        let output: Record<string, ColumnDefinition> = {};
 
        this.getColumnOutputsFor(schemaTable, output);
@@ -80,11 +137,11 @@
    } else {
      // Merged results
      let output: Record<string, ColumnDefinition> = {};
-      for (let schemaTable of schema.getTables(this.sourceTable!)) {
+      for (let schemaTable of schema.getTables(this.sourceTable)) {
        this.getColumnOutputsFor(schemaTable, output);
      }
      result.push({
-        name: this.table!,
+        name: this.table,
        columns: Object.values(output)
      });
    }
@@ -103,15 +160,15 @@
  protected getColumnOutputsFor(schemaTable: SourceSchemaTable, output: Record<string, ColumnDefinition>) {
    const querySchema: QuerySchema = {
      getColumn: (table, column) => {
-        if (table == this.table!) {
+        if (table == this.table) {
          return schemaTable.getColumn(column);
        } else {
          // TODO: bucket parameters?
          return undefined;
        }
      },
      getColumns: (table) => {
-        if (table == this.table!) {
+        if (table == this.table) {
          return schemaTable.getColumns();
        } else {
          return [];
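
The shape above replaces a bag of optional, mutable fields with a required options object and readonly fields. Below is a compile-time sketch of how package-internal code might construct the now-immutable query, assuming a sibling module inside packages/sync-rules/src; the `declare` statements stand in for values a real caller would already have, and the SQL, table name, and parameter values are illustrative only.

import { BaseSqlDataQuery, BaseSqlDataQueryOptions } from './BaseSqlDataQuery';

// Dependencies a real caller would already have from parsing the query.
declare const sourceTable: BaseSqlDataQueryOptions['sourceTable'];
declare const columns: BaseSqlDataQueryOptions['columns'];
declare const extractors: BaseSqlDataQueryOptions['extractors'];
declare const tools: BaseSqlDataQueryOptions['tools'];

// Previously, fields could be assigned one by one after `new BaseSqlDataQuery()`.
// Now everything is supplied up front and the instance is read-only.
const options: BaseSqlDataQueryOptions = {
  sourceTable,
  table: 'todos', // hypothetical output table name
  sql: 'SELECT * FROM todos WHERE workspace_id = bucket.workspace_id', // hypothetical
  columns,
  extractors,
  descriptorName: 'by_workspace', // bucket definition name (hypothetical)
  bucketParameters: ['workspace_id'], // without the `bucket.` prefix
  tools
  // `errors` is optional; the constructor defaults it to [].
};

const query = new BaseSqlDataQuery(options);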

packages/sync-rules/src/BucketDescription.ts

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
  */
 export type BucketPriority = 0 | 1 | 2 | 3;
 
-export const defaultBucketPriority: BucketPriority = 3;
+export const DEFAULT_BUCKET_PRIORITY: BucketPriority = 3;
 
 export const isValidPriority = (i: number): i is BucketPriority => {
   return Number.isInteger(i) && i >= 0 && i <= 3;
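
Only the constant's casing changed here. A short sketch of usage after the rename, assuming a module alongside BucketDescription.ts; the helper itself is hypothetical.

import { BucketPriority, DEFAULT_BUCKET_PRIORITY, isValidPriority } from './BucketDescription';

// Fall back to the default priority when none (or an invalid one) is configured.
function resolvePriority(configured: number | undefined): BucketPriority {
  if (configured != null && isValidPriority(configured)) {
    return configured;
  }
  return DEFAULT_BUCKET_PRIORITY; // previously exported as `defaultBucketPriority`
}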
