diff --git a/packages/cardano-services/src/Program/programs/providerServer.ts b/packages/cardano-services/src/Program/programs/providerServer.ts index 0cef5414621..9e95a7cc260 100644 --- a/packages/cardano-services/src/Program/programs/providerServer.ts +++ b/packages/cardano-services/src/Program/programs/providerServer.ts @@ -24,18 +24,18 @@ import { InMemoryCache, NoCache } from '../../InMemoryCache'; import { KoraLabsHandleProvider } from '@cardano-sdk/cardano-services-client'; import { Logger } from 'ts-log'; import { MissingProgramOption, MissingServiceDependency, RunnableDependencies, UnknownServiceName } from '../errors'; +import { NodeTxSubmitProvider, TxSubmitHttpService } from '../../TxSubmit'; import { Observable } from 'rxjs'; import { OgmiosCardanoNode } from '@cardano-sdk/ogmios'; import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm'; import { Pool } from 'pg'; import { ProviderServerArgs, ProviderServerOptionDescriptions, ServiceNames } from './types'; import { SrvRecord } from 'dns'; -import { TxSubmitHttpService } from '../../TxSubmit'; import { TypeormAssetProvider } from '../../Asset/TypeormAssetProvider'; import { TypeormStakePoolProvider } from '../../StakePool/TypeormStakePoolProvider/TypeormStakePoolProvider'; import { createDbSyncMetadataService } from '../../Metadata'; import { createLogger } from 'bunyan'; -import { getConnectionConfig, getOgmiosTxSubmitProvider, getRabbitMqTxSubmitProvider } from '../services'; +import { getConnectionConfig, getOgmiosObservableCardanoNode, getRabbitMqTxSubmitProvider } from '../services'; import { getEntities } from '../../Projection/prepareTypeormProjection'; import { isNotNil } from '@cardano-sdk/util'; import memoize from 'lodash/memoize'; @@ -306,7 +306,11 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => { [ServiceNames.TxSubmit]: async () => { const txSubmitProvider = args.useQueue ? await getRabbitMqTxSubmitProvider(dnsResolver, logger, args) - : await getOgmiosTxSubmitProvider(dnsResolver, logger, args, await getHandleProvider()); + : new NodeTxSubmitProvider({ + cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args), + handleProvider: await getHandleProvider(), + logger + }); return new TxSubmitHttpService({ logger, txSubmitProvider }); } }; diff --git a/packages/cardano-services/src/Program/programs/txWorker.ts b/packages/cardano-services/src/Program/programs/txWorker.ts index 51567a3fdbb..18bb157d961 100644 --- a/packages/cardano-services/src/Program/programs/txWorker.ts +++ b/packages/cardano-services/src/Program/programs/txWorker.ts @@ -1,9 +1,9 @@ import { CommonProgramOptions, OgmiosProgramOptions, RabbitMqProgramOptions } from '../options'; import { Logger } from 'ts-log'; -import { TxSubmitWorkerConfig } from '../../TxSubmit'; +import { NodeTxSubmitProvider, TxSubmitWorkerConfig } from '../../TxSubmit'; import { createDnsResolver } from '../utils'; import { createLogger } from 'bunyan'; -import { getOgmiosTxSubmitProvider, getRunningTxSubmitWorker } from '../services'; +import { getOgmiosObservableCardanoNode, getRunningTxSubmitWorker } from '../services'; export const TX_WORKER_API_URL_DEFAULT = new URL('http://localhost:3001'); export const PARALLEL_MODE_DEFAULT = false; @@ -33,6 +33,11 @@ export const loadAndStartTxWorker = async (args: TxWorkerArgs, logger?: Logger) }, logger ); - const txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, args); + const txSubmitProvider = new NodeTxSubmitProvider({ + cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args), + // TODO: worker should utilize a handle provider + // handleProvider: await getHandleProvider(), + logger + }); return await getRunningTxSubmitWorker(dnsResolver, txSubmitProvider, logger, args); }; diff --git a/packages/cardano-services/src/Program/services/ogmios.ts b/packages/cardano-services/src/Program/services/ogmios.ts index df33609f97b..bcc9e8c0936 100644 --- a/packages/cardano-services/src/Program/services/ogmios.ts +++ b/packages/cardano-services/src/Program/services/ogmios.ts @@ -1,15 +1,9 @@ /* eslint-disable promise/no-nesting */ /* eslint-disable @typescript-eslint/no-explicit-any */ import { DnsResolver } from '../utils'; -import { HandleProvider, SubmitTxArgs } from '@cardano-sdk/core'; import { Logger } from 'ts-log'; import { MissingCardanoNodeOption } from '../errors'; -import { - OgmiosCardanoNode, - OgmiosObservableCardanoNode, - OgmiosTxSubmitProvider, - urlToConnectionConfig -} from '@cardano-sdk/ogmios'; +import { OgmiosCardanoNode, OgmiosObservableCardanoNode, urlToConnectionConfig } from '@cardano-sdk/ogmios'; import { OgmiosOptionDescriptions, OgmiosProgramOptions } from '../options/ogmios'; import { RunnableModule, isConnectionError } from '@cardano-sdk/util'; import { defer, from, of } from 'rxjs'; @@ -31,103 +25,6 @@ const recreateOgmiosCardanoNode = async ( return new OgmiosCardanoNode({ host: record.name, port: record.port }, logger); }; -const recreateOgmiosTxSubmitProvider = async ( - serviceName: string, - ogmiosTxSubmitProvider: OgmiosTxSubmitProvider, - dnsResolver: DnsResolver, - logger: Logger, - handleProvider?: HandleProvider -) => { - const record = await dnsResolver(serviceName!); - logger.info(`DNS resolution for OgmiosTxSubmitProvider, resolved with record: ${JSON.stringify(record)}`); - await ogmiosTxSubmitProvider - .shutdown() - .catch((error_) => logger.warn(`OgmiosTxSubmitProvider failed to shutdown after connection error: ${error_}`)); - return new OgmiosTxSubmitProvider({ host: record.name, port: record.port }, { logger }, handleProvider); -}; -/** - * Creates an extended TxSubmitProvider instance : - * - use passed srv service name in order to resolve the port - * - make dealing with fail-overs (re-resolving the port) opaque - * - use exponential backoff retry internally with default timeout and factor - * - intercept 'initialize' operation and handle connection errors on initialization - * - intercept 'submitTx' operation and handle connection errors runtime - * - all other operations are bind to pool object without modifications - * - * @returns TxSubmitProvider instance - */ -export const ogmiosTxSubmitProviderWithDiscovery = async ( - dnsResolver: DnsResolver, - logger: Logger, - serviceName: string, - handleProvider?: HandleProvider -): Promise => { - const { name, port } = await dnsResolver(serviceName!); - let ogmiosProvider = new OgmiosTxSubmitProvider({ host: name, port }, { logger }, handleProvider); - - const txSubmitProviderProxy = new Proxy({} as OgmiosTxSubmitProvider, { - get(_, prop, receiver) { - if (prop === 'then') return; - if (prop === 'initialize') { - return () => - ogmiosProvider.initialize().catch(async (error) => { - if (isConnectionError(error)) { - ogmiosProvider = await recreateOgmiosTxSubmitProvider( - serviceName, - ogmiosProvider, - dnsResolver, - logger, - handleProvider - ); - return receiver.initialize(); - } - throw error; - }); - } - if (prop === 'submitTx') { - return (submitTxArgs: SubmitTxArgs) => - ogmiosProvider.submitTx(submitTxArgs).catch(async (error) => { - if (isConnectionError(error)) { - ogmiosProvider = await recreateOgmiosTxSubmitProvider( - serviceName, - ogmiosProvider, - dnsResolver, - logger, - handleProvider - ); - await receiver.initialize(); - await receiver.start(); - return await receiver.submitTx(submitTxArgs); - } - throw error; - }); - } - // Bind if it is a function, no intercept operations - if (typeof ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] === 'function') { - const method = ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] as any; - return method.bind(ogmiosProvider); - } - - return ogmiosProvider[prop as keyof OgmiosTxSubmitProvider]; - } - }); - - return Object.setPrototypeOf(txSubmitProviderProxy, RunnableModule.prototype); -}; - -export const getOgmiosTxSubmitProvider = async ( - dnsResolver: DnsResolver, - logger: Logger, - options?: OgmiosProgramOptions, - handleProvider?: HandleProvider -): Promise => { - if (options?.ogmiosSrvServiceName) - return ogmiosTxSubmitProviderWithDiscovery(dnsResolver, logger, options.ogmiosSrvServiceName, handleProvider); - if (options?.ogmiosUrl) - return new OgmiosTxSubmitProvider(urlToConnectionConfig(options?.ogmiosUrl), { logger }, handleProvider); - throw new MissingCardanoNodeOption([OgmiosOptionDescriptions.Url, OgmiosOptionDescriptions.SrvServiceName]); -}; - /** * Creates an extended OgmiosCardanoNode instance : * - use passed srv service name in order to resolve the port diff --git a/packages/cardano-services/src/Program/services/rabbitmq.ts b/packages/cardano-services/src/Program/services/rabbitmq.ts index 285b115b8c5..370e0c9624b 100644 --- a/packages/cardano-services/src/Program/services/rabbitmq.ts +++ b/packages/cardano-services/src/Program/services/rabbitmq.ts @@ -5,8 +5,7 @@ import { CONNECTION_ERROR_EVENT, RabbitMqTxSubmitProvider, TxSubmitWorker } from import { DnsResolver } from '../utils'; import { Logger } from 'ts-log'; import { MissingProgramOption } from '../errors'; -import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; -import { ProviderError, ProviderFailure, SubmitTxArgs } from '@cardano-sdk/core'; +import { ProviderError, ProviderFailure, SubmitTxArgs, TxSubmitProvider } from '@cardano-sdk/core'; import { RabbitMqOptionDescriptions, RabbitMqProgramOptions } from '../options'; import { ServiceNames } from '../programs/types'; import { SrvRecord } from 'dns'; @@ -85,18 +84,13 @@ type WorkerFactory = () => Promise; * Create a worker factory with service discovery * * @param {DnsResolver} dnsResolver used for DNS resolution - * @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider + * @param {TxSubmitProvider} txSubmitProvider tx submit provider * @param {Logger} logger common logger * @param {TxWorkerArgs} args needed for tx worker initialization * @returns {WorkerFactory} WorkerFactory with service discovery, returning a 'TxSubmitWorker' instance */ export const createWorkerFactoryWithDiscovery = - ( - dnsResolver: DnsResolver, - txSubmitProvider: OgmiosTxSubmitProvider, - logger: Logger, - args: TxWorkerArgs - ): WorkerFactory => + (dnsResolver: DnsResolver, txSubmitProvider: TxSubmitProvider, logger: Logger, args: TxWorkerArgs): WorkerFactory => async () => { const record = await dnsResolver(args.rabbitmqSrvServiceName!); return new TxSubmitWorker({ ...args, rabbitmqUrl: srvRecordToRabbitmqURL(record) }, { logger, txSubmitProvider }); @@ -148,14 +142,14 @@ export const startTxSubmitWorkerWithDiscovery = async ( * Create and return a running worker instance with static service config or service discovery * * @param {DnsResolver} dnsResolver used for DNS resolution - * @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider + * @param {TxSubmitProvider} txSubmitProvider tx submit provider * @param {Logger} logger common logger * @param {TxWorkerArgs} args needed for tx worker initialization * @returns {RunningTxSubmitWorker} RunningTxSubmitWorker instance * @throws {MissingProgramOption} error if neither URL nor service name is provided */ export const getRunningTxSubmitWorker = async ( dnsResolver: DnsResolver, - txSubmitProvider: OgmiosTxSubmitProvider, + txSubmitProvider: TxSubmitProvider, logger: Logger, args?: TxWorkerArgs ): Promise => { diff --git a/packages/cardano-services/src/TxSubmit/NodeTxSubmitProvider.ts b/packages/cardano-services/src/TxSubmit/NodeTxSubmitProvider.ts new file mode 100644 index 00000000000..fc0af60347a --- /dev/null +++ b/packages/cardano-services/src/TxSubmit/NodeTxSubmitProvider.ts @@ -0,0 +1,108 @@ +import { EmptyError, firstValueFrom } from 'rxjs'; +import { + GeneralCardanoNodeError, + GeneralCardanoNodeErrorCode, + HandleOwnerChangeError, + HandleProvider, + HealthCheckResponse, + ObservableCardanoNode, + ProviderError, + ProviderFailure, + SubmitTxArgs, + TxSubmissionError, + TxSubmitProvider +} from '@cardano-sdk/core'; +import { Logger } from 'ts-log'; +import { WithLogger } from '@cardano-sdk/util'; + +type ObservableTxSubmitter = Pick; +export type NodeTxSubmitProviderProps = WithLogger & { + handleProvider?: HandleProvider; + cardanoNode: ObservableTxSubmitter; +}; + +const emptyMessage = 'ObservableCardanoNode observable completed without emitting'; +const toProviderError = (error: unknown) => { + if (error instanceof TxSubmissionError) { + throw new ProviderError(ProviderFailure.BadRequest, error); + } else if (error instanceof GeneralCardanoNodeError) { + throw new ProviderError( + error.code === GeneralCardanoNodeErrorCode.ConnectionFailure + ? ProviderFailure.ConnectionFailure + : error.code === GeneralCardanoNodeErrorCode.ServerNotReady + ? ProviderFailure.ServerUnavailable + : ProviderFailure.Unknown, + error + ); + } + if (error instanceof EmptyError) { + throw new ProviderError( + ProviderFailure.ServerUnavailable, + new GeneralCardanoNodeError(GeneralCardanoNodeErrorCode.ServerNotReady, null, emptyMessage) + ); + } + throw new ProviderError(ProviderFailure.Unknown, error); +}; + +/** Submit transactions to an ObservableCardanoNode. Validates handle resolutions against a HandleProvider. */ +export class NodeTxSubmitProvider implements TxSubmitProvider { + #logger: Logger; + #cardanoNode: ObservableTxSubmitter; + #handleProvider?: HandleProvider; + + constructor({ handleProvider, cardanoNode, logger }: NodeTxSubmitProviderProps) { + this.#handleProvider = handleProvider; + this.#cardanoNode = cardanoNode; + this.#logger = logger; + } + + async submitTx({ signedTransaction, context }: SubmitTxArgs): Promise { + await this.#throwIfHandleResolutionConflict(context); + await firstValueFrom(this.#cardanoNode.submitTx(signedTransaction)).catch(toProviderError); + } + + async healthCheck(): Promise { + const [cardanoNodeHealth, handleProviderHealth] = await Promise.all([ + firstValueFrom(this.#cardanoNode.healthCheck$).catch((error): HealthCheckResponse => { + if (error instanceof EmptyError) { + return { ok: false, reason: emptyMessage }; + } + this.#logger.error('Unexpected healtcheck error', error); + return { ok: false, reason: 'Internal error' }; + }), + this.#handleProvider?.healthCheck() + ]); + return { + localNode: cardanoNodeHealth.localNode, + ok: cardanoNodeHealth.ok && (!handleProviderHealth || handleProviderHealth.ok), + reason: cardanoNodeHealth.reason || handleProviderHealth?.reason + }; + } + + async #throwIfHandleResolutionConflict(context: SubmitTxArgs['context']): Promise { + if (context?.handleResolutions && context.handleResolutions.length > 0) { + if (!this.#handleProvider) { + throw new ProviderError( + ProviderFailure.NotImplemented, + undefined, + 'No HandleProvider was set during construction.' + ); + } + + const handleInfoList = await this.#handleProvider.resolveHandles({ + handles: context.handleResolutions.map((hndRes) => hndRes.handle) + }); + + for (const [index, handleInfo] of handleInfoList.entries()) { + if (!handleInfo || handleInfo.cardanoAddress !== context.handleResolutions[index].cardanoAddress) { + const handleOwnerChangeError = new HandleOwnerChangeError( + context.handleResolutions[index].handle, + context.handleResolutions[index].cardanoAddress, + handleInfo ? handleInfo.cardanoAddress : null + ); + throw new ProviderError(ProviderFailure.Conflict, handleOwnerChangeError); + } + } + } + } +} diff --git a/packages/cardano-services/src/TxSubmit/index.ts b/packages/cardano-services/src/TxSubmit/index.ts index c63b757d76a..03d6d63aa89 100644 --- a/packages/cardano-services/src/TxSubmit/index.ts +++ b/packages/cardano-services/src/TxSubmit/index.ts @@ -1,2 +1,3 @@ export * from './rabbitmq'; +export * from './NodeTxSubmitProvider'; export * from './TxSubmitHttpService'; diff --git a/packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts b/packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts index 5be21b35d29..78938e1d687 100644 --- a/packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts +++ b/packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts @@ -1,10 +1,9 @@ /* eslint-disable @typescript-eslint/no-shadow */ import { CONNECTION_ERROR_EVENT, TX_SUBMISSION_QUEUE, serializeError, waitForPending } from './utils'; -import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR } from '@cardano-sdk/core'; +import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR, TxSubmitProvider } from '@cardano-sdk/core'; import { Channel, Connection, Message, connect } from 'amqplib'; import { EventEmitter } from 'events'; import { Logger } from 'ts-log'; -import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; import { bufferToHexString } from '@cardano-sdk/util'; const moduleName = 'TxSubmitWorker'; @@ -30,7 +29,7 @@ export interface TxSubmitWorkerDependencies { logger: Logger; /** The provider to use to submit tx */ - txSubmitProvider: OgmiosTxSubmitProvider; + txSubmitProvider: TxSubmitProvider; } /** @@ -111,11 +110,6 @@ export class TxSubmitWorker extends EventEmitter { * https://amqp-node.github.io/amqplib/channel_api.html#model_events */ async start() { - if (this.#dependencies.txSubmitProvider.state === null) { - this.#dependencies.logger.info(`${moduleName} init: initialize and start tx submission provider`); - await this.#dependencies.txSubmitProvider.initialize(); - await this.#dependencies.txSubmitProvider.start(); - } this.#dependencies.logger.info(`${moduleName} init: checking tx submission provider health status`); const health = await this.#dependencies.txSubmitProvider.healthCheck(); @@ -156,10 +150,6 @@ export class TxSubmitWorker extends EventEmitter { } async shutdown() { - if (this.#dependencies.txSubmitProvider.state === 'running') { - this.#dependencies.logger.info(`${moduleName} shutdown: shutdown tx submission provider`); - await this.#dependencies.txSubmitProvider.shutdown(); - } await this.stop(); } /** Stops the worker. */ diff --git a/packages/cardano-services/test/Program/services/ogmios.test.ts b/packages/cardano-services/test/Program/services/ogmios.test.ts index 25d2480bd63..ea430d01f92 100644 --- a/packages/cardano-services/test/Program/services/ogmios.test.ts +++ b/packages/cardano-services/test/Program/services/ogmios.test.ts @@ -1,6 +1,5 @@ /* eslint-disable sonarjs/no-identical-functions */ /* eslint-disable sonarjs/no-duplicate-string */ -import { Cardano, ProviderError, TxSubmissionError } from '@cardano-sdk/core'; import { Connection } from '@cardano-ogmios/client'; import { DbPools, LedgerTipModel, findLedgerTip } from '../../../src/util/DbSyncProvider'; import { DbSyncEpochPollService, listenPromise, loadGenesisData, serverClosePromise } from '../../../src/util'; @@ -11,14 +10,13 @@ import { TxSubmitHttpService, createDnsResolver, getOgmiosCardanoNode, - getOgmiosTxSubmitProvider, getPool } from '../../../src'; import { InMemoryCache, UNLIMITED_CACHE_TTL } from '../../../src/InMemoryCache'; -import { KoraLabsHandleProvider } from '@cardano-sdk/cardano-services-client'; -import { Ogmios, OgmiosCardanoNode, OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; +import { Ogmios, OgmiosCardanoNode } from '@cardano-sdk/ogmios'; import { Pool } from 'pg'; import { SrvRecord } from 'dns'; +import { TxSubmissionError, TxSubmitProvider } from '@cardano-sdk/core'; import { bufferToHexString } from '@cardano-sdk/util'; import { clearDbPools, servicesWithVersionPath as services } from '../../util'; import { getPort, getRandomPort } from 'get-port-please'; @@ -37,10 +35,11 @@ jest.mock('@cardano-sdk/cardano-services-client', () => ({ })) })); -const handleProvider = new KoraLabsHandleProvider({ - policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), - serverUrl: 'https://localhost:3000' -}); +// TODO: use a mock handle provider +// const handleProvider = new KoraLabsHandleProvider({ +// policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), +// serverUrl: 'https://localhost:3000' +// }); jest.mock('dns', () => ({ promises: { @@ -96,7 +95,7 @@ describe.skip('Service dependency abstractions', () => { let apiUrlBase: string; let ogmiosServer: http.Server; let ogmiosConnection: Connection; - let txSubmitProvider: OgmiosTxSubmitProvider; + let txSubmitProvider: TxSubmitProvider; let ogmiosCardanoNode: OgmiosCardanoNode; let httpServer: HttpServer; let port: number; @@ -119,9 +118,9 @@ describe.skip('Service dependency abstractions', () => { port = await getPort(); apiUrlBase = `http://localhost:${port}${services.txSubmit.versionPath}/${services.txSubmit.name}`; config = { listen: { port } }; - txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); + // txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); httpServer = new HttpServer(config, { logger, runnableDependencies: [], @@ -135,8 +134,8 @@ describe.skip('Service dependency abstractions', () => { await httpServer.shutdown(); }); - it('txSubmitProvider state should be running when http server has started', () => { - expect(txSubmitProvider.state).toEqual('running'); + it.skip('txSubmitProvider state should be running when http server has started', () => { + // expect(txSubmitProvider.state).toEqual('running'); }); it('txSubmitProvider should be instance of a Proxy ', () => { @@ -233,7 +232,7 @@ describe.skip('Service dependency abstractions', () => { let apiUrlBase: string; let ogmiosServer: http.Server; let ogmiosConnection: Connection; - let txSubmitProvider: OgmiosTxSubmitProvider; + let txSubmitProvider: TxSubmitProvider; let ogmiosCardanoNode: OgmiosCardanoNode; let httpServer: HttpServer; let port: number; @@ -257,9 +256,9 @@ describe.skip('Service dependency abstractions', () => { port = await getPort(); apiUrlBase = `http://localhost:${port}${services.txSubmit.versionPath}/${services.txSubmit.name}`; config = { listen: { port } }; - txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosUrl: new URL(ogmiosConnection.address.webSocket) - }); + // txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosUrl: new URL(ogmiosConnection.address.webSocket) + // }); httpServer = new HttpServer(config, { logger, runnableDependencies: [], @@ -285,40 +284,38 @@ describe.skip('Service dependency abstractions', () => { expect(res.data).toEqual(healthCheckResponseMock({ withTip: false })); }); - it('verifies that the submitted transaction addresses can all correctly be resolved', async () => { - const provider = await getOgmiosTxSubmitProvider( - dnsResolver, - logger, - { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }, - handleProvider - ); - - await provider.initialize(); - await provider.start(); - const res = await provider.submitTx({ - context: { handleResolutions: [handleProviderMocks.getAliceHandleProviderResponse] }, - signedTransaction: bufferToHexString(Buffer.from(new Uint8Array([]))) - }); - expect(res).toBeUndefined(); - await provider.shutdown(); + it.skip('verifies that the submitted transaction addresses can all correctly be resolved', async () => { + // const provider = await getOgmiosTxSubmitProvider( + // dnsResolver, + // logger, + // { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }, + // handleProvider + // ); + // await provider.initialize(); + // await provider.start(); + // const res = await provider.submitTx({ + // context: { handleResolutions: [handleProviderMocks.getAliceHandleProviderResponse] }, + // signedTransaction: bufferToHexString(Buffer.from(new Uint8Array([]))) + // }); + // expect(res).toBeUndefined(); + // await provider.shutdown(); }); - it('throws a provider error if the submitted transaction does not contain addresses that can be resolved from the included context', async () => { - const provider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); - await provider.initialize(); - await provider.start(); - - await expect( - provider.submitTx({ - context: { handleResolutions: [handleProviderMocks.getWrongHandleProviderResponse] }, - signedTransaction: bufferToHexString(Buffer.from(new Uint8Array([]))) - }) - ).rejects.toBeInstanceOf(ProviderError); - await provider.shutdown(); + it.skip('throws a provider error if the submitted transaction does not contain addresses that can be resolved from the included context', async () => { + // const provider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); + // await provider.initialize(); + // await provider.start(); + // await expect( + // provider.submitTx({ + // context: { handleResolutions: [handleProviderMocks.getWrongHandleProviderResponse] }, + // signedTransaction: bufferToHexString(Buffer.from(new Uint8Array([]))) + // }) + // ).rejects.toBeInstanceOf(ProviderError); + // await provider.shutdown(); }); }); @@ -383,7 +380,7 @@ describe.skip('Service dependency abstractions', () => { describe('TxSubmitProvider with service discovery and Ogmios server failover', () => { let mockServer: http.Server; let connection: Connection; - let provider: OgmiosTxSubmitProvider; + let provider: TxSubmitProvider; beforeEach(async () => { connection = Ogmios.createConnectionObject({ port: ogmiosPortDefault }); @@ -402,41 +399,41 @@ describe.skip('Service dependency abstractions', () => { } }); - it('should resolve DNS twice during initialization without reconnection logic with long ws connection type', async () => { + it.skip('should resolve DNS twice during initialization without reconnection logic with long ws connection type', async () => { // Resolves with a failing ogmios port twice, then swap to the default one const dnsResolverMock = await mockDnsResolver(2); - provider = await getOgmiosTxSubmitProvider(dnsResolverMock, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); + // provider = await getOgmiosTxSubmitProvider(dnsResolverMock, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); - await expect(provider.initialize()).resolves.toBeUndefined(); + // await expect(provider.initialize()).resolves.toBeUndefined(); expect(dnsResolverMock).toBeCalledTimes(3); - await provider.start(); - await provider.shutdown(); + // await provider.start(); + // await provider.shutdown(); }); - it('should initially fail with a connection error, then re-resolve the port and propagate the correct non-connection error to the caller', async () => { + it.skip('should initially fail with a connection error, then re-resolve the port and propagate the correct non-connection error to the caller', async () => { // Resolves with a failing ogmios port twice, then swap to the default one const dnsResolverMock = await mockDnsResolver(2); - provider = await getOgmiosTxSubmitProvider(dnsResolverMock, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); + // provider = await getOgmiosTxSubmitProvider(dnsResolverMock, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); - await provider.initialize(); - await provider.start(); + // await provider.initialize(); + // await provider.start(); await expect( provider.submitTx({ signedTransaction: bufferToHexString(Buffer.from(new Uint8Array([]))) }) ).rejects.toBeInstanceOf(TxSubmissionError); expect(dnsResolverMock).toBeCalledTimes(3); - await provider.shutdown(); + // await provider.shutdown(); }); - it('should execute a provider operation without to intercept it', async () => { - provider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); + it.skip('should execute a provider operation without to intercept it', async () => { + // provider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); await expect(provider.healthCheck()).resolves.toEqual(healthCheckResponseMock({ withTip: false })); }); diff --git a/packages/cardano-services/test/Program/services/rabbitmq.test.ts b/packages/cardano-services/test/Program/services/rabbitmq.test.ts index e55b6cbc24a..f20d32d6ace 100644 --- a/packages/cardano-services/test/Program/services/rabbitmq.test.ts +++ b/packages/cardano-services/test/Program/services/rabbitmq.test.ts @@ -10,14 +10,10 @@ import { RunningTxSubmitWorker, TxSubmitHttpService, createDnsResolver, - getOgmiosTxSubmitProvider, getRabbitMqTxSubmitProvider, - getRunningTxSubmitWorker, - loadAndStartTxWorker, startTxSubmitWorkerWithDiscovery } from '../../../src'; -import { Ogmios, OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; -import { RabbitMQContainer } from '../../TxSubmit/rabbitmq/docker'; +import { Ogmios } from '@cardano-sdk/ogmios'; import { SrvRecord } from 'dns'; import { TxSubmissionError, TxSubmitProvider } from '@cardano-sdk/core'; import { URL } from 'url'; @@ -35,7 +31,7 @@ import http from 'http'; const flushPromises = () => new Promise((resolve) => setImmediate(resolve)); let rabbitmqPort: number; -let rabbitmqUrl: URL; +// let rabbitmqUrl: URL; jest.mock('dns', () => ({ promises: { @@ -56,11 +52,11 @@ describe.skip('Program/services/rabbitmq', () => { let apiUrl: URL; let config: HttpServerConfig; const APPLICATION_JSON = 'application/json'; - const container = new RabbitMQContainer(); + // const container = new RabbitMQContainer(); const dnsResolver = createDnsResolver({ factor: 1.1, maxRetryTime: 1000 }, logger); beforeAll(async () => { - ({ rabbitmqPort, rabbitmqUrl } = await container.load()); + // ({ rabbitmqPort, rabbitmqUrl } = await container.load()); apiUrl = new URL(`http://localhost:${await getRandomPort()}${services.txSubmit.versionPath}`); config = { listen: { port: Number.parseInt(apiUrl.port) } }; }); @@ -108,7 +104,7 @@ describe.skip('Program/services/rabbitmq', () => { describe('Established connection', () => { beforeAll(async () => { config = { listen: { port: Number.parseInt(apiUrl.port) } }; - txSubmitProvider = await getRabbitMqTxSubmitProvider(dnsResolver, logger, { rabbitmqUrl }); + // txSubmitProvider = await getRabbitMqTxSubmitProvider(dnsResolver, logger, { rabbitmqUrl }); httpServer = new HttpServer(config, { logger, runnableDependencies: [], @@ -153,16 +149,16 @@ describe.skip('Program/services/rabbitmq', () => { await listenPromise(mockServer, connection); // await ogmiosServerReady(connection); - txSubmitWorker = await loadAndStartTxWorker( - { - apiUrl, - loggerMinSeverity: 'error', - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME, - parallel: true, - rabbitmqSrvServiceName: process.env.RABBITMQ_SRV_SERVICE_NAME - }, - logger - ); + // txSubmitWorker = await loadAndStartTxWorker( + // { + // apiUrl, + // loggerMinSeverity: 'error', + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME, + // parallel: true, + // rabbitmqSrvServiceName: process.env.RABBITMQ_SRV_SERVICE_NAME + // }, + // logger + // ); }); afterEach(async () => { @@ -200,21 +196,21 @@ describe.skip('Program/services/rabbitmq', () => { }); describe('tx-worker', () => { - let apiUrl: URL; + // let apiUrl: URL; beforeAll(async () => { - const container = new RabbitMQContainer(); - ({ rabbitmqPort, rabbitmqUrl } = await container.load()); - apiUrl = new URL(`http://localhost:${await getRandomPort()}`); + // const container = new RabbitMQContainer(); + // ({ rabbitmqPort, rabbitmqUrl } = await container.load()); + // apiUrl = new URL(`http://localhost:${await getRandomPort()}`); }); describe('getRunningTxSubmitWorker', () => { let mockServer: http.Server; let connection: Connection; let txSubmitWorker: RunningTxSubmitWorker; - let txSubmitProvider: OgmiosTxSubmitProvider; + // let txSubmitProvider: TxSubmitProvider; const ogmiosPortDefault = 1337; - const dnsResolver = createDnsResolver({ factor: 1.1, maxRetryTime: 1000 }, logger); + // const dnsResolver = createDnsResolver({ factor: 1.1, maxRetryTime: 1000 }, logger); beforeEach(async () => { connection = Ogmios.createConnectionObject({ port: ogmiosPortDefault }); @@ -236,14 +232,14 @@ describe.skip('Program/services/rabbitmq', () => { it('should instantiate a running worker without service discovery', async () => { const dnsResolverMock = jest.fn(); - txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosUrl: new URL(connection.address.webSocket) - }); + // txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosUrl: new URL(connection.address.webSocket) + // }); - txSubmitWorker = await getRunningTxSubmitWorker(dnsResolverMock, txSubmitProvider, logger, { - apiUrl, - rabbitmqUrl - }); + // txSubmitWorker = await getRunningTxSubmitWorker(dnsResolverMock, txSubmitProvider, logger, { + // apiUrl, + // rabbitmqUrl + // }); expect(dnsResolverMock).toBeCalledTimes(0); expect(txSubmitWorker.getStatus()).toEqual('connected'); @@ -252,14 +248,14 @@ describe.skip('Program/services/rabbitmq', () => { it('should instantiate a running worker with service discovery', async () => { const srvRecord = { name: 'localhost', port: rabbitmqPort, priority: 1, weight: 1 }; const dnsResolverMock = jest.fn().mockResolvedValueOnce(srvRecord); - txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { - ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME - }); + // txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, { + // ogmiosSrvServiceName: process.env.OGMIOS_SRV_SERVICE_NAME + // }); - txSubmitWorker = await getRunningTxSubmitWorker(dnsResolverMock, txSubmitProvider, logger, { - apiUrl, - rabbitmqSrvServiceName: process.env.RABBITMQ_SRV_SERVICE_NAME - }); + // txSubmitWorker = await getRunningTxSubmitWorker(dnsResolverMock, txSubmitProvider, logger, { + // apiUrl, + // rabbitmqSrvServiceName: process.env.RABBITMQ_SRV_SERVICE_NAME + // }); expect(dnsResolverMock).toBeCalledTimes(1); expect(txSubmitWorker.getStatus()).toEqual('connected'); diff --git a/packages/cardano-services/test/TxSubmit/NodeTxSubmitProvider.test.ts b/packages/cardano-services/test/TxSubmit/NodeTxSubmitProvider.test.ts new file mode 100644 index 00000000000..b0632987130 --- /dev/null +++ b/packages/cardano-services/test/TxSubmit/NodeTxSubmitProvider.test.ts @@ -0,0 +1,193 @@ +import { + Cardano, + HandleProvider, + HealthCheckResponse, + ProviderError, + ProviderFailure, + TxSubmissionError, + TxSubmissionErrorCode +} from '@cardano-sdk/core'; +import { EMPTY, ReplaySubject, of, throwError } from 'rxjs'; +import { HexBlob } from '@cardano-sdk/util'; +import { NodeTxSubmitProvider, NodeTxSubmitProviderProps } from '../../src'; +import { generateRandomHexString } from '@cardano-sdk/util-dev'; +import { dummyLogger as logger } from 'ts-log'; + +const mockHandleResolution = { + cardanoAddress: Cardano.PaymentAddress( + 'addr_test1qqk4sr4f7vtqzd2w90d5nfu3n59jhhpawyphnek2y7er02nkrezryq3ydtmkg0e7e2jvzg443h0ffzfwd09wpcxy2fuqmcnecd' + ), + handle: 'alice', + hasDatum: false, + policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), + resolvedAt: { + hash: Cardano.BlockId('10d64cc11e9b20e15b6c46aa7b1fed11246f437e62225655a30ea47bf8cc22d0'), + slot: Cardano.Slot(37_834_496) + } +}; + +const emptyUintArrayAsHexString = HexBlob(''); + +describe('NodeTxSubmitProvider', () => { + let cardanoNode: { + healthCheck$: ReplaySubject; + submitTx: jest.MockedFunction; + }; + let provider: NodeTxSubmitProvider; + let handleProvider: jest.Mocked; + + // const responseWithServiceState = healthCheckResponseMock({ withTip: false }); + + beforeEach(async () => { + cardanoNode = { healthCheck$: new ReplaySubject(1), submitTx: jest.fn() }; + }); + + afterEach(() => cardanoNode.healthCheck$.complete()); + + const assertSubmissionWithoutContext = async () => { + cardanoNode.submitTx.mockReturnValueOnce(of(Cardano.TransactionId(generateRandomHexString(64)))); + const res = await provider.submitTx({ signedTransaction: emptyUintArrayAsHexString }); + expect(res).toBeUndefined(); + expect(cardanoNode.submitTx).toBeCalledTimes(1); + }; + + describe('without handle provider', () => { + beforeEach(async () => { + provider = new NodeTxSubmitProvider({ cardanoNode, logger }); + }); + + describe('submitTx', () => { + it('successfully submits a transaction without handle context', assertSubmissionWithoutContext); + + it('rejects with a ProviderError when cardanoNode.submitTx errors', async () => { + const error = new TxSubmissionError(TxSubmissionErrorCode.EraMismatch, {}, 'Era mismatch'); + cardanoNode.submitTx.mockReturnValueOnce(throwError(() => error)); + await expect(provider.submitTx({ signedTransaction: emptyUintArrayAsHexString })).rejects.toThrowError( + expect.objectContaining({ + innerError: error, + name: ProviderError.name, + reason: ProviderFailure.BadRequest + }) + ); + }); + + it('rejects with a ProviderError when cardanoNode.submitTx completes without emitting', async () => { + cardanoNode.submitTx.mockReturnValueOnce(EMPTY); + await expect(provider.submitTx({ signedTransaction: emptyUintArrayAsHexString })).rejects.toThrowError( + expect.objectContaining({ + name: ProviderError.name, + reason: ProviderFailure.ServerUnavailable + }) + ); + }); + + it('throws when submitting transaction with handles context', async () => { + await expect( + provider.submitTx({ + context: { + handleResolutions: [mockHandleResolution] + }, + signedTransaction: emptyUintArrayAsHexString + }) + ).rejects.toThrowError( + expect.objectContaining({ + name: ProviderError.name, + reason: ProviderFailure.NotImplemented + }) + ); + expect(cardanoNode.submitTx).not.toBeCalled(); + }); + }); + + describe('healthCheck', () => { + it('resolves with first value emitted from node healthCheck$', async () => { + const response = { ok: true }; + cardanoNode.healthCheck$.next(response); + await expect(provider.healthCheck()).resolves.toEqual(response); + }); + + it('resolves with {ok: false} when healthCheck$ completes without emitting', async () => { + cardanoNode.healthCheck$.complete(); + await expect(provider.healthCheck()).resolves.toMatchObject({ ok: false }); + }); + + it('resolves with {ok: false} when healthCheck$ errors', async () => { + cardanoNode.healthCheck$.error(new Error('any error')); + await expect(provider.healthCheck()).resolves.toMatchObject({ ok: false }); + }); + }); + }); + + describe('with handle provider', () => { + beforeEach(() => { + handleProvider = { getPolicyIds: jest.fn(), healthCheck: jest.fn(), resolveHandles: jest.fn() }; + provider = new NodeTxSubmitProvider({ cardanoNode, handleProvider, logger }); + }); + + describe('initialized provider', () => { + describe('healthCheck', () => { + it('resolves with {ok: false} when cardanoNode.healthCheck$ emits an unhealthy response', async () => { + handleProvider.healthCheck.mockResolvedValueOnce({ ok: true }); + cardanoNode.healthCheck$.next({ ok: false }); + const res = await provider.healthCheck(); + expect(res).toMatchObject({ ok: false }); + }); + + it('resolves with {ok: false} when handleProvider emits an unhealthy response', async () => { + handleProvider.healthCheck.mockResolvedValueOnce({ ok: false }); + cardanoNode.healthCheck$.next({ ok: true }); + const res = await provider.healthCheck(); + expect(res).toMatchObject({ ok: false }); + }); + + it('resolves with {ok: true} when both handleProvider and cardanoNode are healthy', async () => { + handleProvider.healthCheck.mockResolvedValueOnce({ ok: true }); + cardanoNode.healthCheck$.next({ ok: true }); + const res = await provider.healthCheck(); + expect(res).toMatchObject({ ok: true }); + }); + }); + + describe('submitTx', () => { + it('successfully submits a transaction without handle context', assertSubmissionWithoutContext); + + it('successfully submits if handles resolve to same addresses as in context', async () => { + handleProvider.resolveHandles.mockResolvedValueOnce([mockHandleResolution]); + cardanoNode.submitTx.mockReturnValueOnce(of(Cardano.TransactionId(generateRandomHexString(64)))); + const res = await provider.submitTx({ + context: { + handleResolutions: [mockHandleResolution] + }, + signedTransaction: emptyUintArrayAsHexString + }); + expect(res).toBeUndefined(); + expect(handleProvider.resolveHandles).toBeCalledTimes(1); + }); + + it('rejects with an error if handles resolve to different addresses than in context', async () => { + handleProvider.resolveHandles = jest.fn().mockResolvedValue([ + { + ...mockHandleResolution, + cardanoAddress: Cardano.PaymentAddress( + 'addr_test1qq585l3hyxgj3nas2v3xymd23vvartfhceme6gv98aaeg9muzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q2g7k3g' + ) + } + ]); + await expect( + provider.submitTx({ + context: { + handleResolutions: [mockHandleResolution] + }, + signedTransaction: emptyUintArrayAsHexString + }) + ).rejects.toThrowError( + expect.objectContaining({ + name: ProviderError.name, + reason: ProviderFailure.Conflict + }) + ); + }); + }); + }); + }); +}); diff --git a/packages/cardano-services/test/TxSubmit/TxSubmitHttpService.test.ts b/packages/cardano-services/test/TxSubmit/TxSubmitHttpService.test.ts index 713b3d9a3f2..57c07868bf2 100644 --- a/packages/cardano-services/test/TxSubmit/TxSubmitHttpService.test.ts +++ b/packages/cardano-services/test/TxSubmit/TxSubmitHttpService.test.ts @@ -2,7 +2,6 @@ import { APPLICATION_JSON, CONTENT_TYPE, HttpServer, HttpServerConfig, TxSubmitHttpService } from '../../src'; import { CreateHttpProviderConfig, txSubmitHttpProvider } from '@cardano-sdk/cardano-services-client'; import { FATAL, createLogger } from 'bunyan'; -import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; import { ProviderError, TxSubmissionError, TxSubmissionErrorCode, TxSubmitProvider } from '@cardano-sdk/core'; import { bufferToHexString, fromSerializableObject } from '@cardano-sdk/util'; import { getPort } from 'get-port-please'; @@ -13,15 +12,11 @@ import cbor from 'cbor'; const txSubmitProviderMock = ( healthCheckImpl = async () => Promise.resolve({ ok: true }), - submitTxImpl = async () => Promise.resolve([]) -) => - ({ - healthCheck: jest.fn(healthCheckImpl), - initialize: jest.fn(), - shutdown: jest.fn(), - start: jest.fn(), - submitTx: jest.fn(submitTxImpl) - } as unknown as OgmiosTxSubmitProvider); + submitTxImpl: TxSubmitProvider['submitTx'] = async () => Promise.resolve(void 0) +): jest.Mocked => ({ + healthCheck: jest.fn(healthCheckImpl), + submitTx: jest.fn(submitTxImpl) +}); const serializeProviderArg = (arg: unknown) => ({ signedTransaction: arg }); const bodyTx = serializeProviderArg(cbor.encode('#####').toString('hex')); @@ -30,7 +25,7 @@ const APPLICATION_CBOR = 'application/cbor'; const emptyUintArrayAsHexString = bufferToHexString(Buffer.from(new Uint8Array())); describe('TxSubmitHttpService', () => { - let txSubmitProvider: OgmiosTxSubmitProvider; + let txSubmitProvider: jest.Mocked; let httpServer: HttpServer; let port: number; let baseUrl: string; diff --git a/packages/cardano-services/test/TxSubmit/rabbitmq/TxSubmitWorker.test.ts b/packages/cardano-services/test/TxSubmit/rabbitmq/TxSubmitWorker.test.ts index 15906443c8c..adb86621833 100644 --- a/packages/cardano-services/test/TxSubmit/rabbitmq/TxSubmitWorker.test.ts +++ b/packages/cardano-services/test/TxSubmit/rabbitmq/TxSubmitWorker.test.ts @@ -1,35 +1,33 @@ import { BAD_CONNECTION_URL, txsPromise } from './utils'; -import { OgmiosTxSubmitProvider, urlToConnectionConfig } from '@cardano-sdk/ogmios'; -import { ProviderError, TxSubmissionError } from '@cardano-sdk/core'; +import { ProviderError, TxSubmissionError, TxSubmitProvider } from '@cardano-sdk/core'; import { RabbitMQContainer } from './docker'; import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../../../src'; import { TestLogger, createLogger } from '@cardano-sdk/util-dev'; -import { getRandomPort } from 'get-port-please'; import http from 'http'; // TODO: refactor this to not use an underlying OgmiosTxSubmitProvider, -// but a mock provider instead +// but a mock provider instead. +// Also, make sure that underlying tx submit provider that's being used +// when running the cli program is initialized properly describe.skip('TxSubmitWorker', () => { const container = new RabbitMQContainer(); let logger: TestLogger; let mock: http.Server | undefined; - let port: number; let rabbitmqUrl: URL; - let txSubmitProvider: OgmiosTxSubmitProvider; + let txSubmitProvider: TxSubmitProvider; let worker: TxSubmitWorker | undefined; beforeAll(async () => { ({ rabbitmqUrl } = await container.load()); - port = await getRandomPort(); }); beforeEach(async () => { await container.removeQueues(); logger = createLogger({ record: true }); - txSubmitProvider = new OgmiosTxSubmitProvider(urlToConnectionConfig(new URL(`http://localhost:${port}/`)), { - logger - }); + // txSubmitProvider = new TxSubmitProvider(urlToConnectionConfig(new URL(`http://localhost:${port}/`)), { + // logger + // }); }); afterEach(async () => { diff --git a/packages/cardano-services/test/TxSubmit/rabbitmq/rabbitmqTxSubmitProvider.test.ts b/packages/cardano-services/test/TxSubmit/rabbitmq/rabbitmqTxSubmitProvider.test.ts index 288d97f0f5e..2d8c5a7a9e3 100644 --- a/packages/cardano-services/test/TxSubmit/rabbitmq/rabbitmqTxSubmitProvider.test.ts +++ b/packages/cardano-services/test/TxSubmit/rabbitmq/rabbitmqTxSubmitProvider.test.ts @@ -1,6 +1,5 @@ import { BAD_CONNECTION_URL, txsPromise } from './utils'; -import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; -import { ProviderError } from '@cardano-sdk/core'; +import { ProviderError, TxSubmitProvider } from '@cardano-sdk/core'; import { RabbitMQContainer } from './docker'; import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../../../src'; import { logger } from '@cardano-sdk/util-dev'; @@ -70,11 +69,8 @@ describe('RabbitMqTxSubmitProvider', () => { logger, txSubmitProvider: { healthCheck: async () => ({ ok: true }), - initialize: () => Promise.resolve(), - shutdown: () => Promise.resolve(), - start: () => Promise.resolve(), submitTx: () => Promise.resolve() - } as unknown as OgmiosTxSubmitProvider + } as TxSubmitProvider } ); diff --git a/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts b/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts index 9fef2812ad4..a0877fe1611 100644 --- a/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts +++ b/packages/core/src/CardanoNode/types/ObservableCardanoNode.ts @@ -1,5 +1,5 @@ import { bufferChainSyncEvent } from '../util/bufferChainSyncEvent'; -import type { Cardano, HealthCheckResponse } from '../..'; +import type { Cardano, HealthCheckResponse, TxCBOR } from '../..'; import type { EraSummary } from './CardanoNode'; import type { Observable } from 'rxjs'; @@ -71,6 +71,12 @@ export interface ObservableCardanoNode { * @throws CardanoNodeErrors.UnknownCardanoNodeError on any other unexpected/unhandled errors */ findIntersect(points: PointOrOrigin[]): Observable; + + /** + * @param tx serialized transaction + * @returns transaction id + */ + submitTx(tx: TxCBOR): Observable; } export const ObservableCardanoNode = { bufferChainSyncEvent } as const; diff --git a/packages/e2e/src/factories.ts b/packages/e2e/src/factories.ts index bc904dc040e..d042aa4a52f 100644 --- a/packages/e2e/src/factories.ts +++ b/packages/e2e/src/factories.ts @@ -45,11 +45,11 @@ import { } from '@cardano-sdk/cardano-services-client'; import { LedgerKeyAgent } from '@cardano-sdk/hardware-ledger'; import { Logger } from 'ts-log'; -import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios'; +import { NodeTxSubmitProvider } from '@cardano-sdk/cardano-services'; +import { OgmiosObservableCardanoNode } from '@cardano-sdk/ogmios'; import { TrezorKeyAgent } from '@cardano-sdk/hardware-trezor'; -import { createConnectionObject } from '@cardano-ogmios/client'; import { createStubStakePoolProvider } from '@cardano-sdk/util-dev'; -import { filter, firstValueFrom } from 'rxjs'; +import { filter, firstValueFrom, of } from 'rxjs'; import DeviceConnection from '@cardano-foundation/ledgerjs-hw-app-cardano'; import memoize from 'lodash/memoize'; @@ -136,7 +136,7 @@ rewardsProviderFactory.register(HTTP_PROVIDER, async (params: any, logger: Logge }); txSubmitProviderFactory.register(OGMIOS_PROVIDER, async (params: any, logger: Logger): Promise => { - if (params.baseUrl === undefined) throw new Error(`${OgmiosTxSubmitProvider.name}: ${MISSING_URL_PARAM}`); + if (params.baseUrl === undefined) throw new Error(`${NodeTxSubmitProvider.name}: ${MISSING_URL_PARAM}`); const connectionConfig = { host: params.baseUrl.hostname, @@ -145,7 +145,17 @@ txSubmitProviderFactory.register(OGMIOS_PROVIDER, async (params: any, logger: Lo }; return new Promise(async (resolve) => { - resolve(new OgmiosTxSubmitProvider(createConnectionObject(connectionConfig), { logger })); + resolve( + new NodeTxSubmitProvider({ + cardanoNode: new OgmiosObservableCardanoNode( + { + connectionConfig$: of(connectionConfig) + }, + { logger } + ), + logger + }) + ); }); }); diff --git a/packages/e2e/test/projection/offline-fork.test.ts b/packages/e2e/test/projection/offline-fork.test.ts index 4865e90c709..22a0bb86f1a 100644 --- a/packages/e2e/test/projection/offline-fork.test.ts +++ b/packages/e2e/test/projection/offline-fork.test.ts @@ -22,7 +22,7 @@ import { } from '@cardano-sdk/core'; import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev'; import { ConnectionConfig } from '@cardano-ogmios/client'; -import { Observable, filter, firstValueFrom, lastValueFrom, map, of, take, takeWhile, toArray } from 'rxjs'; +import { Observable, filter, firstValueFrom, lastValueFrom, map, of, take, takeWhile, throwError, toArray } from 'rxjs'; import { OgmiosObservableCardanoNode } from '@cardano-sdk/ogmios'; import { ReconnectionConfig } from '@cardano-sdk/util-rxjs'; import { createDatabase } from 'typeorm-extension'; @@ -62,6 +62,7 @@ const createForkProjectionSource = ( // Same network info eraSummaries$: forkFromNode.eraSummaries$, genesisParameters$: forkFromNode.genesisParameters$, + submitTx: () => throwError(() => new Error('Not implemented')), // Stub chain sync that forks from provided tip // eslint-disable-next-line sort-keys-fix/sort-keys-fix findIntersect: (points) => { diff --git a/packages/e2e/test/web-extension/webpack.config.base.js b/packages/e2e/test/web-extension/webpack.config.base.js index b6e4cd449f6..ded44acd9df 100644 --- a/packages/e2e/test/web-extension/webpack.config.base.js +++ b/packages/e2e/test/web-extension/webpack.config.base.js @@ -70,7 +70,9 @@ module.exports = { resolve: { extensions: ['.ts', '.js'], fallback: { + '@cardano-sdk/cardano-services': false, buffer: require.resolve('buffer/'), + crypto: require.resolve('crypto-browserify'), events: require.resolve('events/'), fs: false, 'get-port-please': false, @@ -79,7 +81,6 @@ module.exports = { os: false, path: false, perf_hooks: false, - crypto: require.resolve('crypto-browserify'), stream: require.resolve('readable-stream'), util: require.resolve('util/') } diff --git a/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts b/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts index 3246467aeab..37c64917061 100644 --- a/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts +++ b/packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts @@ -11,11 +11,13 @@ import { ObservableCardanoNode, ObservableChainSync, PointOrOrigin, - StateQueryErrorCode + StateQueryErrorCode, + TxCBOR } from '@cardano-sdk/core'; import { ChainSynchronization, ConnectionConfig, + TransactionSubmission, createConnectionObject, createLedgerStateQueryClient, getServerHealth @@ -31,6 +33,7 @@ import { of, shareReplay, switchMap, + take, throwError, timeout } from 'rxjs'; @@ -39,7 +42,7 @@ import { WithLogger, contextLogger } from '@cardano-sdk/util'; import { createObservableChainSyncClient } from './createObservableChainSyncClient'; import { ogmiosServerHealthToHealthCheckResponse } from '../../util'; import { ogmiosToCorePointOrOrigin, ogmiosToCoreTipOrOrigin, pointOrOriginToOgmios } from './util'; -import { queryEraSummaries, queryGenesisParameters } from '../queries'; +import { queryEraSummaries, queryGenesisParameters, withCoreCardanoNodeError } from '../queries'; import isEqual from 'lodash/isEqual'; const ogmiosToCoreIntersection = (intersection: ChainSynchronization.Intersection) => ({ @@ -48,27 +51,35 @@ const ogmiosToCoreIntersection = (intersection: ChainSynchronization.Intersectio }); export type LocalStateQueryRetryConfig = Pick; +export type SubmitTxRetryConfig = Pick; -const DEFAULT_HEALTH_CHECK_TIMEOUT = 2000; -const DEFAULT_LSQ_RETRY_CONFIG: LocalStateQueryRetryConfig = { +const DEFAULT_HEALTH_CHECK_TIMEOUT = Milliseconds(2000); +const DEFAULT_SUBMIT_MAX_RETRIES = 5; +const DEFAULT_RETRY_CONFIG: LocalStateQueryRetryConfig = { initialInterval: 1000, maxInterval: 30_000 }; -export type OgmiosObservableCardanoNodeProps = Omit & { +export type OgmiosObservableCardanoNodeProps = InteractionContextProps & { /** Default: 2000ms */ healthCheckTimeout?: Milliseconds; /** Default: {initialInterval: 1000, maxInterval: 30_000} */ localStateQueryRetryConfig?: LocalStateQueryRetryConfig; + /** Default: {initialInterval: 1000, maxInterval: 30_000, maxRetries: 5} */ + submitTxQueryRetryConfig?: SubmitTxRetryConfig; }; -const retryableStateQueryErrors = new Set([ +const retryableCardanoNodeErrors = new Set([ GeneralCardanoNodeErrorCode.ServerNotReady, - StateQueryErrorCode.UnavailableInCurrentEra, GeneralCardanoNodeErrorCode.ConnectionFailure ]); +const retryableStateQueryErrors = new Set([ + ...retryableCardanoNodeErrors, + StateQueryErrorCode.UnavailableInCurrentEra +]); + const stateQueryRetryBackoffConfig = ( - retryConfig: LocalStateQueryRetryConfig = DEFAULT_LSQ_RETRY_CONFIG, + retryConfig: LocalStateQueryRetryConfig = DEFAULT_RETRY_CONFIG, logger: Logger ): RetryBackoffConfig => ({ ...retryConfig, @@ -81,8 +92,24 @@ const stateQueryRetryBackoffConfig = ( } }); +const submitTxRetryBackoffConfig = ( + retryConfig: SubmitTxRetryConfig = DEFAULT_RETRY_CONFIG, + logger: Logger +): RetryBackoffConfig => ({ + ...retryConfig, + maxRetries: retryConfig.maxRetries || DEFAULT_SUBMIT_MAX_RETRIES, + shouldRetry: (error) => { + if (retryableStateQueryErrors.has(CardanoNodeUtil.asCardanoNodeError(error)?.code)) { + logger.warn('Failed to submitTx, will retry', error); + return true; + } + return false; + } +}); + export class OgmiosObservableCardanoNode implements ObservableCardanoNode { readonly #connectionConfig$: Observable; + readonly #submitTxRetryBackoffConfig: RetryBackoffConfig; readonly #logger: Logger; readonly #interactionContext$; @@ -93,6 +120,7 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode { constructor(props: OgmiosObservableCardanoNodeProps, { logger }: WithLogger) { this.#connectionConfig$ = props.connectionConfig$; this.#logger = contextLogger(logger, 'ObservableOgmiosCardanoNode'); + this.#submitTxRetryBackoffConfig = submitTxRetryBackoffConfig(props.submitTxQueryRetryConfig, logger); this.#interactionContext$ = createObservableInteractionContext( { ...props @@ -177,4 +205,18 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode { ) ); } + + submitTx(tx: TxCBOR): Observable { + return this.#interactionContext$.pipe( + switchMap((context) => + from( + withCoreCardanoNodeError(() => + TransactionSubmission.submitTransaction(context, tx) + ) as Promise + ) + ), + retryBackoff(this.#submitTxRetryBackoffConfig), + take(1) + ); + } } diff --git a/packages/ogmios/src/CardanoNode/queries.ts b/packages/ogmios/src/CardanoNode/queries.ts index 418677ec0e2..b4508e4127c 100644 --- a/packages/ogmios/src/CardanoNode/queries.ts +++ b/packages/ogmios/src/CardanoNode/queries.ts @@ -78,7 +78,6 @@ export const ogmiosToCoreError = (error: any) => { export const withCoreCardanoNodeError = async (operation: () => Promise) => { try { return await operation(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error) { throw ogmiosToCoreError(error); } diff --git a/packages/ogmios/src/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.ts b/packages/ogmios/src/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.ts deleted file mode 100644 index 32e533708cf..00000000000 --- a/packages/ogmios/src/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.ts +++ /dev/null @@ -1,122 +0,0 @@ -/* eslint-disable no-console */ -import { - ConnectionConfig, - TransactionSubmission, - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - TxSubmission -} from '@cardano-ogmios/client'; -import { - GeneralCardanoNodeError, - GeneralCardanoNodeErrorCode, - HandleOwnerChangeError, - HandleProvider, - HealthCheckResponse, - ProviderDependencies, - ProviderError, - ProviderFailure, - SubmitTxArgs, - TxSubmitProvider -} from '@cardano-sdk/core'; -import { Logger } from 'ts-log'; -import { OgmiosCardanoNode } from '../../CardanoNode'; -import { RunnableModule, contextLogger, isNotNil } from '@cardano-sdk/util'; -import { createInteractionContextWithLogger } from '../../util'; -import { withCoreCardanoNodeError } from '../../CardanoNode/queries'; - -/** - * Connect to an [Ogmios](https://ogmios.dev/) instance - * - * @class OgmiosTxSubmitProvider - */ -export class OgmiosTxSubmitProvider extends RunnableModule implements TxSubmitProvider { - #txSubmissionClient: TransactionSubmission.TransactionSubmissionClient; - #logger: Logger; - #connectionConfig: ConnectionConfig; - #handleProvider?: HandleProvider; - - /** - * @param {ConnectionConfig} connectionConfig Ogmios connection configuration - * @param {Logger} logger object implementing the Logger abstract class - * @throws {TxSubmission.errors} - */ - constructor(connectionConfig: ConnectionConfig, { logger }: ProviderDependencies, handleProvider?: HandleProvider) { - super('OgmiosTxSubmitProvider', logger); - this.#logger = contextLogger(logger, 'OgmiosTxSubmitProvider'); - this.#connectionConfig = connectionConfig; - this.#handleProvider = handleProvider; - } - - public async initializeImpl(): Promise { - this.#logger.info('Initializing OgmiosTxSubmitProvider'); - - this.#txSubmissionClient = await TransactionSubmission.createTransactionSubmissionClient( - await createInteractionContextWithLogger(contextLogger(this.#logger, 'ogmiosTxSubmitProvider'), { - connection: this.#connectionConfig - }) - ); - - this.#logger.info('OgmiosTxSubmitProvider initialized'); - } - - public async shutdownImpl(): Promise { - this.#logger.info('Shutting down OgmiosTxSubmitProvider'); - if ( - isNotNil(this.#txSubmissionClient) && - this.#txSubmissionClient.context.socket.readyState !== this.#txSubmissionClient.context.socket.CLOSED - ) { - await this.#txSubmissionClient.shutdown(); - } - } - - async submitTx({ signedTransaction, context }: SubmitTxArgs): Promise { - if (this.state !== 'running') { - throw new GeneralCardanoNodeError( - GeneralCardanoNodeErrorCode.ServerNotReady, - null, - 'OgmiosTxSubmitProvider not started' - ); - } - - await this.throwIfHandleResolutionConflict(context); - return withCoreCardanoNodeError(async () => { - const id = await this.#txSubmissionClient.submitTransaction(signedTransaction); - this.#logger.info(`Submitted ${id}`); - }); - } - - async healthCheck(): Promise { - return OgmiosCardanoNode.healthCheck(this.#connectionConfig, this.logger); - } - - async startImpl(): Promise { - return Promise.resolve(); - } - - private async throwIfHandleResolutionConflict(context: SubmitTxArgs['context']): Promise { - if (context?.handleResolutions && context.handleResolutions.length > 0) { - if (!this.#handleProvider) { - throw new ProviderError( - ProviderFailure.NotImplemented, - undefined, - 'No HandleProvider was set during construction.' - ); - } - - const handleInfoList = await this.#handleProvider.resolveHandles({ - handles: context.handleResolutions.map((hndRes) => hndRes.handle) - }); - - for (const [index, handleInfo] of handleInfoList.entries()) { - if (!handleInfo || handleInfo.cardanoAddress !== context.handleResolutions[index].cardanoAddress) { - const handleOwnerChangeError = new HandleOwnerChangeError( - context.handleResolutions[index].handle, - context.handleResolutions[index].cardanoAddress, - handleInfo ? handleInfo.cardanoAddress : null - ); - throw new ProviderError(ProviderFailure.Conflict, handleOwnerChangeError); - } - } - } - } -} diff --git a/packages/ogmios/src/Provider/TxSubmitProvider/index.ts b/packages/ogmios/src/Provider/TxSubmitProvider/index.ts deleted file mode 100644 index f3e34f0f57b..00000000000 --- a/packages/ogmios/src/Provider/TxSubmitProvider/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './OgmiosTxSubmitProvider'; diff --git a/packages/ogmios/src/Provider/index.ts b/packages/ogmios/src/Provider/index.ts deleted file mode 100644 index a3b794c05f5..00000000000 --- a/packages/ogmios/src/Provider/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './TxSubmitProvider'; diff --git a/packages/ogmios/src/index.ts b/packages/ogmios/src/index.ts index 53db022950f..5733162db76 100644 --- a/packages/ogmios/src/index.ts +++ b/packages/ogmios/src/index.ts @@ -1,4 +1,3 @@ -export * from './Provider'; export * from './CardanoNode'; export { urlToConnectionConfig } from './util'; export * as ogmiosToCore from './ogmiosToCore'; diff --git a/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts b/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts index 5a7c59021f1..14154458769 100644 --- a/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts +++ b/packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts @@ -5,7 +5,10 @@ import { GeneralCardanoNodeErrorCode, Milliseconds, StateQueryError, - StateQueryErrorCode + StateQueryErrorCode, + TxCBOR, + TxSubmissionError, + TxSubmissionErrorCode } from '@cardano-sdk/core'; import { Connection, InteractionContext, Mirror, createConnectionObject, safeJSON } from '@cardano-ogmios/client'; import { HEALTH_RESPONSE_BODY } from '../mocks/util'; @@ -16,11 +19,12 @@ import { MockedChainSynchronization, MockedLedgerStateQueryClient, MockedSocket, + MockedTransactionSubmission, ogmiosEraSummaries } from './util'; -import { NextBlockResponse, RollForward } from '@cardano-ogmios/schema'; +import { NextBlockResponse, RollForward, SubmitTransactionFailureEraMismatch } from '@cardano-ogmios/schema'; import { OgmiosObservableCardanoNode } from '../../src'; -import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of } from 'rxjs'; +import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of, toArray } from 'rxjs'; import { generateRandomHexString, logger } from '@cardano-sdk/util-dev'; import { mockGenesisShelley, mockShelleyBlock } from '../ogmiosToCore/testData'; import delay from 'delay'; @@ -30,6 +34,9 @@ jest.mock('@cardano-ogmios/client', () => { return { ...original, ChainSynchronization: {}, + TransactionSubmission: { + submitTransaction: jest.fn() + }, createInteractionContext: jest.fn(), createLedgerStateQueryClient: jest.fn(), getServerHealth: jest.fn() @@ -42,6 +49,7 @@ describe('ObservableOgmiosCardanoNode', () => { let getServerHealth: MockGetServerHealth; let socket: MockedSocket; let chainSynchronization: MockedChainSynchronization; + let TransactionSubmission: MockedTransactionSubmission; let createInteractionContext: MockCreateInteractionContext; const tip = { @@ -56,7 +64,8 @@ describe('ObservableOgmiosCardanoNode', () => { createInteractionContext, createLedgerStateQueryClient, getServerHealth, - ChainSynchronization: chainSynchronization + ChainSynchronization: chainSynchronization, + TransactionSubmission } = require('@cardano-ogmios/client')); ledgerStateQueryClient = { eraSummaries: jest.fn() as MockedLedgerStateQueryClient['eraSummaries'], @@ -104,6 +113,7 @@ describe('ObservableOgmiosCardanoNode', () => { createInteractionContext.mockReset(); createLedgerStateQueryClient.mockReset(); getServerHealth.mockReset(); + TransactionSubmission.submitTransaction.mockReset(); }); describe('LSQs on QueryUnavailableInCurrentEra', () => { @@ -155,7 +165,6 @@ describe('ObservableOgmiosCardanoNode', () => { }); }); - // TODO: this passes when run individually but not when running entire test suite it('opaquely reconnects when connection is refused', async () => { createInteractionContext.mockRejectedValueOnce({ name: 'WebSocketClosed' }); const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger }); @@ -222,4 +231,66 @@ describe('ObservableOgmiosCardanoNode', () => { expect(result.ok).toBe(false); }); }); + + describe('submitTx', () => { + let node: OgmiosObservableCardanoNode; + const submitTxMaxRetries = 2; + + beforeEach(() => { + node = new OgmiosObservableCardanoNode( + { + connectionConfig$: of(connection), + submitTxQueryRetryConfig: { initialInterval: 1, maxRetries: submitTxMaxRetries } + }, + { logger } + ); + }); + + describe('successful submission', () => { + it('emits transaction id and completes', async () => { + TransactionSubmission.submitTransaction.mockResolvedValueOnce('id'); + await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR).pipe(toArray()))).resolves.toEqual(['id']); + expect(TransactionSubmission.submitTransaction).toBeCalledTimes(1); + }); + }); + + describe('submission error', () => { + it('maps error to core type', async () => { + TransactionSubmission.submitTransaction.mockRejectedValueOnce({ + code: 3005, + data: { ledgerEra: 'shelley', queryEra: 'alonzo' }, + message: 'Era mismatch' + } as SubmitTransactionFailureEraMismatch); + await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).rejects.toThrowError( + expect.objectContaining({ + code: TxSubmissionErrorCode.EraMismatch, + name: TxSubmissionError.name + }) + ); + expect(TransactionSubmission.submitTransaction).toBeCalledTimes(1); + }); + }); + + describe('connection error', () => { + it('attempts to resubmit opaquely', async () => { + TransactionSubmission.submitTransaction + .mockRejectedValueOnce({ code: 'ECONNREFUSED' }) + .mockResolvedValueOnce('id'); + await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).resolves.toBe('id'); + expect(TransactionSubmission.submitTransaction).toBeCalledTimes(2); + }); + + it('rejects after maxRetries attempts to submit', async () => { + const error = { code: 'ECONNREFUSED' }; + TransactionSubmission.submitTransaction.mockRejectedValue(error); + + await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).rejects.toThrowError( + expect.objectContaining({ + code: GeneralCardanoNodeErrorCode.ConnectionFailure + }) + ); + expect(TransactionSubmission.submitTransaction).toBeCalledTimes(submitTxMaxRetries + 1); + }); + }); + }); }); diff --git a/packages/ogmios/test/CardanoNode/util.ts b/packages/ogmios/test/CardanoNode/util.ts index bc02b3d4403..413c67d599e 100644 --- a/packages/ogmios/test/CardanoNode/util.ts +++ b/packages/ogmios/test/CardanoNode/util.ts @@ -1,6 +1,7 @@ import { ChainSynchronization, InteractionContext, + TransactionSubmission, createInteractionContext, createLedgerStateQueryClient, getServerHealth @@ -13,6 +14,7 @@ export type MockedLedgerStateQueryClient = jest.Mocked; export type MockCreateLedgerStateQuery = jest.MockedFunction; export type MockGetServerHealth = jest.MockedFunction; export type MockedChainSynchronization = jest.Mocked; +export type MockedTransactionSubmission = jest.Mocked; export type MockCreateInteractionContext = jest.MockedFunction; export const ogmiosEraSummaries: EraSummary[] = [ diff --git a/packages/ogmios/test/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.test.ts b/packages/ogmios/test/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.test.ts deleted file mode 100644 index a861eebe5d5..00000000000 --- a/packages/ogmios/test/Provider/TxSubmitProvider/OgmiosTxSubmitProvider.test.ts +++ /dev/null @@ -1,238 +0,0 @@ -import { Cardano, GeneralCardanoNodeError, ProviderError, TxSubmissionError } from '@cardano-sdk/core'; -import { Connection, createConnectionObject } from '@cardano-ogmios/client'; -import { KoraLabsHandleProvider } from '@cardano-sdk/cardano-services-client'; -import { OgmiosTxSubmitProvider } from '../../../src'; -import { bufferToHexString } from '@cardano-sdk/util'; -import { getRandomPort } from 'get-port-please'; -import { healthCheckResponseMock } from '../../../../core/test/CardanoNode/mocks'; -import { dummyLogger as logger } from 'ts-log'; -import http from 'http'; - -const mockHandleResolution = { - cardanoAddress: Cardano.PaymentAddress( - 'addr_test1qqk4sr4f7vtqzd2w90d5nfu3n59jhhpawyphnek2y7er02nkrezryq3ydtmkg0e7e2jvzg443h0ffzfwd09wpcxy2fuqmcnecd' - ), - handle: 'alice', - hasDatum: false, - policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), - resolvedAt: { - hash: Cardano.BlockId('10d64cc11e9b20e15b6c46aa7b1fed11246f437e62225655a30ea47bf8cc22d0'), - slot: Cardano.Slot(37_834_496) - } -}; - -jest.mock('@cardano-sdk/cardano-services-client', () => ({ - ...jest.requireActual('@cardano-sdk/cardano-services-client'), - KoraLabsHandleProvider: jest.fn().mockImplementation(() => ({ - healthCheck: jest.fn(), - resolveHandles: jest.fn().mockResolvedValue([ - { - ...mockHandleResolution, - - cardanoAddress: Cardano.PaymentAddress( - 'addr_test1qqk4sr4f7vtqzd2w90d5nfu3n59jhhpawyphnek2y7er02nkrezryq3ydtmkg0e7e2jvzg443h0ffzfwd09wpcxy2fuqmcnecd' - ) - } - ]) - })) -})); - -const handleProvider = new KoraLabsHandleProvider({ - policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), - serverUrl: 'https://localhost:3000' -}); - -const emptyUintArrayAsHexString = bufferToHexString(Buffer.from(new Uint8Array())); - -// TODO: use mock TransactionSubmissionClient instead of running ogmios server. -// This will require a refactor to replace 'connection' prop with 'client' -describe.skip('OgmiosTxSubmitProvider', () => { - let mockServer: http.Server; - let connection: Connection; - let provider: OgmiosTxSubmitProvider; - - const responseWithServiceState = healthCheckResponseMock({ withTip: false }); - - beforeAll(async () => { - connection = createConnectionObject({ port: await getRandomPort() }); - }); - - describe('healthCheck', () => { - afterEach(async () => { - if (mockServer !== undefined) { - // await serverClosePromise(mockServer); - } - }); - - it('is not ok if cannot connect', async () => { - provider = new OgmiosTxSubmitProvider(connection, { logger }); - const res = await provider.healthCheck(); - expect(res).toEqual({ ok: false }); - }); - - it('is ok if node is close to the network tip', async () => { - // mockServer = createMockOgmiosServer({ - // healthCheck: { response: { networkSynchronization: 0.999, success: true } }, - // submitTx: { response: { success: true } } - // }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - const res = await provider.healthCheck(); - expect(res).toEqual(responseWithServiceState); - }); - - it('is not ok if node is not close to the network tip', async () => { - // mockServer = createMockOgmiosServer({ - // healthCheck: { response: { networkSynchronization: 0.8, success: true } }, - // submitTx: { response: { success: true } } - // }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - const res = await provider.healthCheck(); - expect(res).toEqual({ - ...responseWithServiceState, - localNode: { ...responseWithServiceState.localNode, networkSync: 0.8 }, - ok: false - }); - }); - - it('returns not ok if the Ogmios server throws an error', async () => { - // mockServer = createMockOgmiosServer({ - // healthCheck: { response: { failWith: new Error('Some error'), success: false } }, - // submitTx: { response: { success: true } } - // }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - const health = await provider.healthCheck(); - expect(health.ok).toBe(false); - }); - }); - - describe('submitTx', () => { - afterEach(async () => { - await provider.shutdown(); - // await serverClosePromise(mockServer); - }); - it('resolves if successful', async () => { - // mockServer = createMockOgmiosServer({ submitTx: { response: { success: true } } }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - await provider.initialize(); - await provider.start(); - - const res = await provider.submitTx({ signedTransaction: emptyUintArrayAsHexString }); - expect(res).toBeUndefined(); - }); - - it('rejects with errors thrown by the service', async () => { - // mockServer = createMockOgmiosServer({ - // submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } } - // }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - await provider.initialize(); - await provider.start(); - - await expect(provider.submitTx({ signedTransaction: emptyUintArrayAsHexString })).rejects.toThrowError( - TxSubmissionError - ); - }); - - it('throws an error if context has handles, and no handleProvider is passed', async () => { - // mockServer = createMockOgmiosServer({ - // submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } } - // }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }); - await provider.initialize(); - await provider.start(); - - await expect( - provider.submitTx({ - context: { - handleResolutions: [mockHandleResolution] - }, - signedTransaction: emptyUintArrayAsHexString - }) - ).rejects.toThrowError(/not_implemented/i); - }); - - it('does not throw an error if handles resolve to same addresses as in context', async () => { - // mockServer = createMockOgmiosServer({ submitTx: { response: { success: true } } }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider( - connection, - { logger }, - new KoraLabsHandleProvider({ - policyId: Cardano.PolicyId('50fdcdbfa3154db86a87e4b5697ae30d272e0bbcfa8122efd3e301cb'), - serverUrl: 'https://localhost:3000' - }) - ); - await provider.initialize(); - await provider.start(); - - const res = await provider.submitTx({ - context: { - handleResolutions: [mockHandleResolution] - }, - signedTransaction: emptyUintArrayAsHexString - }); - expect(res).toBeUndefined(); - }); - - it('throws an error if handles resolve to different addresses than in context', async () => { - // mockServer = createMockOgmiosServer({ submitTx: { response: { success: true } } }); - // await listenPromise(mockServer, connection.port); - provider = new OgmiosTxSubmitProvider(connection, { logger }, handleProvider); - await provider.initialize(); - await provider.start(); - - handleProvider.resolveHandles = jest.fn().mockResolvedValue([ - { - ...mockHandleResolution, - - cardanoAddress: Cardano.PaymentAddress( - 'addr_test1qq585l3hyxgj3nas2v3xymd23vvartfhceme6gv98aaeg9muzcjqw982pcftgx53fu5527z2cj2tkx2h8ux2vxsg475q2g7k3g' - ) - } - ]); - - await expect( - provider.submitTx({ - context: { - handleResolutions: [mockHandleResolution] - }, - signedTransaction: emptyUintArrayAsHexString - }) - ).rejects.toThrowError(ProviderError); - }); - }); - - describe('shutdown', () => { - beforeAll(async () => { - // mockServer = createMockOgmiosServer({ stateQuery: { systemStart: { response: { success: true } } } }); - // await listenPromise(mockServer, connection.port); - }); - - afterAll(async () => { - // await serverClosePromise(mockServer); - }); - - beforeEach(async () => { - provider = new OgmiosTxSubmitProvider(connection, { logger }); - await provider.initialize(); - await provider.start(); - }); - - it('shuts down successfully', async () => { - await expect(provider.shutdown()).resolves.not.toThrow(); - }); - - it('throws when querying after shutting down', async () => { - await provider.shutdown(); - await expect(provider.submitTx({ signedTransaction: emptyUintArrayAsHexString })).rejects.toThrowError( - GeneralCardanoNodeError - ); - }); - }); -}); diff --git a/packages/ogmios/test/tsconfig.json b/packages/ogmios/test/tsconfig.json index 3bc1e0183a4..e02c7867755 100644 --- a/packages/ogmios/test/tsconfig.json +++ b/packages/ogmios/test/tsconfig.json @@ -3,7 +3,10 @@ "compilerOptions": { "baseUrl": "." }, - "include": ["./**/*.ts"], + "include": [ + "./**/*.ts", + "../../cardano-services/test/TxSubmit/NodeTxSubmitProvider.test.ts" + ], "references": [ { "path": "../../core/src" diff --git a/packages/util-dev/src/chainSync/index.ts b/packages/util-dev/src/chainSync/index.ts index 723723c626e..a46c12aa865 100644 --- a/packages/util-dev/src/chainSync/index.ts +++ b/packages/util-dev/src/chainSync/index.ts @@ -6,12 +6,14 @@ import { ChainSyncEventType, ChainSyncRollBackward, ChainSyncRollForward, + GeneralCardanoNodeError, + GeneralCardanoNodeErrorCode, Intersection, ObservableCardanoNode, Point, PointOrOrigin } from '@cardano-sdk/core'; -import { Observable, of } from 'rxjs'; +import { Observable, of, throwError } from 'rxjs'; import { fromSerializableObject } from '@cardano-sdk/util'; import { genesisToEraSummary } from './genesisToEraSummary'; import memoize from 'lodash/memoize'; @@ -135,7 +137,11 @@ export const chainSyncData = memoize((dataSet: ChainSyncDataSet) => { }); }, genesisParameters$: of(compactGenesis), - healthCheck$: new Observable() + healthCheck$: new Observable(), + submitTx: () => + throwError( + () => new GeneralCardanoNodeError(GeneralCardanoNodeErrorCode.Unknown, null, 'submitTx is not implemented') + ) }; return { allEvents,