Skip to content

fix: resilient ogmios v6 tx submission #971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions packages/cardano-services/src/Program/programs/providerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ import { InMemoryCache, NoCache } from '../../InMemoryCache';
import { KoraLabsHandleProvider } from '@cardano-sdk/cardano-services-client';
import { Logger } from 'ts-log';
import { MissingProgramOption, MissingServiceDependency, RunnableDependencies, UnknownServiceName } from '../errors';
import { NodeTxSubmitProvider, TxSubmitHttpService } from '../../TxSubmit';
import { Observable } from 'rxjs';
import { OgmiosCardanoNode } from '@cardano-sdk/ogmios';
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
import { Pool } from 'pg';
import { ProviderServerArgs, ProviderServerOptionDescriptions, ServiceNames } from './types';
import { SrvRecord } from 'dns';
import { TxSubmitHttpService } from '../../TxSubmit';
import { TypeormAssetProvider } from '../../Asset/TypeormAssetProvider';
import { TypeormStakePoolProvider } from '../../StakePool/TypeormStakePoolProvider/TypeormStakePoolProvider';
import { createDbSyncMetadataService } from '../../Metadata';
import { createLogger } from 'bunyan';
import { getConnectionConfig, getOgmiosTxSubmitProvider, getRabbitMqTxSubmitProvider } from '../services';
import { getConnectionConfig, getOgmiosObservableCardanoNode, getRabbitMqTxSubmitProvider } from '../services';
import { getEntities } from '../../Projection/prepareTypeormProjection';
import { isNotNil } from '@cardano-sdk/util';
import memoize from 'lodash/memoize';
Expand Down Expand Up @@ -306,7 +306,11 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
[ServiceNames.TxSubmit]: async () => {
const txSubmitProvider = args.useQueue
? await getRabbitMqTxSubmitProvider(dnsResolver, logger, args)
: await getOgmiosTxSubmitProvider(dnsResolver, logger, args, await getHandleProvider());
: new NodeTxSubmitProvider({
cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args),
handleProvider: await getHandleProvider(),
logger
});
return new TxSubmitHttpService({ logger, txSubmitProvider });
}
};
Expand Down
11 changes: 8 additions & 3 deletions packages/cardano-services/src/Program/programs/txWorker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { CommonProgramOptions, OgmiosProgramOptions, RabbitMqProgramOptions } from '../options';
import { Logger } from 'ts-log';
import { TxSubmitWorkerConfig } from '../../TxSubmit';
import { NodeTxSubmitProvider, TxSubmitWorkerConfig } from '../../TxSubmit';
import { createDnsResolver } from '../utils';
import { createLogger } from 'bunyan';
import { getOgmiosTxSubmitProvider, getRunningTxSubmitWorker } from '../services';
import { getOgmiosObservableCardanoNode, getRunningTxSubmitWorker } from '../services';

export const TX_WORKER_API_URL_DEFAULT = new URL('http://localhost:3001');
export const PARALLEL_MODE_DEFAULT = false;
Expand Down Expand Up @@ -33,6 +33,11 @@ export const loadAndStartTxWorker = async (args: TxWorkerArgs, logger?: Logger)
},
logger
);
const txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, args);
const txSubmitProvider = new NodeTxSubmitProvider({
cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args),
// TODO: worker should utilize a handle provider
// handleProvider: await getHandleProvider(),
logger
});
return await getRunningTxSubmitWorker(dnsResolver, txSubmitProvider, logger, args);
};
105 changes: 1 addition & 104 deletions packages/cardano-services/src/Program/services/ogmios.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
/* eslint-disable promise/no-nesting */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DnsResolver } from '../utils';
import { HandleProvider, SubmitTxArgs } from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { MissingCardanoNodeOption } from '../errors';
import {
OgmiosCardanoNode,
OgmiosObservableCardanoNode,
OgmiosTxSubmitProvider,
urlToConnectionConfig
} from '@cardano-sdk/ogmios';
import { OgmiosCardanoNode, OgmiosObservableCardanoNode, urlToConnectionConfig } from '@cardano-sdk/ogmios';
import { OgmiosOptionDescriptions, OgmiosProgramOptions } from '../options/ogmios';
import { RunnableModule, isConnectionError } from '@cardano-sdk/util';
import { defer, from, of } from 'rxjs';
Expand All @@ -31,103 +25,6 @@ const recreateOgmiosCardanoNode = async (
return new OgmiosCardanoNode({ host: record.name, port: record.port }, logger);
};

const recreateOgmiosTxSubmitProvider = async (
serviceName: string,
ogmiosTxSubmitProvider: OgmiosTxSubmitProvider,
dnsResolver: DnsResolver,
logger: Logger,
handleProvider?: HandleProvider
) => {
const record = await dnsResolver(serviceName!);
logger.info(`DNS resolution for OgmiosTxSubmitProvider, resolved with record: ${JSON.stringify(record)}`);
await ogmiosTxSubmitProvider
.shutdown()
.catch((error_) => logger.warn(`OgmiosTxSubmitProvider failed to shutdown after connection error: ${error_}`));
return new OgmiosTxSubmitProvider({ host: record.name, port: record.port }, { logger }, handleProvider);
};
/**
* Creates an extended TxSubmitProvider instance :
* - use passed srv service name in order to resolve the port
* - make dealing with fail-overs (re-resolving the port) opaque
* - use exponential backoff retry internally with default timeout and factor
* - intercept 'initialize' operation and handle connection errors on initialization
* - intercept 'submitTx' operation and handle connection errors runtime
* - all other operations are bind to pool object without modifications
*
* @returns TxSubmitProvider instance
*/
export const ogmiosTxSubmitProviderWithDiscovery = async (
dnsResolver: DnsResolver,
logger: Logger,
serviceName: string,
handleProvider?: HandleProvider
): Promise<OgmiosTxSubmitProvider> => {
const { name, port } = await dnsResolver(serviceName!);
let ogmiosProvider = new OgmiosTxSubmitProvider({ host: name, port }, { logger }, handleProvider);

const txSubmitProviderProxy = new Proxy<OgmiosTxSubmitProvider>({} as OgmiosTxSubmitProvider, {
get(_, prop, receiver) {
if (prop === 'then') return;
if (prop === 'initialize') {
return () =>
ogmiosProvider.initialize().catch(async (error) => {
if (isConnectionError(error)) {
ogmiosProvider = await recreateOgmiosTxSubmitProvider(
serviceName,
ogmiosProvider,
dnsResolver,
logger,
handleProvider
);
return receiver.initialize();
}
throw error;
});
}
if (prop === 'submitTx') {
return (submitTxArgs: SubmitTxArgs) =>
ogmiosProvider.submitTx(submitTxArgs).catch(async (error) => {
if (isConnectionError(error)) {
ogmiosProvider = await recreateOgmiosTxSubmitProvider(
serviceName,
ogmiosProvider,
dnsResolver,
logger,
handleProvider
);
await receiver.initialize();
await receiver.start();
return await receiver.submitTx(submitTxArgs);
}
throw error;
});
}
// Bind if it is a function, no intercept operations
if (typeof ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] === 'function') {
const method = ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] as any;
return method.bind(ogmiosProvider);
}

return ogmiosProvider[prop as keyof OgmiosTxSubmitProvider];
}
});

return Object.setPrototypeOf(txSubmitProviderProxy, RunnableModule.prototype);
};

export const getOgmiosTxSubmitProvider = async (
dnsResolver: DnsResolver,
logger: Logger,
options?: OgmiosProgramOptions,
handleProvider?: HandleProvider
): Promise<OgmiosTxSubmitProvider> => {
if (options?.ogmiosSrvServiceName)
return ogmiosTxSubmitProviderWithDiscovery(dnsResolver, logger, options.ogmiosSrvServiceName, handleProvider);
if (options?.ogmiosUrl)
return new OgmiosTxSubmitProvider(urlToConnectionConfig(options?.ogmiosUrl), { logger }, handleProvider);
throw new MissingCardanoNodeOption([OgmiosOptionDescriptions.Url, OgmiosOptionDescriptions.SrvServiceName]);
};

/**
* Creates an extended OgmiosCardanoNode instance :
* - use passed srv service name in order to resolve the port
Expand Down
16 changes: 5 additions & 11 deletions packages/cardano-services/src/Program/services/rabbitmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { CONNECTION_ERROR_EVENT, RabbitMqTxSubmitProvider, TxSubmitWorker } from
import { DnsResolver } from '../utils';
import { Logger } from 'ts-log';
import { MissingProgramOption } from '../errors';
import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios';
import { ProviderError, ProviderFailure, SubmitTxArgs } from '@cardano-sdk/core';
import { ProviderError, ProviderFailure, SubmitTxArgs, TxSubmitProvider } from '@cardano-sdk/core';
import { RabbitMqOptionDescriptions, RabbitMqProgramOptions } from '../options';
import { ServiceNames } from '../programs/types';
import { SrvRecord } from 'dns';
Expand Down Expand Up @@ -85,18 +84,13 @@ type WorkerFactory = () => Promise<TxSubmitWorker>;
* Create a worker factory with service discovery
*
* @param {DnsResolver} dnsResolver used for DNS resolution
* @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider
* @param {TxSubmitProvider} txSubmitProvider tx submit provider
* @param {Logger} logger common logger
* @param {TxWorkerArgs} args needed for tx worker initialization
* @returns {WorkerFactory} WorkerFactory with service discovery, returning a 'TxSubmitWorker' instance
*/
export const createWorkerFactoryWithDiscovery =
(
dnsResolver: DnsResolver,
txSubmitProvider: OgmiosTxSubmitProvider,
logger: Logger,
args: TxWorkerArgs
): WorkerFactory =>
(dnsResolver: DnsResolver, txSubmitProvider: TxSubmitProvider, logger: Logger, args: TxWorkerArgs): WorkerFactory =>
async () => {
const record = await dnsResolver(args.rabbitmqSrvServiceName!);
return new TxSubmitWorker({ ...args, rabbitmqUrl: srvRecordToRabbitmqURL(record) }, { logger, txSubmitProvider });
Expand Down Expand Up @@ -148,14 +142,14 @@ export const startTxSubmitWorkerWithDiscovery = async (
* Create and return a running worker instance with static service config or service discovery
*
* @param {DnsResolver} dnsResolver used for DNS resolution
* @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider
* @param {TxSubmitProvider} txSubmitProvider tx submit provider
* @param {Logger} logger common logger
* @param {TxWorkerArgs} args needed for tx worker initialization * @returns {RunningTxSubmitWorker} RunningTxSubmitWorker instance
* @throws {MissingProgramOption} error if neither URL nor service name is provided
*/
export const getRunningTxSubmitWorker = async (
dnsResolver: DnsResolver,
txSubmitProvider: OgmiosTxSubmitProvider,
txSubmitProvider: TxSubmitProvider,
logger: Logger,
args?: TxWorkerArgs
): Promise<RunningTxSubmitWorker> => {
Expand Down
108 changes: 108 additions & 0 deletions packages/cardano-services/src/TxSubmit/NodeTxSubmitProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { EmptyError, firstValueFrom } from 'rxjs';
import {
GeneralCardanoNodeError,
GeneralCardanoNodeErrorCode,
HandleOwnerChangeError,
HandleProvider,
HealthCheckResponse,
ObservableCardanoNode,
ProviderError,
ProviderFailure,
SubmitTxArgs,
TxSubmissionError,
TxSubmitProvider
} from '@cardano-sdk/core';
import { Logger } from 'ts-log';
import { WithLogger } from '@cardano-sdk/util';

type ObservableTxSubmitter = Pick<ObservableCardanoNode, 'healthCheck$' | 'submitTx'>;
export type NodeTxSubmitProviderProps = WithLogger & {
handleProvider?: HandleProvider;
cardanoNode: ObservableTxSubmitter;
};

const emptyMessage = 'ObservableCardanoNode observable completed without emitting';
const toProviderError = (error: unknown) => {
if (error instanceof TxSubmissionError) {
throw new ProviderError(ProviderFailure.BadRequest, error);
} else if (error instanceof GeneralCardanoNodeError) {
throw new ProviderError(
error.code === GeneralCardanoNodeErrorCode.ConnectionFailure
? ProviderFailure.ConnectionFailure
: error.code === GeneralCardanoNodeErrorCode.ServerNotReady
? ProviderFailure.ServerUnavailable
: ProviderFailure.Unknown,
error
);
}
if (error instanceof EmptyError) {
throw new ProviderError(
ProviderFailure.ServerUnavailable,
new GeneralCardanoNodeError(GeneralCardanoNodeErrorCode.ServerNotReady, null, emptyMessage)
);
}
throw new ProviderError(ProviderFailure.Unknown, error);
};

/** Submit transactions to an ObservableCardanoNode. Validates handle resolutions against a HandleProvider. */
export class NodeTxSubmitProvider implements TxSubmitProvider {
#logger: Logger;
#cardanoNode: ObservableTxSubmitter;
#handleProvider?: HandleProvider;

constructor({ handleProvider, cardanoNode, logger }: NodeTxSubmitProviderProps) {
this.#handleProvider = handleProvider;
this.#cardanoNode = cardanoNode;
this.#logger = logger;
}

async submitTx({ signedTransaction, context }: SubmitTxArgs): Promise<void> {
await this.#throwIfHandleResolutionConflict(context);
await firstValueFrom(this.#cardanoNode.submitTx(signedTransaction)).catch(toProviderError);
}

async healthCheck(): Promise<HealthCheckResponse> {
const [cardanoNodeHealth, handleProviderHealth] = await Promise.all([
firstValueFrom(this.#cardanoNode.healthCheck$).catch((error): HealthCheckResponse => {
if (error instanceof EmptyError) {
return { ok: false, reason: emptyMessage };
}
this.#logger.error('Unexpected healtcheck error', error);
return { ok: false, reason: 'Internal error' };
}),
this.#handleProvider?.healthCheck()
]);
return {
localNode: cardanoNodeHealth.localNode,
ok: cardanoNodeHealth.ok && (!handleProviderHealth || handleProviderHealth.ok),
reason: cardanoNodeHealth.reason || handleProviderHealth?.reason
};
}

async #throwIfHandleResolutionConflict(context: SubmitTxArgs['context']): Promise<void> {
if (context?.handleResolutions && context.handleResolutions.length > 0) {
if (!this.#handleProvider) {
throw new ProviderError(
ProviderFailure.NotImplemented,
undefined,
'No HandleProvider was set during construction.'
);
}

const handleInfoList = await this.#handleProvider.resolveHandles({
handles: context.handleResolutions.map((hndRes) => hndRes.handle)
});

for (const [index, handleInfo] of handleInfoList.entries()) {
if (!handleInfo || handleInfo.cardanoAddress !== context.handleResolutions[index].cardanoAddress) {
const handleOwnerChangeError = new HandleOwnerChangeError(
context.handleResolutions[index].handle,
context.handleResolutions[index].cardanoAddress,
handleInfo ? handleInfo.cardanoAddress : null
);
throw new ProviderError(ProviderFailure.Conflict, handleOwnerChangeError);
}
}
}
}
}
1 change: 1 addition & 0 deletions packages/cardano-services/src/TxSubmit/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './rabbitmq';
export * from './NodeTxSubmitProvider';
export * from './TxSubmitHttpService';
14 changes: 2 additions & 12 deletions packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
/* eslint-disable @typescript-eslint/no-shadow */
import { CONNECTION_ERROR_EVENT, TX_SUBMISSION_QUEUE, serializeError, waitForPending } from './utils';
import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR } from '@cardano-sdk/core';
import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR, TxSubmitProvider } from '@cardano-sdk/core';
import { Channel, Connection, Message, connect } from 'amqplib';
import { EventEmitter } from 'events';
import { Logger } from 'ts-log';
import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios';
import { bufferToHexString } from '@cardano-sdk/util';

const moduleName = 'TxSubmitWorker';
Expand All @@ -30,7 +29,7 @@ export interface TxSubmitWorkerDependencies {
logger: Logger;

/** The provider to use to submit tx */
txSubmitProvider: OgmiosTxSubmitProvider;
txSubmitProvider: TxSubmitProvider;
}

/**
Expand Down Expand Up @@ -111,11 +110,6 @@ export class TxSubmitWorker extends EventEmitter {
* https://amqp-node.github.io/amqplib/channel_api.html#model_events
*/
async start() {
if (this.#dependencies.txSubmitProvider.state === null) {
this.#dependencies.logger.info(`${moduleName} init: initialize and start tx submission provider`);
await this.#dependencies.txSubmitProvider.initialize();
await this.#dependencies.txSubmitProvider.start();
}
this.#dependencies.logger.info(`${moduleName} init: checking tx submission provider health status`);

const health = await this.#dependencies.txSubmitProvider.healthCheck();
Expand Down Expand Up @@ -156,10 +150,6 @@ export class TxSubmitWorker extends EventEmitter {
}

async shutdown() {
if (this.#dependencies.txSubmitProvider.state === 'running') {
this.#dependencies.logger.info(`${moduleName} shutdown: shutdown tx submission provider`);
await this.#dependencies.txSubmitProvider.shutdown();
}
await this.stop();
}
/** Stops the worker. */
Expand Down
Loading