Skip to content

Commit 308b003

Browse files
authored
Merge pull request #1410 from input-output-hk/fix/no-migrations-on-worker-and-provider-startup
fix: no migrations on worker and provider startup
2 parents 0aadecb + d5faa15 commit 308b003

File tree

16 files changed

+338
-188
lines changed

16 files changed

+338
-188
lines changed

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --con
6161

6262
FROM cardano-services as worker
6363
WORKDIR /app/packages/cardano-services
64-
CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --conditions=development src/cli start-worker"]
64+
CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --conditions=development src/cli start-pg-boss-worker"]
6565

6666
FROM cardano-services as blockfrost-worker
6767
ENV \

packages/cardano-services/.env.test

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ POSTGRES_CONNECTION_STRING_HANDLE=postgresql://postgres:[email protected]
33
POSTGRES_CONNECTION_STRING_PROJECTION=postgresql://postgres:[email protected]:5433/projection
44
POSTGRES_CONNECTION_STRING_STAKE_POOL=postgresql://postgres:[email protected]:5433/stake_pool
55
POSTGRES_CONNECTION_STRING_ASSET=postgresql://postgres:[email protected]:5433/asset
6+
POSTGRES_CONNECTION_STRING_EMPTY=postgresql://postgres:[email protected]:5433/empty
67
CARDANO_NODE_CONFIG_PATH=./config/network/mainnet/cardano-node/config.json
78
DB_CACHE_TTL=120
89
EPOCH_POLL_INTERVAL=10000

packages/cardano-services/README.md

+4-7
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,16 @@ OGMIOS_SRV_SERVICE_NAME=some-domain-for-ogmios \
9999
./dist/cjs/cli.js start-provider-server
100100
```
101101

102-
**`start-worker` using CLI options:**
102+
**`start-pg-boss-worker` using CLI options:**
103103

104104
```bash
105-
./dist/cjs/cli.js \
106-
start-worker \
107-
--ogmios-srv-service-name some-domain-for-ogmios
105+
./dist/cjs/cli.js start-pg-boss-worker --queues=pool-metadata --postgres-connection-string-stake-pool "postgresql://postgres:doNoUseThisSecret\!@localhost:5432/projection" --postgres-connection-string-db-sync "postgresql://postgres:doNoUseThisSecret\!@localhost:5432/cexplorer"
108106
```
109107

110-
**`start-worker` using env variables:**
108+
**`start-pg-boss-worker` using env variables:**
111109

112110
```bash
113-
OGMIOS_SRV_SERVICE_NAME=some-domain-for-ogmios \
114-
./dist/cjs/cli.js start-worker
111+
QUEUES=pool-metadata POSTGRES_CONNECTION_STRING_STAKE_POOL=postgresql://postgres:doNoUseThisSecret\!@localhost:5432/projection POSTGRES_CONNECTION_STRING_DB_SYNC=postgresql://postgres:doNoUseThisSecret\!@localhost:5432/cexplorer ./dist/cjs/cli.js start-pg-boss-worker
115112
```
116113

117114
**`start-projector` using CLI options with Ogmios and PostgreSQL running on localhost:**

packages/cardano-services/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
"npm-run-all": "^4.1.5",
8787
"ts-jest": "^28.0.7",
8888
"ts-node": "^10.0.0",
89+
"typeorm-extension": "^2.7.0",
8990
"typescript": "^4.7.4",
9091
"wait-on": "^6.0.1"
9192
},

packages/cardano-services/src/Asset/TypeOrmNftMetadataService.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { Asset, Cardano } from '@cardano-sdk/core';
1+
import { Asset, Cardano, Milliseconds } from '@cardano-sdk/core';
22
import { AssetPolicyIdAndName, NftMetadataService } from './types';
33
import { NftMetadataEntity } from '@cardano-sdk/projection-typeorm';
44
import { QueryRunner } from 'typeorm';
55
import { TypeormProviderDependencies, TypeormService } from '../util';
66

77
export class TypeOrmNftMetadataService extends TypeormService implements NftMetadataService {
88
constructor({ connectionConfig$, logger, entities }: TypeormProviderDependencies) {
9-
super('TypeOrmNftMetadataService', { connectionConfig$, entities, logger });
9+
super('TypeOrmNftMetadataService', { connectionConfig$, connectionTimeout: Milliseconds(1000), entities, logger });
1010
}
1111

1212
async getNftMetadata(assetInfo: AssetPolicyIdAndName): Promise<Asset.NftMetadata | null> {

packages/cardano-services/src/Program/programs/pgBossWorker.ts

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ export class PgBossWorkerHttpServer extends HttpServer {
3030
const { apiUrl } = cfg;
3131
const { logger } = deps;
3232
const pgBossService = new PgBossHttpService(cfg, deps);
33+
pgBossService.onUnrecoverableError$.subscribe(() => {
34+
// eslint-disable-next-line unicorn/no-process-exit
35+
process.exit(1);
36+
});
3337

3438
super(
3539
{ listen: getListen(apiUrl), name: pgBossWorker },

packages/cardano-services/src/Program/services/pgboss.ts

+23-9
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import { HttpService } from '../../Http/HttpService';
1919
import { Logger } from 'ts-log';
2020
import {
2121
Observable,
22+
Subject,
2223
Subscription,
2324
catchError,
2425
concat,
2526
finalize,
26-
firstValueFrom,
2727
from,
2828
merge,
2929
share,
@@ -66,9 +66,18 @@ export const createPgBossDataSource = (connectionConfig$: Observable<PgConnectio
6666
const dataSource = createDataSource({
6767
connectionConfig,
6868
entities: pgBossEntities,
69-
logger
69+
extensions: { pgBoss: true },
70+
logger,
71+
options: { migrationsRun: false }
7072
});
7173
await dataSource.initialize();
74+
const pgbossSchema = await dataSource.query(
75+
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgboss'"
76+
);
77+
if (pgbossSchema.length === 0) {
78+
await dataSource.destroy();
79+
throw new Error('Database schema is not ready. Please make sure projector is running.');
80+
}
7281
return dataSource;
7382
})()
7483
)
@@ -105,6 +114,7 @@ export class PgBossHttpService extends HttpService {
105114
#db: Pool;
106115
#subscription?: Subscription;
107116
#health: HealthCheckResponse = { ok: false, reason: 'PgBossHttpService not started' };
117+
onUnrecoverableError$ = new Subject<unknown>();
108118

109119
constructor(cfg: PgBossWorkerArgs, deps: PgBossServiceDependencies) {
110120
const { connectionConfig$, db, logger } = deps;
@@ -127,11 +137,16 @@ export class PgBossHttpService extends HttpService {
127137
// Used for later use of firstValueFrom() to avoid it subscribes again
128138
const sharedWork$ = this.work().pipe(share());
129139

130-
// Subscribe to work() to create the first DataSource and start pg-boss
131-
this.#subscription = sharedWork$.subscribe();
132-
133-
// Used to make startImpl actually await for a first emitted value from work()
134-
await firstValueFrom(sharedWork$);
140+
return new Promise<void>((resolve, reject) => {
141+
// Subscribe to work() to create the first DataSource and start pg-boss
142+
this.#subscription = sharedWork$.subscribe({
143+
error: (error) => {
144+
this.onUnrecoverableError$.next(error);
145+
reject(error);
146+
},
147+
next: () => resolve()
148+
});
149+
});
135150
}
136151

137152
protected async shutdownImpl() {
@@ -174,8 +189,7 @@ export class PgBossHttpService extends HttpService {
174189
}),
175190
catchError((error) => {
176191
this.logger.error('Fatal worker error', error);
177-
// eslint-disable-next-line unicorn/no-process-exit
178-
process.exit(1);
192+
throw error;
179193
})
180194
);
181195
}

packages/cardano-services/src/Program/services/postgres.ts

+26-13
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,31 @@ const mergeTlsOptions = (
109109
}
110110
: ssl || !!conn.ssl;
111111

112+
export const connStringToPgConnectionConfig = (
113+
postgresConnectionString: string,
114+
{
115+
poolSize,
116+
ssl
117+
}: {
118+
poolSize?: number;
119+
ssl?: { ca: string };
120+
} = {}
121+
): PgConnectionConfig => {
122+
const conn = connString.parse(postgresConnectionString);
123+
if (!conn.database || !conn.host) {
124+
throw new InvalidProgramOption('postgresConnectionString');
125+
}
126+
return {
127+
database: conn.database,
128+
host: conn.host,
129+
password: conn.password,
130+
poolSize,
131+
port: conn.port ? Number.parseInt(conn.port) : undefined,
132+
ssl: mergeTlsOptions(conn, ssl),
133+
username: conn.user
134+
};
135+
};
136+
112137
export const getConnectionConfig = <Suffix extends ConnectionNames>(
113138
dnsResolver: DnsResolver,
114139
program: string,
@@ -121,19 +146,7 @@ export const getConnectionConfig = <Suffix extends ConnectionNames>(
121146
const ssl = postgresSslCaFile ? { ca: loadSecret(postgresSslCaFile) } : undefined;
122147

123148
if (postgresConnectionString) {
124-
const conn = connString.parse(postgresConnectionString);
125-
if (!conn.database || !conn.host) {
126-
throw new InvalidProgramOption('postgresConnectionString');
127-
}
128-
return of({
129-
database: conn.database,
130-
host: conn.host,
131-
max,
132-
password: conn.password,
133-
port: conn.port ? Number.parseInt(conn.port) : undefined,
134-
ssl: mergeTlsOptions(conn, ssl),
135-
username: conn.user
136-
});
149+
return of(connStringToPgConnectionConfig(postgresConnectionString, { poolSize: max, ssl }));
137150
}
138151

139152
const postgresDb = getPostgresOption(suffix, 'postgresDb', options);

packages/cardano-services/src/util/TypeormProvider/TypeormProvider.ts

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
import { HealthCheckResponse, Provider } from '@cardano-sdk/core';
2-
import { Logger } from 'ts-log';
3-
import { Observable, skip } from 'rxjs';
4-
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
5-
import { TypeormService } from '../TypeormService';
1+
import { HealthCheckResponse, Milliseconds, Provider } from '@cardano-sdk/core';
2+
import { TypeormService, TypeormServiceDependencies } from '../TypeormService';
3+
import { skip } from 'rxjs';
64

7-
export interface TypeormProviderDependencies {
8-
logger: Logger;
9-
entities: Function[];
10-
connectionConfig$: Observable<PgConnectionConfig>;
11-
}
5+
export type TypeormProviderDependencies = Omit<TypeormServiceDependencies, 'connectionTimeout'>;
126

137
const unhealthy = { ok: false, reason: 'Provider error' };
148

159
export abstract class TypeormProvider extends TypeormService implements Provider {
1610
health: HealthCheckResponse = { ok: false, reason: 'not started' };
1711

18-
constructor(name: string, { connectionConfig$, logger, entities }: TypeormProviderDependencies) {
19-
super(name, { connectionConfig$, entities, logger });
12+
constructor(name: string, dependencies: TypeormProviderDependencies) {
13+
super(name, { ...dependencies, connectionTimeout: Milliseconds(1000) });
2014
// We skip 1 to omit the initial null value of the subject
2115
this.dataSource$.pipe(skip(1)).subscribe((dataSource) => {
2216
this.health = dataSource ? { ok: true } : unhealthy;

packages/cardano-services/src/util/TypeormService/TypeormService.ts

+29-10
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,47 @@
1-
import { BehaviorSubject, Observable, Subscription, filter, firstValueFrom } from 'rxjs';
1+
import { BehaviorSubject, Observable, Subscription, filter, firstValueFrom, timeout } from 'rxjs';
22
import { DataSource, QueryRunner } from 'typeorm';
33
import { Logger } from 'ts-log';
4+
import { Milliseconds } from '@cardano-sdk/core';
45
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
56
import { RunnableModule, isNotNil } from '@cardano-sdk/util';
67
import { createTypeormDataSource } from '../createTypeormDataSource';
78

8-
interface TypeormServiceDependencies {
9+
export interface TypeormServiceDependencies {
910
logger: Logger;
1011
entities: Function[];
1112
connectionConfig$: Observable<PgConnectionConfig>;
13+
connectionTimeout: Milliseconds;
1214
}
1315

1416
export abstract class TypeormService extends RunnableModule {
1517
#entities: Function[];
1618
#connectionConfig$: Observable<PgConnectionConfig>;
1719
protected dataSource$ = new BehaviorSubject<DataSource | null>(null);
1820
#subscription: Subscription | undefined;
21+
#connectionTimeout: Milliseconds;
1922

20-
constructor(name: string, { connectionConfig$, logger, entities }: TypeormServiceDependencies) {
23+
constructor(name: string, { connectionConfig$, logger, entities, connectionTimeout }: TypeormServiceDependencies) {
2124
super(name, logger);
2225
this.#entities = entities;
2326
this.#connectionConfig$ = connectionConfig$;
27+
this.#connectionTimeout = connectionTimeout;
2428
}
2529

26-
#subscribeToDataSource() {
27-
this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger).subscribe(
28-
(dataSource) => this.dataSource$.next(dataSource)
29-
);
30+
async #subscribeToDataSource() {
31+
return new Promise((resolve, reject) => {
32+
this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger).subscribe(
33+
(dataSource) => {
34+
if (dataSource !== this.dataSource$.value) {
35+
this.dataSource$.next(dataSource);
36+
}
37+
if (dataSource) {
38+
resolve(dataSource);
39+
} else {
40+
reject(new Error('Failed to initialize data source'));
41+
}
42+
}
43+
);
44+
});
3045
}
3146

3247
#reset() {
@@ -37,12 +52,16 @@ export abstract class TypeormService extends RunnableModule {
3752

3853
onError(_: unknown) {
3954
this.#reset();
40-
this.#subscribeToDataSource();
55+
void this.#subscribeToDataSource().catch(() => void 0);
4156
}
4257

4358
async withDataSource<T>(callback: (dataSource: DataSource) => Promise<T>): Promise<T> {
59+
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
60+
this.#connectionTimeout;
4461
try {
45-
return await callback(await firstValueFrom(this.dataSource$.pipe(filter(isNotNil))));
62+
return await callback(
63+
await firstValueFrom(this.dataSource$.pipe(filter(isNotNil), timeout({ first: this.#connectionTimeout })))
64+
);
4665
} catch (error) {
4766
this.onError(error);
4867
throw error;
@@ -72,7 +91,7 @@ export abstract class TypeormService extends RunnableModule {
7291
}
7392

7493
async startImpl() {
75-
this.#subscribeToDataSource();
94+
await this.#subscribeToDataSource();
7695
}
7796

7897
async shutdownImpl() {

packages/cardano-services/src/util/createTypeormDataSource.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ export const createTypeormDataSource = (
1212
from(
1313
(async () => {
1414
try {
15-
const dataSource = createDataSource({ connectionConfig, entities, logger });
15+
const dataSource = createDataSource({
16+
connectionConfig,
17+
entities,
18+
logger
19+
});
1620
await dataSource.initialize();
1721
return dataSource;
1822
} catch (error) {

0 commit comments

Comments
 (0)