Skip to content

Commit ff499b2

Browse files
committed
feat!: add ObservableCardanoNode.submitTx method
implement it for OgmiosObservableCardanoNode
1 parent 06cc4d4 commit ff499b2

File tree

5 files changed

+142
-15
lines changed

5 files changed

+142
-15
lines changed

packages/core/src/CardanoNode/types/ObservableCardanoNode.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { bufferChainSyncEvent } from '../util/bufferChainSyncEvent';
2-
import type { Cardano, HealthCheckResponse } from '../..';
2+
import type { Cardano, HealthCheckResponse, TxCBOR } from '../..';
33
import type { EraSummary } from './CardanoNode';
44
import type { Observable } from 'rxjs';
55

@@ -71,6 +71,12 @@ export interface ObservableCardanoNode {
7171
* @throws CardanoNodeErrors.UnknownCardanoNodeError on any other unexpected/unhandled errors
7272
*/
7373
findIntersect(points: PointOrOrigin[]): Observable<ObservableChainSync>;
74+
75+
/**
76+
* @param tx serialized transaction
77+
* @returns transaction id
78+
*/
79+
submitTx(tx: TxCBOR): Observable<Cardano.TransactionId>;
7480
}
7581

7682
export const ObservableCardanoNode = { bufferChainSyncEvent } as const;

packages/ogmios/src/CardanoNode/OgmiosObservableCardanoNode/OgmiosObservableCardanoNode.ts

+49-7
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import {
1111
ObservableCardanoNode,
1212
ObservableChainSync,
1313
PointOrOrigin,
14-
StateQueryErrorCode
14+
StateQueryErrorCode,
15+
TxCBOR
1516
} from '@cardano-sdk/core';
1617
import {
1718
ChainSynchronization,
1819
ConnectionConfig,
20+
TransactionSubmission,
1921
createConnectionObject,
2022
createLedgerStateQueryClient,
2123
getServerHealth
@@ -31,6 +33,7 @@ import {
3133
of,
3234
shareReplay,
3335
switchMap,
36+
take,
3437
throwError,
3538
timeout
3639
} from 'rxjs';
@@ -39,7 +42,7 @@ import { WithLogger, contextLogger } from '@cardano-sdk/util';
3942
import { createObservableChainSyncClient } from './createObservableChainSyncClient';
4043
import { ogmiosServerHealthToHealthCheckResponse } from '../../util';
4144
import { ogmiosToCorePointOrOrigin, ogmiosToCoreTipOrOrigin, pointOrOriginToOgmios } from './util';
42-
import { queryEraSummaries, queryGenesisParameters } from '../queries';
45+
import { queryEraSummaries, queryGenesisParameters, withCoreCardanoNodeError } from '../queries';
4346
import isEqual from 'lodash/isEqual';
4447

4548
const ogmiosToCoreIntersection = (intersection: ChainSynchronization.Intersection) => ({
@@ -48,9 +51,11 @@ const ogmiosToCoreIntersection = (intersection: ChainSynchronization.Intersectio
4851
});
4952

5053
export type LocalStateQueryRetryConfig = Pick<RetryBackoffConfig, 'initialInterval' | 'maxInterval'>;
54+
export type SubmitTxRetryConfig = Pick<RetryBackoffConfig, 'initialInterval' | 'maxInterval' | 'maxRetries'>;
5155

52-
const DEFAULT_HEALTH_CHECK_TIMEOUT = 2000;
53-
const DEFAULT_LSQ_RETRY_CONFIG: LocalStateQueryRetryConfig = {
56+
const DEFAULT_HEALTH_CHECK_TIMEOUT = Milliseconds(2000);
57+
const DEFAULT_SUBMIT_MAX_RETRIES = 5;
58+
const DEFAULT_RETRY_CONFIG: LocalStateQueryRetryConfig = {
5459
initialInterval: 1000,
5560
maxInterval: 30_000
5661
};
@@ -59,16 +64,22 @@ export type OgmiosObservableCardanoNodeProps = InteractionContextProps & {
5964
healthCheckTimeout?: Milliseconds;
6065
/** Default: {initialInterval: 1000, maxInterval: 30_000} */
6166
localStateQueryRetryConfig?: LocalStateQueryRetryConfig;
67+
/** Default: {initialInterval: 1000, maxInterval: 30_000, maxRetries: 5} */
68+
submitTxQueryRetryConfig?: SubmitTxRetryConfig;
6269
};
6370

64-
const retryableStateQueryErrors = new Set<number>([
71+
const retryableCardanoNodeErrors = new Set<number>([
6572
GeneralCardanoNodeErrorCode.ServerNotReady,
66-
StateQueryErrorCode.UnavailableInCurrentEra,
6773
GeneralCardanoNodeErrorCode.ConnectionFailure
6874
]);
6975

76+
const retryableStateQueryErrors = new Set<number>([
77+
...retryableCardanoNodeErrors,
78+
StateQueryErrorCode.UnavailableInCurrentEra
79+
]);
80+
7081
const stateQueryRetryBackoffConfig = (
71-
retryConfig: LocalStateQueryRetryConfig = DEFAULT_LSQ_RETRY_CONFIG,
82+
retryConfig: LocalStateQueryRetryConfig = DEFAULT_RETRY_CONFIG,
7283
logger: Logger
7384
): RetryBackoffConfig => ({
7485
...retryConfig,
@@ -81,8 +92,24 @@ const stateQueryRetryBackoffConfig = (
8192
}
8293
});
8394

95+
const submitTxRetryBackoffConfig = (
96+
retryConfig: SubmitTxRetryConfig = DEFAULT_RETRY_CONFIG,
97+
logger: Logger
98+
): RetryBackoffConfig => ({
99+
...retryConfig,
100+
maxRetries: retryConfig.maxRetries || DEFAULT_SUBMIT_MAX_RETRIES,
101+
shouldRetry: (error) => {
102+
if (retryableStateQueryErrors.has(CardanoNodeUtil.asCardanoNodeError(error)?.code)) {
103+
logger.warn('Failed to submitTx, will retry', error);
104+
return true;
105+
}
106+
return false;
107+
}
108+
});
109+
84110
export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
85111
readonly #connectionConfig$: Observable<ConnectionConfig>;
112+
readonly #submitTxRetryBackoffConfig: RetryBackoffConfig;
86113
readonly #logger: Logger;
87114
readonly #interactionContext$;
88115

@@ -93,6 +120,7 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
93120
constructor(props: OgmiosObservableCardanoNodeProps, { logger }: WithLogger) {
94121
this.#connectionConfig$ = props.connectionConfig$;
95122
this.#logger = contextLogger(logger, 'ObservableOgmiosCardanoNode');
123+
this.#submitTxRetryBackoffConfig = submitTxRetryBackoffConfig(props.submitTxQueryRetryConfig, logger);
96124
this.#interactionContext$ = createObservableInteractionContext(
97125
{
98126
...props
@@ -177,4 +205,18 @@ export class OgmiosObservableCardanoNode implements ObservableCardanoNode {
177205
)
178206
);
179207
}
208+
209+
submitTx(tx: TxCBOR): Observable<Cardano.TransactionId> {
210+
return this.#interactionContext$.pipe(
211+
switchMap((context) =>
212+
from(
213+
withCoreCardanoNodeError(() =>
214+
TransactionSubmission.submitTransaction(context, tx)
215+
) as Promise<Cardano.TransactionId>
216+
)
217+
),
218+
retryBackoff(this.#submitTxRetryBackoffConfig),
219+
take(1)
220+
);
221+
}
180222
}

packages/ogmios/test/CardanoNode/ObservableOgmiosCardanoNode.test.ts

+76-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import {
55
GeneralCardanoNodeErrorCode,
66
Milliseconds,
77
StateQueryError,
8-
StateQueryErrorCode
8+
StateQueryErrorCode,
9+
TxCBOR,
10+
TxSubmissionError,
11+
TxSubmissionErrorCode
912
} from '@cardano-sdk/core';
1013
import { Connection, InteractionContext, Mirror, createConnectionObject, safeJSON } from '@cardano-ogmios/client';
1114
import { HEALTH_RESPONSE_BODY } from '../mocks/util';
@@ -16,11 +19,12 @@ import {
1619
MockedChainSynchronization,
1720
MockedLedgerStateQueryClient,
1821
MockedSocket,
22+
MockedTransactionSubmission,
1923
ogmiosEraSummaries
2024
} from './util';
21-
import { NextBlockResponse, RollForward } from '@cardano-ogmios/schema';
25+
import { NextBlockResponse, RollForward, SubmitTransactionFailureEraMismatch } from '@cardano-ogmios/schema';
2226
import { OgmiosObservableCardanoNode } from '../../src';
23-
import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of } from 'rxjs';
27+
import { combineLatest, delay as delayEmission, firstValueFrom, mergeMap, of, toArray } from 'rxjs';
2428
import { generateRandomHexString, logger } from '@cardano-sdk/util-dev';
2529
import { mockGenesisShelley, mockShelleyBlock } from '../ogmiosToCore/testData';
2630
import delay from 'delay';
@@ -30,6 +34,9 @@ jest.mock('@cardano-ogmios/client', () => {
3034
return {
3135
...original,
3236
ChainSynchronization: {},
37+
TransactionSubmission: {
38+
submitTransaction: jest.fn()
39+
},
3340
createInteractionContext: jest.fn(),
3441
createLedgerStateQueryClient: jest.fn(),
3542
getServerHealth: jest.fn()
@@ -42,6 +49,7 @@ describe('ObservableOgmiosCardanoNode', () => {
4249
let getServerHealth: MockGetServerHealth;
4350
let socket: MockedSocket;
4451
let chainSynchronization: MockedChainSynchronization;
52+
let TransactionSubmission: MockedTransactionSubmission;
4553
let createInteractionContext: MockCreateInteractionContext;
4654

4755
const tip = {
@@ -56,7 +64,8 @@ describe('ObservableOgmiosCardanoNode', () => {
5664
createInteractionContext,
5765
createLedgerStateQueryClient,
5866
getServerHealth,
59-
ChainSynchronization: chainSynchronization
67+
ChainSynchronization: chainSynchronization,
68+
TransactionSubmission
6069
} = require('@cardano-ogmios/client'));
6170
ledgerStateQueryClient = {
6271
eraSummaries: jest.fn() as MockedLedgerStateQueryClient['eraSummaries'],
@@ -104,6 +113,7 @@ describe('ObservableOgmiosCardanoNode', () => {
104113
createInteractionContext.mockReset();
105114
createLedgerStateQueryClient.mockReset();
106115
getServerHealth.mockReset();
116+
TransactionSubmission.submitTransaction.mockReset();
107117
});
108118

109119
describe('LSQs on QueryUnavailableInCurrentEra', () => {
@@ -155,7 +165,6 @@ describe('ObservableOgmiosCardanoNode', () => {
155165
});
156166
});
157167

158-
// TODO: this passes when run individually but not when running entire test suite
159168
it('opaquely reconnects when connection is refused', async () => {
160169
createInteractionContext.mockRejectedValueOnce({ name: 'WebSocketClosed' });
161170
const node = new OgmiosObservableCardanoNode({ connectionConfig$: of(connection) }, { logger });
@@ -222,4 +231,66 @@ describe('ObservableOgmiosCardanoNode', () => {
222231
expect(result.ok).toBe(false);
223232
});
224233
});
234+
235+
describe('submitTx', () => {
236+
let node: OgmiosObservableCardanoNode;
237+
const submitTxMaxRetries = 2;
238+
239+
beforeEach(() => {
240+
node = new OgmiosObservableCardanoNode(
241+
{
242+
connectionConfig$: of(connection),
243+
submitTxQueryRetryConfig: { initialInterval: 1, maxRetries: submitTxMaxRetries }
244+
},
245+
{ logger }
246+
);
247+
});
248+
249+
describe('successful submission', () => {
250+
it('emits transaction id and completes', async () => {
251+
TransactionSubmission.submitTransaction.mockResolvedValueOnce('id');
252+
await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR).pipe(toArray()))).resolves.toEqual(['id']);
253+
expect(TransactionSubmission.submitTransaction).toBeCalledTimes(1);
254+
});
255+
});
256+
257+
describe('submission error', () => {
258+
it('maps error to core type', async () => {
259+
TransactionSubmission.submitTransaction.mockRejectedValueOnce({
260+
code: 3005,
261+
data: { ledgerEra: 'shelley', queryEra: 'alonzo' },
262+
message: 'Era mismatch'
263+
} as SubmitTransactionFailureEraMismatch);
264+
await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).rejects.toThrowError(
265+
expect.objectContaining({
266+
code: TxSubmissionErrorCode.EraMismatch,
267+
name: TxSubmissionError.name
268+
})
269+
);
270+
expect(TransactionSubmission.submitTransaction).toBeCalledTimes(1);
271+
});
272+
});
273+
274+
describe('connection error', () => {
275+
it('attempts to resubmit opaquely', async () => {
276+
TransactionSubmission.submitTransaction
277+
.mockRejectedValueOnce({ code: 'ECONNREFUSED' })
278+
.mockResolvedValueOnce('id');
279+
await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).resolves.toBe('id');
280+
expect(TransactionSubmission.submitTransaction).toBeCalledTimes(2);
281+
});
282+
283+
it('rejects after maxRetries attempts to submit', async () => {
284+
const error = { code: 'ECONNREFUSED' };
285+
TransactionSubmission.submitTransaction.mockRejectedValue(error);
286+
287+
await expect(firstValueFrom(node.submitTx('cbor' as TxCBOR))).rejects.toThrowError(
288+
expect.objectContaining({
289+
code: GeneralCardanoNodeErrorCode.ConnectionFailure
290+
})
291+
);
292+
expect(TransactionSubmission.submitTransaction).toBeCalledTimes(submitTxMaxRetries + 1);
293+
});
294+
});
295+
});
225296
});

packages/ogmios/test/CardanoNode/util.ts

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
ChainSynchronization,
33
InteractionContext,
4+
TransactionSubmission,
45
createInteractionContext,
56
createLedgerStateQueryClient,
67
getServerHealth
@@ -13,6 +14,7 @@ export type MockedLedgerStateQueryClient = jest.Mocked<LedgerStateQueryClient>;
1314
export type MockCreateLedgerStateQuery = jest.MockedFunction<typeof createLedgerStateQueryClient>;
1415
export type MockGetServerHealth = jest.MockedFunction<typeof getServerHealth>;
1516
export type MockedChainSynchronization = jest.Mocked<typeof ChainSynchronization>;
17+
export type MockedTransactionSubmission = jest.Mocked<typeof TransactionSubmission>;
1618
export type MockCreateInteractionContext = jest.MockedFunction<typeof createInteractionContext>;
1719

1820
export const ogmiosEraSummaries: EraSummary[] = [

packages/util-dev/src/chainSync/index.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import {
66
ChainSyncEventType,
77
ChainSyncRollBackward,
88
ChainSyncRollForward,
9+
GeneralCardanoNodeError,
10+
GeneralCardanoNodeErrorCode,
911
Intersection,
1012
ObservableCardanoNode,
1113
Point,
1214
PointOrOrigin
1315
} from '@cardano-sdk/core';
14-
import { Observable, of } from 'rxjs';
16+
import { Observable, of, throwError } from 'rxjs';
1517
import { fromSerializableObject } from '@cardano-sdk/util';
1618
import { genesisToEraSummary } from './genesisToEraSummary';
1719
import memoize from 'lodash/memoize';
@@ -135,7 +137,11 @@ export const chainSyncData = memoize((dataSet: ChainSyncDataSet) => {
135137
});
136138
},
137139
genesisParameters$: of(compactGenesis),
138-
healthCheck$: new Observable()
140+
healthCheck$: new Observable(),
141+
submitTx: () =>
142+
throwError(
143+
() => new GeneralCardanoNodeError(GeneralCardanoNodeErrorCode.Unknown, null, 'submitTx is not implemented')
144+
)
139145
};
140146
return {
141147
allEvents,

0 commit comments

Comments
 (0)