Skip to content

Commit e45b264

Browse files
authored
Merge pull request #971 from input-output-hk/fix/resilient-ogmios-v6-tx-submission
fix: resilient ogmios v6 tx submission
2 parents adf2620 + 8c56c5e commit e45b264

File tree

28 files changed

+608
-656
lines changed

28 files changed

+608
-656
lines changed

packages/cardano-services/src/Program/programs/providerServer.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ import { InMemoryCache, NoCache } from '../../InMemoryCache';
2424
import { KoraLabsHandleProvider } from '@cardano-sdk/cardano-services-client';
2525
import { Logger } from 'ts-log';
2626
import { MissingProgramOption, MissingServiceDependency, RunnableDependencies, UnknownServiceName } from '../errors';
27+
import { NodeTxSubmitProvider, TxSubmitHttpService } from '../../TxSubmit';
2728
import { Observable } from 'rxjs';
2829
import { OgmiosCardanoNode } from '@cardano-sdk/ogmios';
2930
import { PgConnectionConfig } from '@cardano-sdk/projection-typeorm';
3031
import { Pool } from 'pg';
3132
import { ProviderServerArgs, ProviderServerOptionDescriptions, ServiceNames } from './types';
3233
import { SrvRecord } from 'dns';
33-
import { TxSubmitHttpService } from '../../TxSubmit';
3434
import { TypeormAssetProvider } from '../../Asset/TypeormAssetProvider';
3535
import { TypeormStakePoolProvider } from '../../StakePool/TypeormStakePoolProvider/TypeormStakePoolProvider';
3636
import { createDbSyncMetadataService } from '../../Metadata';
3737
import { createLogger } from 'bunyan';
38-
import { getConnectionConfig, getOgmiosTxSubmitProvider, getRabbitMqTxSubmitProvider } from '../services';
38+
import { getConnectionConfig, getOgmiosObservableCardanoNode, getRabbitMqTxSubmitProvider } from '../services';
3939
import { getEntities } from '../../Projection/prepareTypeormProjection';
4040
import { isNotNil } from '@cardano-sdk/util';
4141
import memoize from 'lodash/memoize';
@@ -306,7 +306,11 @@ const serviceMapFactory = (options: ServiceMapFactoryOptions) => {
306306
[ServiceNames.TxSubmit]: async () => {
307307
const txSubmitProvider = args.useQueue
308308
? await getRabbitMqTxSubmitProvider(dnsResolver, logger, args)
309-
: await getOgmiosTxSubmitProvider(dnsResolver, logger, args, await getHandleProvider());
309+
: new NodeTxSubmitProvider({
310+
cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args),
311+
handleProvider: await getHandleProvider(),
312+
logger
313+
});
310314
return new TxSubmitHttpService({ logger, txSubmitProvider });
311315
}
312316
};

packages/cardano-services/src/Program/programs/txWorker.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { CommonProgramOptions, OgmiosProgramOptions, RabbitMqProgramOptions } from '../options';
22
import { Logger } from 'ts-log';
3-
import { TxSubmitWorkerConfig } from '../../TxSubmit';
3+
import { NodeTxSubmitProvider, TxSubmitWorkerConfig } from '../../TxSubmit';
44
import { createDnsResolver } from '../utils';
55
import { createLogger } from 'bunyan';
6-
import { getOgmiosTxSubmitProvider, getRunningTxSubmitWorker } from '../services';
6+
import { getOgmiosObservableCardanoNode, getRunningTxSubmitWorker } from '../services';
77

88
export const TX_WORKER_API_URL_DEFAULT = new URL('http://localhost:3001');
99
export const PARALLEL_MODE_DEFAULT = false;
@@ -33,6 +33,11 @@ export const loadAndStartTxWorker = async (args: TxWorkerArgs, logger?: Logger)
3333
},
3434
logger
3535
);
36-
const txSubmitProvider = await getOgmiosTxSubmitProvider(dnsResolver, logger, args);
36+
const txSubmitProvider = new NodeTxSubmitProvider({
37+
cardanoNode: getOgmiosObservableCardanoNode(dnsResolver, logger, args),
38+
// TODO: worker should utilize a handle provider
39+
// handleProvider: await getHandleProvider(),
40+
logger
41+
});
3742
return await getRunningTxSubmitWorker(dnsResolver, txSubmitProvider, logger, args);
3843
};

packages/cardano-services/src/Program/services/ogmios.ts

+1-104
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
/* eslint-disable promise/no-nesting */
22
/* eslint-disable @typescript-eslint/no-explicit-any */
33
import { DnsResolver } from '../utils';
4-
import { HandleProvider, SubmitTxArgs } from '@cardano-sdk/core';
54
import { Logger } from 'ts-log';
65
import { MissingCardanoNodeOption } from '../errors';
7-
import {
8-
OgmiosCardanoNode,
9-
OgmiosObservableCardanoNode,
10-
OgmiosTxSubmitProvider,
11-
urlToConnectionConfig
12-
} from '@cardano-sdk/ogmios';
6+
import { OgmiosCardanoNode, OgmiosObservableCardanoNode, urlToConnectionConfig } from '@cardano-sdk/ogmios';
137
import { OgmiosOptionDescriptions, OgmiosProgramOptions } from '../options/ogmios';
148
import { RunnableModule, isConnectionError } from '@cardano-sdk/util';
159
import { defer, from, of } from 'rxjs';
@@ -31,103 +25,6 @@ const recreateOgmiosCardanoNode = async (
3125
return new OgmiosCardanoNode({ host: record.name, port: record.port }, logger);
3226
};
3327

34-
const recreateOgmiosTxSubmitProvider = async (
35-
serviceName: string,
36-
ogmiosTxSubmitProvider: OgmiosTxSubmitProvider,
37-
dnsResolver: DnsResolver,
38-
logger: Logger,
39-
handleProvider?: HandleProvider
40-
) => {
41-
const record = await dnsResolver(serviceName!);
42-
logger.info(`DNS resolution for OgmiosTxSubmitProvider, resolved with record: ${JSON.stringify(record)}`);
43-
await ogmiosTxSubmitProvider
44-
.shutdown()
45-
.catch((error_) => logger.warn(`OgmiosTxSubmitProvider failed to shutdown after connection error: ${error_}`));
46-
return new OgmiosTxSubmitProvider({ host: record.name, port: record.port }, { logger }, handleProvider);
47-
};
48-
/**
49-
* Creates an extended TxSubmitProvider instance :
50-
* - use passed srv service name in order to resolve the port
51-
* - make dealing with fail-overs (re-resolving the port) opaque
52-
* - use exponential backoff retry internally with default timeout and factor
53-
* - intercept 'initialize' operation and handle connection errors on initialization
54-
* - intercept 'submitTx' operation and handle connection errors runtime
55-
* - all other operations are bind to pool object without modifications
56-
*
57-
* @returns TxSubmitProvider instance
58-
*/
59-
export const ogmiosTxSubmitProviderWithDiscovery = async (
60-
dnsResolver: DnsResolver,
61-
logger: Logger,
62-
serviceName: string,
63-
handleProvider?: HandleProvider
64-
): Promise<OgmiosTxSubmitProvider> => {
65-
const { name, port } = await dnsResolver(serviceName!);
66-
let ogmiosProvider = new OgmiosTxSubmitProvider({ host: name, port }, { logger }, handleProvider);
67-
68-
const txSubmitProviderProxy = new Proxy<OgmiosTxSubmitProvider>({} as OgmiosTxSubmitProvider, {
69-
get(_, prop, receiver) {
70-
if (prop === 'then') return;
71-
if (prop === 'initialize') {
72-
return () =>
73-
ogmiosProvider.initialize().catch(async (error) => {
74-
if (isConnectionError(error)) {
75-
ogmiosProvider = await recreateOgmiosTxSubmitProvider(
76-
serviceName,
77-
ogmiosProvider,
78-
dnsResolver,
79-
logger,
80-
handleProvider
81-
);
82-
return receiver.initialize();
83-
}
84-
throw error;
85-
});
86-
}
87-
if (prop === 'submitTx') {
88-
return (submitTxArgs: SubmitTxArgs) =>
89-
ogmiosProvider.submitTx(submitTxArgs).catch(async (error) => {
90-
if (isConnectionError(error)) {
91-
ogmiosProvider = await recreateOgmiosTxSubmitProvider(
92-
serviceName,
93-
ogmiosProvider,
94-
dnsResolver,
95-
logger,
96-
handleProvider
97-
);
98-
await receiver.initialize();
99-
await receiver.start();
100-
return await receiver.submitTx(submitTxArgs);
101-
}
102-
throw error;
103-
});
104-
}
105-
// Bind if it is a function, no intercept operations
106-
if (typeof ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] === 'function') {
107-
const method = ogmiosProvider[prop as keyof OgmiosTxSubmitProvider] as any;
108-
return method.bind(ogmiosProvider);
109-
}
110-
111-
return ogmiosProvider[prop as keyof OgmiosTxSubmitProvider];
112-
}
113-
});
114-
115-
return Object.setPrototypeOf(txSubmitProviderProxy, RunnableModule.prototype);
116-
};
117-
118-
export const getOgmiosTxSubmitProvider = async (
119-
dnsResolver: DnsResolver,
120-
logger: Logger,
121-
options?: OgmiosProgramOptions,
122-
handleProvider?: HandleProvider
123-
): Promise<OgmiosTxSubmitProvider> => {
124-
if (options?.ogmiosSrvServiceName)
125-
return ogmiosTxSubmitProviderWithDiscovery(dnsResolver, logger, options.ogmiosSrvServiceName, handleProvider);
126-
if (options?.ogmiosUrl)
127-
return new OgmiosTxSubmitProvider(urlToConnectionConfig(options?.ogmiosUrl), { logger }, handleProvider);
128-
throw new MissingCardanoNodeOption([OgmiosOptionDescriptions.Url, OgmiosOptionDescriptions.SrvServiceName]);
129-
};
130-
13128
/**
13229
* Creates an extended OgmiosCardanoNode instance :
13330
* - use passed srv service name in order to resolve the port

packages/cardano-services/src/Program/services/rabbitmq.ts

+5-11
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ import { CONNECTION_ERROR_EVENT, RabbitMqTxSubmitProvider, TxSubmitWorker } from
55
import { DnsResolver } from '../utils';
66
import { Logger } from 'ts-log';
77
import { MissingProgramOption } from '../errors';
8-
import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios';
9-
import { ProviderError, ProviderFailure, SubmitTxArgs } from '@cardano-sdk/core';
8+
import { ProviderError, ProviderFailure, SubmitTxArgs, TxSubmitProvider } from '@cardano-sdk/core';
109
import { RabbitMqOptionDescriptions, RabbitMqProgramOptions } from '../options';
1110
import { ServiceNames } from '../programs/types';
1211
import { SrvRecord } from 'dns';
@@ -85,18 +84,13 @@ type WorkerFactory = () => Promise<TxSubmitWorker>;
8584
* Create a worker factory with service discovery
8685
*
8786
* @param {DnsResolver} dnsResolver used for DNS resolution
88-
* @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider
87+
* @param {TxSubmitProvider} txSubmitProvider tx submit provider
8988
* @param {Logger} logger common logger
9089
* @param {TxWorkerArgs} args needed for tx worker initialization
9190
* @returns {WorkerFactory} WorkerFactory with service discovery, returning a 'TxSubmitWorker' instance
9291
*/
9392
export const createWorkerFactoryWithDiscovery =
94-
(
95-
dnsResolver: DnsResolver,
96-
txSubmitProvider: OgmiosTxSubmitProvider,
97-
logger: Logger,
98-
args: TxWorkerArgs
99-
): WorkerFactory =>
93+
(dnsResolver: DnsResolver, txSubmitProvider: TxSubmitProvider, logger: Logger, args: TxWorkerArgs): WorkerFactory =>
10094
async () => {
10195
const record = await dnsResolver(args.rabbitmqSrvServiceName!);
10296
return new TxSubmitWorker({ ...args, rabbitmqUrl: srvRecordToRabbitmqURL(record) }, { logger, txSubmitProvider });
@@ -148,14 +142,14 @@ export const startTxSubmitWorkerWithDiscovery = async (
148142
* Create and return a running worker instance with static service config or service discovery
149143
*
150144
* @param {DnsResolver} dnsResolver used for DNS resolution
151-
* @param {OgmiosTxSubmitProvider} txSubmitProvider tx submit provider
145+
* @param {TxSubmitProvider} txSubmitProvider tx submit provider
152146
* @param {Logger} logger common logger
153147
* @param {TxWorkerArgs} args needed for tx worker initialization * @returns {RunningTxSubmitWorker} RunningTxSubmitWorker instance
154148
* @throws {MissingProgramOption} error if neither URL nor service name is provided
155149
*/
156150
export const getRunningTxSubmitWorker = async (
157151
dnsResolver: DnsResolver,
158-
txSubmitProvider: OgmiosTxSubmitProvider,
152+
txSubmitProvider: TxSubmitProvider,
159153
logger: Logger,
160154
args?: TxWorkerArgs
161155
): Promise<RunningTxSubmitWorker> => {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { EmptyError, firstValueFrom } from 'rxjs';
2+
import {
3+
GeneralCardanoNodeError,
4+
GeneralCardanoNodeErrorCode,
5+
HandleOwnerChangeError,
6+
HandleProvider,
7+
HealthCheckResponse,
8+
ObservableCardanoNode,
9+
ProviderError,
10+
ProviderFailure,
11+
SubmitTxArgs,
12+
TxSubmissionError,
13+
TxSubmitProvider
14+
} from '@cardano-sdk/core';
15+
import { Logger } from 'ts-log';
16+
import { WithLogger } from '@cardano-sdk/util';
17+
18+
type ObservableTxSubmitter = Pick<ObservableCardanoNode, 'healthCheck$' | 'submitTx'>;
19+
export type NodeTxSubmitProviderProps = WithLogger & {
20+
handleProvider?: HandleProvider;
21+
cardanoNode: ObservableTxSubmitter;
22+
};
23+
24+
const emptyMessage = 'ObservableCardanoNode observable completed without emitting';
25+
const toProviderError = (error: unknown) => {
26+
if (error instanceof TxSubmissionError) {
27+
throw new ProviderError(ProviderFailure.BadRequest, error);
28+
} else if (error instanceof GeneralCardanoNodeError) {
29+
throw new ProviderError(
30+
error.code === GeneralCardanoNodeErrorCode.ConnectionFailure
31+
? ProviderFailure.ConnectionFailure
32+
: error.code === GeneralCardanoNodeErrorCode.ServerNotReady
33+
? ProviderFailure.ServerUnavailable
34+
: ProviderFailure.Unknown,
35+
error
36+
);
37+
}
38+
if (error instanceof EmptyError) {
39+
throw new ProviderError(
40+
ProviderFailure.ServerUnavailable,
41+
new GeneralCardanoNodeError(GeneralCardanoNodeErrorCode.ServerNotReady, null, emptyMessage)
42+
);
43+
}
44+
throw new ProviderError(ProviderFailure.Unknown, error);
45+
};
46+
47+
/** Submit transactions to an ObservableCardanoNode. Validates handle resolutions against a HandleProvider. */
48+
export class NodeTxSubmitProvider implements TxSubmitProvider {
49+
#logger: Logger;
50+
#cardanoNode: ObservableTxSubmitter;
51+
#handleProvider?: HandleProvider;
52+
53+
constructor({ handleProvider, cardanoNode, logger }: NodeTxSubmitProviderProps) {
54+
this.#handleProvider = handleProvider;
55+
this.#cardanoNode = cardanoNode;
56+
this.#logger = logger;
57+
}
58+
59+
async submitTx({ signedTransaction, context }: SubmitTxArgs): Promise<void> {
60+
await this.#throwIfHandleResolutionConflict(context);
61+
await firstValueFrom(this.#cardanoNode.submitTx(signedTransaction)).catch(toProviderError);
62+
}
63+
64+
async healthCheck(): Promise<HealthCheckResponse> {
65+
const [cardanoNodeHealth, handleProviderHealth] = await Promise.all([
66+
firstValueFrom(this.#cardanoNode.healthCheck$).catch((error): HealthCheckResponse => {
67+
if (error instanceof EmptyError) {
68+
return { ok: false, reason: emptyMessage };
69+
}
70+
this.#logger.error('Unexpected healtcheck error', error);
71+
return { ok: false, reason: 'Internal error' };
72+
}),
73+
this.#handleProvider?.healthCheck()
74+
]);
75+
return {
76+
localNode: cardanoNodeHealth.localNode,
77+
ok: cardanoNodeHealth.ok && (!handleProviderHealth || handleProviderHealth.ok),
78+
reason: cardanoNodeHealth.reason || handleProviderHealth?.reason
79+
};
80+
}
81+
82+
async #throwIfHandleResolutionConflict(context: SubmitTxArgs['context']): Promise<void> {
83+
if (context?.handleResolutions && context.handleResolutions.length > 0) {
84+
if (!this.#handleProvider) {
85+
throw new ProviderError(
86+
ProviderFailure.NotImplemented,
87+
undefined,
88+
'No HandleProvider was set during construction.'
89+
);
90+
}
91+
92+
const handleInfoList = await this.#handleProvider.resolveHandles({
93+
handles: context.handleResolutions.map((hndRes) => hndRes.handle)
94+
});
95+
96+
for (const [index, handleInfo] of handleInfoList.entries()) {
97+
if (!handleInfo || handleInfo.cardanoAddress !== context.handleResolutions[index].cardanoAddress) {
98+
const handleOwnerChangeError = new HandleOwnerChangeError(
99+
context.handleResolutions[index].handle,
100+
context.handleResolutions[index].cardanoAddress,
101+
handleInfo ? handleInfo.cardanoAddress : null
102+
);
103+
throw new ProviderError(ProviderFailure.Conflict, handleOwnerChangeError);
104+
}
105+
}
106+
}
107+
}
108+
}
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './rabbitmq';
2+
export * from './NodeTxSubmitProvider';
23
export * from './TxSubmitHttpService';

packages/cardano-services/src/TxSubmit/rabbitmq/TxSubmitWorker.ts

+2-12
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
/* eslint-disable @typescript-eslint/no-shadow */
22
import { CONNECTION_ERROR_EVENT, TX_SUBMISSION_QUEUE, serializeError, waitForPending } from './utils';
3-
import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR } from '@cardano-sdk/core';
3+
import { Cardano, ProviderError, ProviderFailure, TxBodyCBOR, TxCBOR, TxSubmitProvider } from '@cardano-sdk/core';
44
import { Channel, Connection, Message, connect } from 'amqplib';
55
import { EventEmitter } from 'events';
66
import { Logger } from 'ts-log';
7-
import { OgmiosTxSubmitProvider } from '@cardano-sdk/ogmios';
87
import { bufferToHexString } from '@cardano-sdk/util';
98

109
const moduleName = 'TxSubmitWorker';
@@ -30,7 +29,7 @@ export interface TxSubmitWorkerDependencies {
3029
logger: Logger;
3130

3231
/** The provider to use to submit tx */
33-
txSubmitProvider: OgmiosTxSubmitProvider;
32+
txSubmitProvider: TxSubmitProvider;
3433
}
3534

3635
/**
@@ -111,11 +110,6 @@ export class TxSubmitWorker extends EventEmitter {
111110
* https://amqp-node.github.io/amqplib/channel_api.html#model_events
112111
*/
113112
async start() {
114-
if (this.#dependencies.txSubmitProvider.state === null) {
115-
this.#dependencies.logger.info(`${moduleName} init: initialize and start tx submission provider`);
116-
await this.#dependencies.txSubmitProvider.initialize();
117-
await this.#dependencies.txSubmitProvider.start();
118-
}
119113
this.#dependencies.logger.info(`${moduleName} init: checking tx submission provider health status`);
120114

121115
const health = await this.#dependencies.txSubmitProvider.healthCheck();
@@ -156,10 +150,6 @@ export class TxSubmitWorker extends EventEmitter {
156150
}
157151

158152
async shutdown() {
159-
if (this.#dependencies.txSubmitProvider.state === 'running') {
160-
this.#dependencies.logger.info(`${moduleName} shutdown: shutdown tx submission provider`);
161-
await this.#dependencies.txSubmitProvider.shutdown();
162-
}
163153
await this.stop();
164154
}
165155
/** Stops the worker. */

0 commit comments

Comments
 (0)