Skip to content

Commit 172fe30

Browse files
committed
fix: do not run schema migrations when starting pgboss worker
1 parent bcd68fa commit 172fe30

File tree

5 files changed

+75
-26
lines changed

5 files changed

+75
-26
lines changed

packages/cardano-services/src/Program/programs/pgBossWorker.ts

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ export class PgBossWorkerHttpServer extends HttpServer {
3030
const { apiUrl } = cfg;
3131
const { logger } = deps;
3232
const pgBossService = new PgBossHttpService(cfg, deps);
33+
pgBossService.onUnrecoverableError$.subscribe(() => {
34+
// eslint-disable-next-line unicorn/no-process-exit
35+
process.exit(1);
36+
});
3337

3438
super(
3539
{ listen: getListen(apiUrl), name: pgBossWorker },

packages/cardano-services/src/Program/services/pgboss.ts

+23-9
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ import { HttpService } from '../../Http/HttpService';
1919
import { Logger } from 'ts-log';
2020
import {
2121
Observable,
22+
Subject,
2223
Subscription,
2324
catchError,
2425
concat,
2526
finalize,
26-
firstValueFrom,
2727
from,
2828
merge,
2929
share,
@@ -66,9 +66,18 @@ export const createPgBossDataSource = (connectionConfig$: Observable<PgConnectio
6666
const dataSource = createDataSource({
6767
connectionConfig,
6868
entities: pgBossEntities,
69-
logger
69+
extensions: { pgBoss: true },
70+
logger,
71+
options: { migrationsRun: false }
7072
});
7173
await dataSource.initialize();
74+
const pgbossSchema = await dataSource.query(
75+
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgboss'"
76+
);
77+
if (pgbossSchema.length === 0) {
78+
await dataSource.destroy();
79+
throw new Error('Database schema is not ready. Please make sure projector is running.');
80+
}
7281
return dataSource;
7382
})()
7483
)
@@ -105,6 +114,7 @@ export class PgBossHttpService extends HttpService {
105114
#db: Pool;
106115
#subscription?: Subscription;
107116
#health: HealthCheckResponse = { ok: false, reason: 'PgBossHttpService not started' };
117+
onUnrecoverableError$ = new Subject<unknown>();
108118

109119
constructor(cfg: PgBossWorkerArgs, deps: PgBossServiceDependencies) {
110120
const { connectionConfig$, db, logger } = deps;
@@ -127,11 +137,16 @@ export class PgBossHttpService extends HttpService {
127137
// Used for later use of firstValueFrom() to avoid it subscribes again
128138
const sharedWork$ = this.work().pipe(share());
129139

130-
// Subscribe to work() to create the first DataSource and start pg-boss
131-
this.#subscription = sharedWork$.subscribe();
132-
133-
// Used to make startImpl actually await for a first emitted value from work()
134-
await firstValueFrom(sharedWork$);
140+
return new Promise<void>((resolve, reject) => {
141+
// Subscribe to work() to create the first DataSource and start pg-boss
142+
this.#subscription = sharedWork$.subscribe({
143+
error: (error) => {
144+
this.onUnrecoverableError$.next(error);
145+
reject(error);
146+
},
147+
next: () => resolve()
148+
});
149+
});
135150
}
136151

137152
protected async shutdownImpl() {
@@ -174,8 +189,7 @@ export class PgBossHttpService extends HttpService {
174189
}),
175190
catchError((error) => {
176191
this.logger.error('Fatal worker error', error);
177-
// eslint-disable-next-line unicorn/no-process-exit
178-
process.exit(1);
192+
throw error;
179193
})
180194
);
181195
}

packages/cardano-services/test/Program/services/pgboss.test.ts

+39-5
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ jest.mock('@cardano-sdk/projection-typeorm', () => {
7070
});
7171

7272
describe('PgBossHttpService', () => {
73+
const apiUrl = new URL('http://unused/');
7374
let connectionConfig$: Observable<PgConnectionConfig>;
7475
let connectionConfig: PgConnectionConfig;
7576
let dataSource: DataSource;
@@ -98,9 +99,37 @@ describe('PgBossHttpService', () => {
9899
db = pool;
99100
});
100101

101-
afterEach(async () => {
102-
await service?.shutdown();
103-
await dataSource.destroy().catch(() => void 0);
102+
describe('without existing database', () => {
103+
describe('initialize', () => {
104+
it('throws an error and does not initialize pgboss schema', async () => {
105+
service = new PgBossHttpService(
106+
{
107+
apiUrl,
108+
dbCacheTtl: 0,
109+
lastRosEpochs: 10,
110+
metadataFetchMode: StakePoolMetadataFetchMode.DIRECT,
111+
parallelJobs: 3,
112+
queues: [],
113+
schedules: []
114+
},
115+
{ connectionConfig$, db, logger }
116+
);
117+
await expect(async () => {
118+
await service!.initialize();
119+
await service!.start();
120+
}).rejects.toThrowError();
121+
const pool = new Pool({
122+
// most of the props are the same as for typeorm
123+
...connectionConfig,
124+
ssl: undefined,
125+
user: connectionConfig.username
126+
});
127+
const pgbossSchema = await pool.query(
128+
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgboss'"
129+
);
130+
expect(pgbossSchema.rowCount).toBe(0);
131+
});
132+
});
104133
});
105134

106135
describe('with existing database', () => {
@@ -115,10 +144,15 @@ describe('PgBossHttpService', () => {
115144
await dataSource.initialize();
116145
});
117146

147+
afterEach(async () => {
148+
await service?.shutdown();
149+
await dataSource.destroy().catch(() => void 0);
150+
});
151+
118152
it('health check is ok after start with a valid db connection', async () => {
119153
service = new PgBossHttpService(
120154
{
121-
apiUrl: new URL('http://unused/'),
155+
apiUrl,
122156
dbCacheTtl: 0,
123157
lastRosEpochs: 10,
124158
metadataFetchMode: StakePoolMetadataFetchMode.DIRECT,
@@ -152,7 +186,7 @@ describe('PgBossHttpService', () => {
152186

153187
service = new PgBossHttpService(
154188
{
155-
apiUrl: new URL('http://unused/'),
189+
apiUrl,
156190
dbCacheTtl: 0,
157191
lastRosEpochs: 10,
158192
metadataFetchMode: StakePoolMetadataFetchMode.DIRECT,

packages/projection-typeorm/src/createDataSource.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,14 @@ export const createDataSource = ({
149149
return patchObject(dataSource, {
150150
async initialize() {
151151
await dataSource.initialize();
152-
await initializePgBoss(
153-
dataSource,
154-
contextLogger(logger, 'createDataSource'),
155-
extensions?.pgBoss,
156-
devOptions?.dropSchema
157-
);
152+
if (extensions?.pgBoss && (options?.migrationsRun || devOptions?.synchronize)) {
153+
await initializePgBoss(
154+
dataSource,
155+
contextLogger(logger, 'createDataSource'),
156+
extensions?.pgBoss,
157+
devOptions?.dropSchema
158+
);
159+
}
158160
return dataSource;
159161
}
160162
});

packages/projection-typeorm/test/createDataSource.test.ts

+1-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ describe('createDataSource', () => {
1616
});
1717

1818
describe('pg-boss schema', () => {
19-
it('initialize() creates and drops pg-boss schema when pgBoss extension is enabled', async () => {
19+
it('initialize() creates pg-boss schema when pgBoss extension is enabled', async () => {
2020
const dataSourceWithBoss = await initializeDataSource({
2121
entities: [BlockEntity],
2222
extensions: {
@@ -26,11 +26,6 @@ describe('createDataSource', () => {
2626
const queryRunnerWithBoss = dataSourceWithBoss.createQueryRunner();
2727
expect(await pgBossSchemaExists(queryRunnerWithBoss)).toBe(true);
2828
await queryRunnerWithBoss.release();
29-
30-
const dataSourceWithoutBoss = await initializeDataSource({ entities: [] });
31-
const queryRunnerWithoutBoss = dataSourceWithoutBoss.createQueryRunner();
32-
expect(await pgBossSchemaExists(queryRunnerWithoutBoss)).toBe(false);
33-
await queryRunnerWithBoss.release();
3429
});
3530
});
3631
});

0 commit comments

Comments
 (0)