Skip to content

Commit 5c5b70d

Browse files
committed
refactor: hoist createObservableDataSource to projection-typeorm
restructure observable DataSource utils for easier re-use as preparation for new StabilityWindowBuffer interface that will require it's own connection during Bootstrap
1 parent 24b3181 commit 5c5b70d

21 files changed

+248
-227
lines changed

packages/cardano-services/src/Program/services/pgboss.ts

+25-9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
PoolRegistrationEntity,
77
PoolRetirementEntity,
88
StakePoolEntity,
9+
createDataSource,
910
createPgBoss,
1011
isRecoverableTypeormError
1112
} from '@cardano-sdk/projection-typeorm';
@@ -32,14 +33,13 @@ import { Pool } from 'pg';
3233
import { Router } from 'express';
3334
import { StakePoolMetadataProgramOptions } from '../options/stakePoolMetadata';
3435
import { contextLogger } from '@cardano-sdk/util';
35-
import { createObservableDataSource } from '../../Projection/createTypeormProjection';
3636
import { retryBackoff } from 'backoff-rxjs';
3737
import PgBoss from 'pg-boss';
3838

3939
/**
4040
* The entities required by the job handlers
4141
*/
42-
export const pgBossEntities = [
42+
export const pgBossEntities: Function[] = [
4343
CurrentPoolMetricsEntity,
4444
BlockEntity,
4545
PoolMetadataEntity,
@@ -49,13 +49,28 @@ export const pgBossEntities = [
4949
];
5050

5151
export const createPgBossDataSource = (connectionConfig$: Observable<PgConnectionConfig>, logger: Logger) =>
52-
createObservableDataSource({
53-
connectionConfig$,
54-
entities: pgBossEntities,
55-
extensions: {},
56-
logger,
57-
migrationsRun: false
58-
});
52+
// TODO: use createObservableDataSource from projection-typeorm package.
53+
// A challenge in doing that is that we call subscriber.error on retryable errors in order to reconnect.
54+
// Doing that with createObservableDataSource will 'destroy' the data source that's currently used,
55+
// so pg-boss is then unable to update job status and it stays 'active', not available for the newly
56+
// recreated worker to be picked up.
57+
// TODO: this raises another question - what happens when database connection drops while working on a job?
58+
// Will it stay 'active' forever, or will pg-boss eventually update it due to some sort of timeout?
59+
connectionConfig$.pipe(
60+
switchMap((connectionConfig) =>
61+
from(
62+
(async () => {
63+
const dataSource = createDataSource({
64+
connectionConfig,
65+
entities: pgBossEntities,
66+
logger
67+
});
68+
await dataSource.initialize();
69+
return dataSource;
70+
})()
71+
)
72+
)
73+
);
5974

6075
export type PgBossWorkerArgs = CommonProgramOptions &
6176
StakePoolMetadataProgramOptions &
@@ -140,6 +155,7 @@ export class PgBossHttpService extends HttpService {
140155
// This ensures that if an error which can't be retried arrives here is handled as a FATAL error
141156
shouldRetry: (error: unknown) => {
142157
const retry = isRecoverableError(error);
158+
this.logger.debug('work() shouldRetry', retry, error);
143159

144160
this.#health = {
145161
ok: false,

packages/cardano-services/src/Projection/createTypeormProjection.ts

+9-52
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
/* eslint-disable prefer-spread */
33
import { Cardano } from '@cardano-sdk/core';
44
import { Logger } from 'ts-log';
5-
import { Observable, from, switchMap, takeWhile } from 'rxjs';
5+
import { Observable, takeWhile } from 'rxjs';
66
import {
77
PgConnectionConfig,
88
TypeormDevOptions,
99
TypeormStabilityWindowBuffer,
1010
WithTypeormContext,
11-
createDataSource,
11+
createObservableConnection,
1212
isRecoverableTypeormError,
1313
typeormTransactionCommit,
1414
withTypeormTransaction
@@ -44,47 +44,6 @@ const applyStores =
4444
(evt$: Observable<T>) =>
4545
evt$.pipe.apply(evt$, selectedStores as any) as Observable<T>;
4646

47-
export const createObservableDataSource = ({
48-
connectionConfig$,
49-
logger,
50-
buffer,
51-
devOptions,
52-
entities,
53-
extensions,
54-
migrationsRun
55-
}: Omit<
56-
CreateTypeormProjectionProps,
57-
'blocksBufferLength' | 'exitAtBlockNo' | 'projections' | 'projectionSource$' | 'projectionOptions'
58-
> &
59-
Pick<PreparedProjection, 'entities' | 'extensions'> & { migrationsRun: boolean }) =>
60-
connectionConfig$.pipe(
61-
switchMap((connectionConfig) =>
62-
from(
63-
(async () => {
64-
const dataSource = createDataSource({
65-
connectionConfig,
66-
devOptions,
67-
entities,
68-
extensions,
69-
logger,
70-
options: {
71-
installExtensions: true,
72-
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
73-
migrationsRun: migrationsRun && !devOptions?.synchronize
74-
}
75-
});
76-
await dataSource.initialize();
77-
if (buffer) {
78-
const queryRunner = dataSource.createQueryRunner('master');
79-
await buffer.initialize(queryRunner);
80-
await queryRunner.release();
81-
}
82-
return dataSource;
83-
})()
84-
)
85-
)
86-
);
87-
8847
/**
8948
* Creates a projection observable that applies a sequence of operators
9049
* required to project requested `projections` into a postgres database.
@@ -116,24 +75,22 @@ export const createTypeormProjection = ({
11675
},
11776
{ logger }
11877
);
119-
const dataSource$ = createObservableDataSource({
120-
buffer,
78+
const connection$ = createObservableConnection({
12179
connectionConfig$,
12280
devOptions,
12381
entities,
12482
extensions,
12583
logger,
126-
migrationsRun: true
84+
options: {
85+
installExtensions: true,
86+
migrations: migrations.filter(({ entity }) => entities.includes(entity as any)),
87+
migrationsRun: !devOptions?.synchronize
88+
}
12789
});
12890
return projectionSource$.pipe(
12991
applyMappers(mappers),
13092
shareRetryBackoff(
131-
(evt$) =>
132-
evt$.pipe(
133-
withTypeormTransaction({ dataSource$, logger }, extensions),
134-
applyStores(stores),
135-
typeormTransactionCommit()
136-
),
93+
(evt$) => evt$.pipe(withTypeormTransaction({ connection$ }), applyStores(stores), typeormTransactionCommit()),
13794
{ shouldRetry: isRecoverableTypeormError }
13895
),
13996
requestNext(),

packages/cardano-services/test/Program/services/pgboss.test.ts

+20-13
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
BlockEntity,
33
PgConnectionConfig,
44
STAKE_POOL_METADATA_QUEUE,
5+
createDataSource,
56
createPgBossExtension
67
} from '@cardano-sdk/projection-typeorm';
78
import { Cardano } from '@cardano-sdk/core';
@@ -11,7 +12,7 @@ import { PgBossHttpService, pgBossEntities } from '../../../src/Program/services
1112
import { Pool } from 'pg';
1213
import { StakePoolMetadataFetchMode } from '../../../src/Program/options';
1314
import { WorkerHandlerFactoryOptions } from '../../../src/PgBoss';
14-
import { createObservableDataSource, getConnectionConfig, getPool } from '../../../src';
15+
import { getConnectionConfig, getPool } from '../../../src/Program/services/postgres';
1516
import { logger } from '@cardano-sdk/util-dev';
1617

1718
const dnsResolver = () => Promise.resolve({ name: 'localhost', port: 5433, priority: 6, weight: 5 });
@@ -70,6 +71,7 @@ jest.mock('@cardano-sdk/projection-typeorm', () => {
7071

7172
describe('PgBossHttpService', () => {
7273
let connectionConfig$: Observable<PgConnectionConfig>;
74+
let connectionConfig: PgConnectionConfig;
7375
let dataSource: DataSource;
7476
let db: Pool;
7577
let service: PgBossHttpService | undefined;
@@ -87,15 +89,7 @@ describe('PgBossHttpService', () => {
8789
};
8890

8991
connectionConfig$ = getConnectionConfig(dnsResolver, 'test', 'StakePool', args);
90-
const dataSource$ = createObservableDataSource({
91-
connectionConfig$,
92-
devOptions: { dropSchema: true, synchronize: true },
93-
entities: pgBossEntities,
94-
extensions: { pgBoss: true },
95-
logger,
96-
migrationsRun: false
97-
});
98-
dataSource = await firstValueFrom(dataSource$);
92+
connectionConfig = await firstValueFrom(connectionConfig$);
9993

10094
const pool = await getPool(dnsResolver, logger, args);
10195

@@ -104,8 +98,20 @@ describe('PgBossHttpService', () => {
10498
db = pool;
10599
});
106100

101+
beforeEach(async () => {
102+
dataSource = createDataSource({
103+
connectionConfig,
104+
devOptions: { dropSchema: true, synchronize: true },
105+
entities: pgBossEntities,
106+
extensions: { pgBoss: true },
107+
logger
108+
});
109+
await dataSource.initialize();
110+
});
111+
107112
afterEach(async () => {
108113
await service?.shutdown();
114+
await dataSource.destroy().catch(() => void 0);
109115
});
110116

111117
it('health check is ok after start with a valid db connection', async () => {
@@ -116,7 +122,6 @@ describe('PgBossHttpService', () => {
116122
parallelJobs: 3,
117123
queues: []
118124
},
119-
120125
{ connectionConfig$, db, logger }
121126
);
122127
expect(await service.healthCheck()).toEqual({ ok: false, reason: 'PgBossHttpService not started' });
@@ -132,7 +137,6 @@ describe('PgBossHttpService', () => {
132137
let observableResolver = () => {};
133138
let subscriptions = 0;
134139

135-
const connectionConfig = await firstValueFrom(connectionConfig$);
136140
const config$ = new Observable<PgConnectionConfig>((subscriber) => {
137141
subscriptions++;
138142

@@ -155,6 +159,8 @@ describe('PgBossHttpService', () => {
155159
await service.start();
156160

157161
// Insert test block with slot 1
162+
const queryRunner = dataSource.createQueryRunner();
163+
await queryRunner.connect();
158164
const blockRepos = dataSource.getRepository(BlockEntity);
159165
const block = { hash: 'test', height: 1, slot: 1 };
160166
await blockRepos.insert(block);
@@ -165,7 +171,6 @@ describe('PgBossHttpService', () => {
165171
expect(await collectStatus()).toEqual({ calls: 0, health: { ok: true }, subscriptions: 1 });
166172

167173
// Schedule a job
168-
const queryRunner = dataSource.createQueryRunner();
169174
const pgboss = createPgBossExtension(queryRunner, logger);
170175
await pgboss.send(STAKE_POOL_METADATA_QUEUE, {}, { retryDelay: 1, retryLimit: 100, slot: Cardano.Slot(1) });
171176
await queryRunner.release();
@@ -225,5 +230,7 @@ describe('PgBossHttpService', () => {
225230
await testPromises[3];
226231

227232
expect(await collectStatus()).toEqual({ calls: 4, health: { ok: true }, subscriptions: 3 });
233+
234+
await dataSource.destroy();
228235
});
229236
});

packages/cardano-services/test/Projection/createTypeormProjection.test.ts

+14-7
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,21 @@ import { projectorConnectionConfig, projectorConnectionConfig$ } from '../util';
1414
describe('createTypeormProjection', () => {
1515
it('creates a projection to PostgreSQL based on requested projection names', async () => {
1616
// Setup
17-
const data = chainSyncData(ChainSyncDataSet.WithMint);
18-
const buffer = new TypeormStabilityWindowBuffer({ allowNonSequentialBlockHeights: true, logger });
1917
const projections = [ProjectionName.UTXO];
18+
const buffer = new TypeormStabilityWindowBuffer({ allowNonSequentialBlockHeights: true, logger });
19+
const { entities } = prepareTypeormProjection({ buffer, projections }, { logger });
20+
const dataSource = createDataSource({
21+
connectionConfig: projectorConnectionConfig,
22+
devOptions: { dropSchema: true, synchronize: true },
23+
entities,
24+
logger
25+
});
26+
await dataSource.initialize();
27+
const queryRunner = dataSource.createQueryRunner();
28+
await queryRunner.connect();
29+
const data = chainSyncData(ChainSyncDataSet.WithMint);
30+
await buffer.initialize(queryRunner);
31+
2032
const projection$ = createTypeormProjection({
2133
blocksBufferLength: 10,
2234
buffer,
@@ -36,11 +48,6 @@ describe('createTypeormProjection', () => {
3648
await lastValueFrom(projection$);
3749

3850
// Check data in the database
39-
const { entities } = prepareTypeormProjection({ buffer, projections }, { logger });
40-
const dataSource = createDataSource({ connectionConfig: projectorConnectionConfig, entities, logger });
41-
await dataSource.initialize();
42-
const queryRunner = dataSource.createQueryRunner();
43-
await queryRunner.connect();
4451
expect(await queryRunner.manager.count(AssetEntity)).toBeGreaterThan(0);
4552
expect(await queryRunner.manager.count(TokensEntity)).toBeGreaterThan(0);
4653
expect(await queryRunner.manager.count(OutputEntity)).toBeGreaterThan(0);

packages/e2e/test/projection/offline-fork.test.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import {
2121
} from '@cardano-sdk/core';
2222
import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev';
2323
import { ConnectionConfig } from '@cardano-ogmios/client';
24-
import { Observable, filter, firstValueFrom, lastValueFrom, of, take, takeWhile, toArray } from 'rxjs';
24+
import { Observable, defer, filter, firstValueFrom, lastValueFrom, of, take, takeWhile, toArray } from 'rxjs';
2525
import { OgmiosObservableCardanoNode } from '@cardano-sdk/ogmios';
26+
import { QueryRunner } from 'typeorm';
2627
import { createDatabase } from 'typeorm-extension';
2728
import { getEnv } from '../../src';
2829

@@ -202,6 +203,7 @@ describe('resuming projection when intersection is not local tip', () => {
202203
installExtensions: true
203204
}
204205
});
206+
let queryRunner: QueryRunner;
205207

206208
const getNumberOfLocalStakeKeys = async () => {
207209
const repository = dataSource.getRepository(Postgres.StakeKeyEntity);
@@ -217,15 +219,16 @@ describe('resuming projection when intersection is not local tip', () => {
217219
}
218220
});
219221
await dataSource.initialize();
220-
await buffer.initialize(dataSource.createQueryRunner());
222+
queryRunner = dataSource.createQueryRunner();
223+
await buffer.initialize(queryRunner);
221224
});
222225
afterAll(() => dataSource.destroy());
223226

224227
testRollbackAndContinue(
225228
buffer,
226229
(evt$) =>
227230
evt$.pipe(
228-
Postgres.withTypeormTransaction({ dataSource$: of(dataSource), logger }),
231+
Postgres.withTypeormTransaction({ connection$: defer(() => of({ queryRunner })) }),
229232
Postgres.storeBlock(),
230233
Postgres.storeStakeKeys(),
231234
buffer.storeBlockData(),

packages/e2e/test/projection/single-tenant-utxo.test.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Bootstrap, Mappers, ProjectionEvent, requestNext } from '@cardano-sdk/p
44
import { Cardano, ObservableCardanoNode } from '@cardano-sdk/core';
55
import { ConnectionConfig } from '@cardano-ogmios/client';
66
import { DataSource, QueryRunner } from 'typeorm';
7-
import { Observable, filter, firstValueFrom, lastValueFrom, of, scan, takeWhile } from 'rxjs';
7+
import { Observable, defer, filter, firstValueFrom, lastValueFrom, of, scan, takeWhile } from 'rxjs';
88
import { OgmiosObservableCardanoNode } from '@cardano-sdk/ogmios';
99
import { createDatabase, dropDatabase } from 'typeorm-extension';
1010
import { getEnv } from '../../src';
@@ -104,7 +104,7 @@ describe('single-tenant utxo projection', () => {
104104
Bootstrap.fromCardanoNode({ blocksBufferLength: 10, buffer, cardanoNode, logger }).pipe(
105105
Mappers.withMint(),
106106
Mappers.withUtxo(),
107-
Postgres.withTypeormTransaction({ dataSource$: of(dataSource), logger }),
107+
Postgres.withTypeormTransaction({ connection$: defer(() => of({ queryRunner })) }),
108108
Postgres.storeBlock(),
109109
Postgres.storeAssets(),
110110
Postgres.storeUtxo(),
@@ -115,7 +115,7 @@ describe('single-tenant utxo projection', () => {
115115

116116
const storeUtxo = (evt$: Observable<ProjectionEvent<Mappers.WithMint & Mappers.WithUtxo>>) =>
117117
evt$.pipe(
118-
Postgres.withTypeormTransaction({ dataSource$: of(dataSource), logger }),
118+
Postgres.withTypeormTransaction({ connection$: defer(() => of({ queryRunner })) }),
119119
Postgres.storeBlock(),
120120
Postgres.storeAssets(),
121121
Postgres.storeUtxo(),

packages/golden-test-generator/src/ChainSyncEvents/chainSyncEvents.ts

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export const getChainSyncEvents = async (
9696
const header = ogmiosToCore.blockHeader(block);
9797
if (!header) return;
9898
currentBlock = header.blockNo;
99+
99100
if (onBlock !== undefined) {
100101
onBlock(currentBlock);
101102
}

0 commit comments

Comments
 (0)