diff --git a/Dockerfile b/Dockerfile index 94ba7a4ed06..025acab0c7a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -61,7 +61,7 @@ CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --con FROM cardano-services as worker WORKDIR /app/packages/cardano-services -CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --conditions=development src/cli start-worker"] +CMD ["bash", "-c", "../../node_modules/.bin/tsx watch --clear-screen=false --conditions=development src/cli start-pg-boss-worker"] FROM cardano-services as blockfrost-worker ENV \ diff --git a/packages/cardano-services/.env.test b/packages/cardano-services/.env.test index ca2d5d0abd7..d3b1588c771 100644 --- a/packages/cardano-services/.env.test +++ b/packages/cardano-services/.env.test @@ -3,6 +3,7 @@ POSTGRES_CONNECTION_STRING_HANDLE=postgresql://postgres:doNoUseThisSecret!@127.0 POSTGRES_CONNECTION_STRING_PROJECTION=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/projection POSTGRES_CONNECTION_STRING_STAKE_POOL=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/stake_pool POSTGRES_CONNECTION_STRING_ASSET=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/asset +POSTGRES_CONNECTION_STRING_EMPTY=postgresql://postgres:doNoUseThisSecret!@127.0.0.1:5433/empty CARDANO_NODE_CONFIG_PATH=./config/network/mainnet/cardano-node/config.json DB_CACHE_TTL=120 EPOCH_POLL_INTERVAL=10000 diff --git a/packages/cardano-services/README.md b/packages/cardano-services/README.md index 515425f5ba1..a9d77afb73c 100644 --- a/packages/cardano-services/README.md +++ b/packages/cardano-services/README.md @@ -99,19 +99,16 @@ OGMIOS_SRV_SERVICE_NAME=some-domain-for-ogmios \ ./dist/cjs/cli.js start-provider-server ``` -**`start-worker` using CLI options:** +**`start-pg-boss-worker` using CLI options:** ```bash -./dist/cjs/cli.js \ - start-worker \ - --ogmios-srv-service-name some-domain-for-ogmios +./dist/cjs/cli.js start-pg-boss-worker --queues=pool-metadata --postgres-connection-string-stake-pool "postgresql://postgres:doNoUseThisSecret\!@localhost:5432/projection" --postgres-connection-string-db-sync "postgresql://postgres:doNoUseThisSecret\!@localhost:5432/cexplorer" ``` -**`start-worker` using env variables:** +**`start-pg-boss-worker` using env variables:** ```bash -OGMIOS_SRV_SERVICE_NAME=some-domain-for-ogmios \ -./dist/cjs/cli.js start-worker +QUEUES=pool-metadata POSTGRES_CONNECTION_STRING_STAKE_POOL=postgresql://postgres:doNoUseThisSecret\!@localhost:5432/projection POSTGRES_CONNECTION_STRING_DB_SYNC=postgresql://postgres:doNoUseThisSecret\!@localhost:5432/cexplorer ./dist/cjs/cli.js start-pg-boss-worker ``` **`start-projector` using CLI options with Ogmios and PostgreSQL running on localhost:** diff --git a/packages/cardano-services/package.json b/packages/cardano-services/package.json index db8f2fce1fb..0f17c33f624 100644 --- a/packages/cardano-services/package.json +++ b/packages/cardano-services/package.json @@ -86,6 +86,7 @@ "npm-run-all": "^4.1.5", "ts-jest": "^28.0.7", "ts-node": "^10.0.0", + "typeorm-extension": "^2.7.0", "typescript": "^4.7.4", "wait-on": "^6.0.1" }, diff --git a/packages/cardano-services/src/Asset/TypeOrmNftMetadataService.ts b/packages/cardano-services/src/Asset/TypeOrmNftMetadataService.ts index 81034773e5f..62c47362539 100644 --- a/packages/cardano-services/src/Asset/TypeOrmNftMetadataService.ts +++ b/packages/cardano-services/src/Asset/TypeOrmNftMetadataService.ts @@ -1,4 +1,4 @@ -import { Asset, Cardano } from '@cardano-sdk/core'; +import { Asset, Cardano, Milliseconds } from '@cardano-sdk/core'; import { AssetPolicyIdAndName, NftMetadataService } from './types'; import { NftMetadataEntity } from '@cardano-sdk/projection-typeorm'; import { QueryRunner } from 'typeorm'; @@ -6,7 +6,7 @@ import { TypeormProviderDependencies, TypeormService } from '../util'; export class TypeOrmNftMetadataService extends TypeormService implements NftMetadataService { constructor({ connectionConfig$, logger, entities }: TypeormProviderDependencies) { - super('TypeOrmNftMetadataService', { connectionConfig$, entities, logger }); + super('TypeOrmNftMetadataService', { connectionConfig$, connectionTimeout: Milliseconds(1000), entities, logger }); } async getNftMetadata(assetInfo: AssetPolicyIdAndName): Promise { diff --git a/packages/cardano-services/src/Program/programs/pgBossWorker.ts b/packages/cardano-services/src/Program/programs/pgBossWorker.ts index c2dbd036000..a3ffef4c021 100644 --- a/packages/cardano-services/src/Program/programs/pgBossWorker.ts +++ b/packages/cardano-services/src/Program/programs/pgBossWorker.ts @@ -30,6 +30,10 @@ export class PgBossWorkerHttpServer extends HttpServer { const { apiUrl } = cfg; const { logger } = deps; const pgBossService = new PgBossHttpService(cfg, deps); + pgBossService.onUnrecoverableError$.subscribe(() => { + // eslint-disable-next-line unicorn/no-process-exit + process.exit(1); + }); super( { listen: getListen(apiUrl), name: pgBossWorker }, diff --git a/packages/cardano-services/src/Program/services/pgboss.ts b/packages/cardano-services/src/Program/services/pgboss.ts index 0e5eaf89ffb..a1fcdf834ce 100644 --- a/packages/cardano-services/src/Program/services/pgboss.ts +++ b/packages/cardano-services/src/Program/services/pgboss.ts @@ -19,11 +19,11 @@ import { HttpService } from '../../Http/HttpService'; import { Logger } from 'ts-log'; import { Observable, + Subject, Subscription, catchError, concat, finalize, - firstValueFrom, from, merge, share, @@ -66,9 +66,18 @@ export const createPgBossDataSource = (connectionConfig$: Observable(); constructor(cfg: PgBossWorkerArgs, deps: PgBossServiceDependencies) { const { connectionConfig$, db, logger } = deps; @@ -127,11 +137,16 @@ export class PgBossHttpService extends HttpService { // Used for later use of firstValueFrom() to avoid it subscribes again const sharedWork$ = this.work().pipe(share()); - // Subscribe to work() to create the first DataSource and start pg-boss - this.#subscription = sharedWork$.subscribe(); - - // Used to make startImpl actually await for a first emitted value from work() - await firstValueFrom(sharedWork$); + return new Promise((resolve, reject) => { + // Subscribe to work() to create the first DataSource and start pg-boss + this.#subscription = sharedWork$.subscribe({ + error: (error) => { + this.onUnrecoverableError$.next(error); + reject(error); + }, + next: () => resolve() + }); + }); } protected async shutdownImpl() { @@ -174,8 +189,7 @@ export class PgBossHttpService extends HttpService { }), catchError((error) => { this.logger.error('Fatal worker error', error); - // eslint-disable-next-line unicorn/no-process-exit - process.exit(1); + throw error; }) ); } diff --git a/packages/cardano-services/src/Program/services/postgres.ts b/packages/cardano-services/src/Program/services/postgres.ts index f2557779a7f..a5c285b696a 100644 --- a/packages/cardano-services/src/Program/services/postgres.ts +++ b/packages/cardano-services/src/Program/services/postgres.ts @@ -109,6 +109,31 @@ const mergeTlsOptions = ( } : ssl || !!conn.ssl; +export const connStringToPgConnectionConfig = ( + postgresConnectionString: string, + { + poolSize, + ssl + }: { + poolSize?: number; + ssl?: { ca: string }; + } = {} +): PgConnectionConfig => { + const conn = connString.parse(postgresConnectionString); + if (!conn.database || !conn.host) { + throw new InvalidProgramOption('postgresConnectionString'); + } + return { + database: conn.database, + host: conn.host, + password: conn.password, + poolSize, + port: conn.port ? Number.parseInt(conn.port) : undefined, + ssl: mergeTlsOptions(conn, ssl), + username: conn.user + }; +}; + export const getConnectionConfig = ( dnsResolver: DnsResolver, program: string, @@ -121,19 +146,7 @@ export const getConnectionConfig = ( const ssl = postgresSslCaFile ? { ca: loadSecret(postgresSslCaFile) } : undefined; if (postgresConnectionString) { - const conn = connString.parse(postgresConnectionString); - if (!conn.database || !conn.host) { - throw new InvalidProgramOption('postgresConnectionString'); - } - return of({ - database: conn.database, - host: conn.host, - max, - password: conn.password, - port: conn.port ? Number.parseInt(conn.port) : undefined, - ssl: mergeTlsOptions(conn, ssl), - username: conn.user - }); + return of(connStringToPgConnectionConfig(postgresConnectionString, { poolSize: max, ssl })); } const postgresDb = getPostgresOption(suffix, 'postgresDb', options); diff --git a/packages/cardano-services/src/util/TypeormProvider/TypeormProvider.ts b/packages/cardano-services/src/util/TypeormProvider/TypeormProvider.ts index 52a5b2d85f3..647c12eec90 100644 --- a/packages/cardano-services/src/util/TypeormProvider/TypeormProvider.ts +++ b/packages/cardano-services/src/util/TypeormProvider/TypeormProvider.ts @@ -1,22 +1,16 @@ -import { HealthCheckResponse, Provider } from '@cardano-sdk/core'; -import { Logger } from 'ts-log'; -import { Observable, skip } from 'rxjs'; -import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm'; -import { TypeormService } from '../TypeormService'; +import { HealthCheckResponse, Milliseconds, Provider } from '@cardano-sdk/core'; +import { TypeormService, TypeormServiceDependencies } from '../TypeormService'; +import { skip } from 'rxjs'; -export interface TypeormProviderDependencies { - logger: Logger; - entities: Function[]; - connectionConfig$: Observable; -} +export type TypeormProviderDependencies = Omit; const unhealthy = { ok: false, reason: 'Provider error' }; export abstract class TypeormProvider extends TypeormService implements Provider { health: HealthCheckResponse = { ok: false, reason: 'not started' }; - constructor(name: string, { connectionConfig$, logger, entities }: TypeormProviderDependencies) { - super(name, { connectionConfig$, entities, logger }); + constructor(name: string, dependencies: TypeormProviderDependencies) { + super(name, { ...dependencies, connectionTimeout: Milliseconds(1000) }); // We skip 1 to omit the initial null value of the subject this.dataSource$.pipe(skip(1)).subscribe((dataSource) => { this.health = dataSource ? { ok: true } : unhealthy; diff --git a/packages/cardano-services/src/util/TypeormService/TypeormService.ts b/packages/cardano-services/src/util/TypeormService/TypeormService.ts index 9359287e7e9..3ba6caa64b8 100644 --- a/packages/cardano-services/src/util/TypeormService/TypeormService.ts +++ b/packages/cardano-services/src/util/TypeormService/TypeormService.ts @@ -1,14 +1,16 @@ -import { BehaviorSubject, Observable, Subscription, filter, firstValueFrom } from 'rxjs'; +import { BehaviorSubject, Observable, Subscription, filter, firstValueFrom, timeout } from 'rxjs'; import { DataSource, QueryRunner } from 'typeorm'; import { Logger } from 'ts-log'; +import { Milliseconds } from '@cardano-sdk/core'; import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm'; import { RunnableModule, isNotNil } from '@cardano-sdk/util'; import { createTypeormDataSource } from '../createTypeormDataSource'; -interface TypeormServiceDependencies { +export interface TypeormServiceDependencies { logger: Logger; entities: Function[]; connectionConfig$: Observable; + connectionTimeout: Milliseconds; } export abstract class TypeormService extends RunnableModule { @@ -16,17 +18,30 @@ export abstract class TypeormService extends RunnableModule { #connectionConfig$: Observable; protected dataSource$ = new BehaviorSubject(null); #subscription: Subscription | undefined; + #connectionTimeout: Milliseconds; - constructor(name: string, { connectionConfig$, logger, entities }: TypeormServiceDependencies) { + constructor(name: string, { connectionConfig$, logger, entities, connectionTimeout }: TypeormServiceDependencies) { super(name, logger); this.#entities = entities; this.#connectionConfig$ = connectionConfig$; + this.#connectionTimeout = connectionTimeout; } - #subscribeToDataSource() { - this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger).subscribe( - (dataSource) => this.dataSource$.next(dataSource) - ); + async #subscribeToDataSource() { + return new Promise((resolve, reject) => { + this.#subscription = createTypeormDataSource(this.#connectionConfig$, this.#entities, this.logger).subscribe( + (dataSource) => { + if (dataSource !== this.dataSource$.value) { + this.dataSource$.next(dataSource); + } + if (dataSource) { + resolve(dataSource); + } else { + reject(new Error('Failed to initialize data source')); + } + } + ); + }); } #reset() { @@ -37,12 +52,16 @@ export abstract class TypeormService extends RunnableModule { onError(_: unknown) { this.#reset(); - this.#subscribeToDataSource(); + void this.#subscribeToDataSource().catch(() => void 0); } async withDataSource(callback: (dataSource: DataSource) => Promise): Promise { + // eslint-disable-next-line @typescript-eslint/no-unused-expressions + this.#connectionTimeout; try { - return await callback(await firstValueFrom(this.dataSource$.pipe(filter(isNotNil)))); + return await callback( + await firstValueFrom(this.dataSource$.pipe(filter(isNotNil), timeout({ first: this.#connectionTimeout }))) + ); } catch (error) { this.onError(error); throw error; @@ -72,7 +91,7 @@ export abstract class TypeormService extends RunnableModule { } async startImpl() { - this.#subscribeToDataSource(); + await this.#subscribeToDataSource(); } async shutdownImpl() { diff --git a/packages/cardano-services/src/util/createTypeormDataSource.ts b/packages/cardano-services/src/util/createTypeormDataSource.ts index e50ec3701ce..7caede297f3 100644 --- a/packages/cardano-services/src/util/createTypeormDataSource.ts +++ b/packages/cardano-services/src/util/createTypeormDataSource.ts @@ -12,7 +12,11 @@ export const createTypeormDataSource = ( from( (async () => { try { - const dataSource = createDataSource({ connectionConfig, entities, logger }); + const dataSource = createDataSource({ + connectionConfig, + entities, + logger + }); await dataSource.initialize(); return dataSource; } catch (error) { diff --git a/packages/cardano-services/test/Program/services/pgboss.test.ts b/packages/cardano-services/test/Program/services/pgboss.test.ts index e6de6cd3aad..08835373898 100644 --- a/packages/cardano-services/test/Program/services/pgboss.test.ts +++ b/packages/cardano-services/test/Program/services/pgboss.test.ts @@ -70,6 +70,7 @@ jest.mock('@cardano-sdk/projection-typeorm', () => { }); describe('PgBossHttpService', () => { + const apiUrl = new URL('http://unused/'); let connectionConfig$: Observable; let connectionConfig: PgConnectionConfig; let dataSource: DataSource; @@ -98,145 +99,180 @@ describe('PgBossHttpService', () => { db = pool; }); - beforeEach(async () => { - dataSource = createDataSource({ - connectionConfig, - devOptions: { dropSchema: true, synchronize: true }, - entities: pgBossEntities, - extensions: { pgBoss: true }, - logger + describe('without existing database', () => { + describe('initialize', () => { + it('throws an error and does not initialize pgboss schema', async () => { + service = new PgBossHttpService( + { + apiUrl, + dbCacheTtl: 0, + lastRosEpochs: 10, + metadataFetchMode: StakePoolMetadataFetchMode.DIRECT, + parallelJobs: 3, + queues: [], + schedules: [] + }, + { connectionConfig$, db, logger } + ); + await expect(async () => { + await service!.initialize(); + await service!.start(); + }).rejects.toThrowError(); + const pool = new Pool({ + // most of the props are the same as for typeorm + ...connectionConfig, + ssl: undefined, + user: connectionConfig.username + }); + const pgbossSchema = await pool.query( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgboss'" + ); + expect(pgbossSchema.rowCount).toBe(0); + }); }); - await dataSource.initialize(); }); - afterEach(async () => { - await service?.shutdown(); - await dataSource.destroy().catch(() => void 0); - }); + describe('with existing database', () => { + beforeEach(async () => { + dataSource = createDataSource({ + connectionConfig, + devOptions: { dropSchema: true, synchronize: true }, + entities: pgBossEntities, + extensions: { pgBoss: true }, + logger + }); + await dataSource.initialize(); + }); - it('health check is ok after start with a valid db connection', async () => { - service = new PgBossHttpService( - { - apiUrl: new URL('http://unused/'), - dbCacheTtl: 0, - lastRosEpochs: 10, - metadataFetchMode: StakePoolMetadataFetchMode.DIRECT, - parallelJobs: 3, - queues: [], - schedules: [] - }, - { connectionConfig$, db, logger } - ); - expect(await service.healthCheck()).toEqual({ ok: false, reason: 'PgBossHttpService not started' }); - await service.initialize(); - await service.start(); - expect(await service.healthCheck()).toEqual({ ok: true }); - }); + afterEach(async () => { + await service?.shutdown(); + await dataSource.destroy().catch(() => void 0); + }); + + it('health check is ok after start with a valid db connection', async () => { + service = new PgBossHttpService( + { + apiUrl, + dbCacheTtl: 0, + lastRosEpochs: 10, + metadataFetchMode: StakePoolMetadataFetchMode.DIRECT, + parallelJobs: 3, + queues: [], + schedules: [] + }, + { connectionConfig$, db, logger } + ); + expect(await service.healthCheck()).toEqual({ ok: false, reason: 'PgBossHttpService not started' }); + await service.initialize(); + await service.start(); + expect(await service.healthCheck()).toEqual({ ok: true }); + }); - // eslint-disable-next-line max-statements - it('retries a job until done, eventually reconnecting to the db', async () => { - let observablePromise = Promise.resolve(); - // eslint-disable-next-line @typescript-eslint/no-empty-function, unicorn/consistent-function-scoping - let observableResolver = () => {}; - let subscriptions = 0; + // eslint-disable-next-line max-statements + it('retries a job until done, eventually reconnecting to the db', async () => { + let observablePromise = Promise.resolve(); + // eslint-disable-next-line @typescript-eslint/no-empty-function, unicorn/consistent-function-scoping + let observableResolver = () => {}; + let subscriptions = 0; - const config$ = new Observable((subscriber) => { - subscriptions++; + const config$ = new Observable((subscriber) => { + subscriptions++; - void (async () => { - await observablePromise; - subscriber.next(connectionConfig); - })(); - }); + void (async () => { + await observablePromise; + subscriber.next(connectionConfig); + })(); + }); - service = new PgBossHttpService( - { - apiUrl: new URL('http://unused/'), - dbCacheTtl: 0, - lastRosEpochs: 10, - metadataFetchMode: StakePoolMetadataFetchMode.DIRECT, - parallelJobs: 3, - queues: [STAKE_POOL_METADATA_QUEUE], - schedules: [] - }, - { connectionConfig$: config$, db, logger } - ); - await service.initialize(); - await service.start(); - - // Insert test block with slot 1 - const queryRunner = dataSource.createQueryRunner(); - await queryRunner.connect(); - const blockRepos = dataSource.getRepository(BlockEntity); - const block = { hash: 'test', height: 1, slot: 1 }; - await blockRepos.insert(block); - - // Helper to check all the status at each step - const collectStatus = async () => ({ calls: callCounter, health: await service?.healthCheck(), subscriptions }); - - expect(await collectStatus()).toEqual({ calls: 0, health: { ok: true }, subscriptions: 1 }); - - // Schedule a job - const pgboss = createPgBossExtension(queryRunner, logger); - await pgboss.send(STAKE_POOL_METADATA_QUEUE, {}, { retryDelay: 1, retryLimit: 100, slot: Cardano.Slot(1) }); - await queryRunner.release(); - - expect(await collectStatus()).toEqual({ calls: 0, health: { ok: true }, subscriptions: 1 }); - - // Let the handler to throw an error which pg-boss will retry - handlerResolvers[0](); - await testPromises[0]; - - expect(await collectStatus()).toEqual({ calls: 1, health: { ok: true }, subscriptions: 1 }); - - // Prepare the DB configuration observable to wait for the command in order to provide a new connection config - observablePromise = new Promise((resolve) => (observableResolver = resolve)); - - // Let the handler to throw a mocked DB error which should cause a DB reconnection - handlerResolvers[1](); - await testPromises[1]; - - expect(await collectStatus()).toEqual({ - calls: 2, - health: { ok: false, reason: 'DataBase error: reconnecting...' }, - subscriptions: 2 - }); + service = new PgBossHttpService( + { + apiUrl, + dbCacheTtl: 0, + lastRosEpochs: 10, + metadataFetchMode: StakePoolMetadataFetchMode.DIRECT, + parallelJobs: 3, + queues: [STAKE_POOL_METADATA_QUEUE], + schedules: [] + }, + { connectionConfig$: config$, db, logger } + ); + await service.initialize(); + await service.start(); - // Emit a new DB configuration to make PgBossHttpService reconnect - observableResolver(); + // Insert test block with slot 1 + const queryRunner = dataSource.createQueryRunner(); + await queryRunner.connect(); + const blockRepos = dataSource.getRepository(BlockEntity); + const block = { hash: 'test', height: 1, slot: 1 }; + await blockRepos.insert(block); - // Wait until PgBossHttpService reconnected - while (!(await service?.healthCheck())?.ok) await new Promise((resolve) => setTimeout(resolve, 5)); + // Helper to check all the status at each step + const collectStatus = async () => ({ calls: callCounter, health: await service?.healthCheck(), subscriptions }); - expect(await collectStatus()).toEqual({ calls: 2, health: { ok: true }, subscriptions: 2 }); + expect(await collectStatus()).toEqual({ calls: 0, health: { ok: true }, subscriptions: 1 }); - // Prepare the DB configuration observable to wait for the command in order to provide a new connection config - observablePromise = new Promise((resolve) => (observableResolver = resolve)); + // Schedule a job + const pgboss = createPgBossExtension(queryRunner, logger); + await pgboss.send(STAKE_POOL_METADATA_QUEUE, {}, { retryDelay: 1, retryLimit: 100, slot: Cardano.Slot(1) }); + await queryRunner.release(); - // Let the handler to throw an error simulating pgboss locked state which should cause a DB reconnection - handlerResolvers[2](); - await testPromises[2]; + expect(await collectStatus()).toEqual({ calls: 0, health: { ok: true }, subscriptions: 1 }); - expect(await collectStatus()).toEqual({ - calls: 3, - health: { ok: false, reason: 'DataBase error: reconnecting...' }, - subscriptions: 3 - }); + // Let the handler to throw an error which pg-boss will retry + handlerResolvers[0](); + await testPromises[0]; + + expect(await collectStatus()).toEqual({ calls: 1, health: { ok: true }, subscriptions: 1 }); - // Emit a new DB configuration to make PgBossHttpService reconnect - observableResolver(); + // Prepare the DB configuration observable to wait for the command in order to provide a new connection config + observablePromise = new Promise((resolve) => (observableResolver = resolve)); - // Wait until PgBossHttpService reconnected - while (!(await service?.healthCheck())?.ok) await new Promise((resolve) => setTimeout(resolve, 5)); + // Let the handler to throw a mocked DB error which should cause a DB reconnection + handlerResolvers[1](); + await testPromises[1]; - expect(await collectStatus()).toEqual({ calls: 3, health: { ok: true }, subscriptions: 3 }); + expect(await collectStatus()).toEqual({ + calls: 2, + health: { ok: false, reason: 'DataBase error: reconnecting...' }, + subscriptions: 2 + }); - // Let the handler to complete with success - handlerResolvers[3](); - await testPromises[3]; + // Emit a new DB configuration to make PgBossHttpService reconnect + observableResolver(); - expect(await collectStatus()).toEqual({ calls: 4, health: { ok: true }, subscriptions: 3 }); + // Wait until PgBossHttpService reconnected + while (!(await service?.healthCheck())?.ok) await new Promise((resolve) => setTimeout(resolve, 5)); - await dataSource.destroy(); + expect(await collectStatus()).toEqual({ calls: 2, health: { ok: true }, subscriptions: 2 }); + + // Prepare the DB configuration observable to wait for the command in order to provide a new connection config + observablePromise = new Promise((resolve) => (observableResolver = resolve)); + + // Let the handler to throw an error simulating pgboss locked state which should cause a DB reconnection + handlerResolvers[2](); + await testPromises[2]; + + expect(await collectStatus()).toEqual({ + calls: 3, + health: { ok: false, reason: 'DataBase error: reconnecting...' }, + subscriptions: 3 + }); + + // Emit a new DB configuration to make PgBossHttpService reconnect + observableResolver(); + + // Wait until PgBossHttpService reconnected + while (!(await service?.healthCheck())?.ok) await new Promise((resolve) => setTimeout(resolve, 5)); + + expect(await collectStatus()).toEqual({ calls: 3, health: { ok: true }, subscriptions: 3 }); + + // Let the handler to complete with success + handlerResolvers[3](); + await testPromises[3]; + + expect(await collectStatus()).toEqual({ calls: 4, health: { ok: true }, subscriptions: 3 }); + + await dataSource.destroy(); + }); }); }); diff --git a/packages/cardano-services/test/util/TypeormService.test.ts b/packages/cardano-services/test/util/TypeormService.test.ts new file mode 100644 index 00000000000..fd72e8dc5a2 --- /dev/null +++ b/packages/cardano-services/test/util/TypeormService.test.ts @@ -0,0 +1,69 @@ +import { BehaviorSubject, of } from 'rxjs'; +import { BlockEntity, PgConnectionConfig } from '@cardano-sdk/projection-typeorm'; +import { Milliseconds } from '@cardano-sdk/core'; +import { TypeormService } from '../../src/util'; +import { connStringToPgConnectionConfig } from '../../src'; +import { createDatabase } from 'typeorm-extension'; +import { dummyLogger as logger } from 'ts-log'; + +class TestTypeormService extends TypeormService {} + +describe('TypeormService', () => { + const badConnectionConfig: PgConnectionConfig = { port: 1234 }; + const connectionTimeout = Milliseconds(10); + + it('fails to start when it cannot connect to db', async () => { + const service = new TestTypeormService('test', { + connectionConfig$: of(badConnectionConfig), + connectionTimeout, + entities: [], + logger + }); + await service.initialize(); + await expect(service.start()).rejects.toThrowError(); + }); + + describe('started', () => { + const goodConnectionConfig = connStringToPgConnectionConfig(process.env.POSTGRES_CONNECTION_STRING_EMPTY!); + let connectionConfig$: BehaviorSubject; + let service: TestTypeormService; + + beforeAll(async () => { + await createDatabase({ options: { ...goodConnectionConfig, type: 'postgres' } }); + }); + + beforeEach(async () => { + connectionConfig$ = new BehaviorSubject(goodConnectionConfig); + service = new TestTypeormService('test', { + connectionConfig$, + connectionTimeout, + entities: [BlockEntity], + logger + }); + await service.initialize(); + await service.start(); + }); + + it('does not create the schema', async () => { + await expect(service.withQueryRunner((queryRunner) => queryRunner.hasTable('block'))).resolves.toBe(false); + }); + + it('reconnects on error', async () => { + connectionConfig$.next(badConnectionConfig); + service.onError(new Error('Any error')); + const queryResultReady = service.withQueryRunner(async () => 'ok'); + connectionConfig$.next(goodConnectionConfig); + await expect(queryResultReady).resolves.toBe('ok'); + }); + + it('times out when it cannot reconnect for too long, then recovers', async () => { + connectionConfig$.next(badConnectionConfig); + service.onError(new Error('Any error')); + const queryFailureReady = service.withQueryRunner(async () => 'ok'); + await expect(queryFailureReady).rejects.toThrowError(); + const querySuccessReady = service.withQueryRunner(async () => 'ok'); + connectionConfig$.next(goodConnectionConfig); + await expect(querySuccessReady).resolves.toBe('ok'); + }); + }); +}); diff --git a/packages/projection-typeorm/src/createDataSource.ts b/packages/projection-typeorm/src/createDataSource.ts index 21e9e8a4d9a..857c4936d34 100644 --- a/packages/projection-typeorm/src/createDataSource.ts +++ b/packages/projection-typeorm/src/createDataSource.ts @@ -16,7 +16,7 @@ type PostgresConnectionOptions = DataSourceOptions & { type: 'postgres' }; export type PgConnectionConfig = Pick< PostgresConnectionOptions, - 'host' | 'port' | 'database' | 'username' | 'password' | 'ssl' + 'host' | 'port' | 'database' | 'username' | 'password' | 'ssl' | 'poolSize' >; export type TypeormDevOptions = Pick; @@ -149,12 +149,14 @@ export const createDataSource = ({ return patchObject(dataSource, { async initialize() { await dataSource.initialize(); - await initializePgBoss( - dataSource, - contextLogger(logger, 'createDataSource'), - extensions?.pgBoss, - devOptions?.dropSchema - ); + if (extensions?.pgBoss && (options?.migrationsRun || devOptions?.synchronize)) { + await initializePgBoss( + dataSource, + contextLogger(logger, 'createDataSource'), + extensions?.pgBoss, + devOptions?.dropSchema + ); + } return dataSource; } }); diff --git a/packages/projection-typeorm/test/createDataSource.test.ts b/packages/projection-typeorm/test/createDataSource.test.ts index 18c90cc8649..bf75516b3e0 100644 --- a/packages/projection-typeorm/test/createDataSource.test.ts +++ b/packages/projection-typeorm/test/createDataSource.test.ts @@ -16,7 +16,7 @@ describe('createDataSource', () => { }); describe('pg-boss schema', () => { - it('initialize() creates and drops pg-boss schema when pgBoss extension is enabled', async () => { + it('initialize() creates pg-boss schema when pgBoss extension is enabled', async () => { const dataSourceWithBoss = await initializeDataSource({ entities: [BlockEntity], extensions: { @@ -26,11 +26,6 @@ describe('createDataSource', () => { const queryRunnerWithBoss = dataSourceWithBoss.createQueryRunner(); expect(await pgBossSchemaExists(queryRunnerWithBoss)).toBe(true); await queryRunnerWithBoss.release(); - - const dataSourceWithoutBoss = await initializeDataSource({ entities: [] }); - const queryRunnerWithoutBoss = dataSourceWithoutBoss.createQueryRunner(); - expect(await pgBossSchemaExists(queryRunnerWithoutBoss)).toBe(false); - await queryRunnerWithBoss.release(); }); }); }); diff --git a/yarn.lock b/yarn.lock index 9c04b991fe7..83cd3f3ac01 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3188,6 +3188,7 @@ __metadata: ts-log: ^2.2.4 ts-node: ^10.0.0 typeorm: ^0.3.15 + typeorm-extension: ^2.7.0 typescript: ^4.7.4 uuid: ^10.0.0 wait-on: ^6.0.1