Skip to content

Commit bf4a8b9

Browse files
committed
fix: retry all ProviderErrors except BadRequest and NotImplemented
BaseWallet was retrying all errors, which could potentially hide bugs by retrying something that will never recover BREAKING CHANGE: BaseWallet observables error instead of emitting fatalError$ - remove ObservableError.fatalError$ - 'poll' util observable errors instead of calling onFatalError - remove PollProps.onFatalError - 'poll' no longer checks for InvalidStringError, it's up to consumer
1 parent 9bad2df commit bf4a8b9

21 files changed

+188
-314
lines changed

packages/util-rxjs/src/poll.ts

+38-62
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
1-
import { InvalidStringError, strictEquals } from '@cardano-sdk/util';
21
import { Logger } from 'ts-log';
32
import {
43
NEVER,
54
Observable,
6-
Subject,
7-
catchError,
85
concat,
96
defer,
107
distinctUntilChanged,
118
from,
12-
merge,
139
mergeMap,
1410
of,
1511
switchMap,
1612
takeUntil,
1713
throwError
1814
} from 'rxjs';
1915
import { RetryBackoffConfig, retryBackoff } from 'backoff-rxjs';
16+
import { strictEquals } from '@cardano-sdk/util';
17+
18+
const POLL_UNTIL_RETRY = Symbol('POLL_UNTIL_RETRY');
2019

2120
export interface PollProps<T> {
2221
sample: () => Promise<T>;
2322
retryBackoffConfig: RetryBackoffConfig;
24-
onFatalError?: (value: unknown) => void;
2523
trigger$?: Observable<unknown>;
2624
equals?: (t1: T, t2: T) => boolean;
2725
combinator?: typeof switchMap;
@@ -33,71 +31,49 @@ export interface PollProps<T> {
3331
export const poll = <T>({
3432
sample,
3533
retryBackoffConfig,
36-
onFatalError,
3734
trigger$ = of(true),
3835
equals = strictEquals,
3936
combinator = switchMap,
4037
cancel$ = NEVER,
4138
pollUntil = () => true,
4239
logger
4340
}: PollProps<T>) =>
44-
new Observable<T>((subscriber) => {
45-
const cancelOnFatalError$ = new Subject<boolean>();
46-
const internalCancel$ = merge(cancel$, cancelOnFatalError$);
47-
const sub = trigger$
48-
.pipe(
49-
combinator(() =>
50-
defer(() =>
51-
from(sample()).pipe(
52-
mergeMap((v) =>
53-
pollUntil(v)
54-
? of(v)
55-
: // Emit value, but also throw error to force retryBackoff to kick in
56-
concat(
57-
of(v),
58-
throwError(() => new Error('polling'))
59-
)
60-
)
61-
)
62-
).pipe(
63-
retryBackoff({
64-
...retryBackoffConfig,
65-
shouldRetry: (error) => {
66-
logger.error(error);
67-
68-
if (retryBackoffConfig.shouldRetry) {
69-
const shouldRetry = retryBackoffConfig.shouldRetry(error);
70-
logger.debug(`Should retry: ${shouldRetry}`);
71-
72-
if (!shouldRetry) {
73-
return false;
74-
}
75-
}
41+
trigger$.pipe(
42+
combinator(() =>
43+
defer(() =>
44+
from(sample()).pipe(
45+
mergeMap((v) =>
46+
pollUntil(v)
47+
? of(v)
48+
: // Emit value, but also throw error to force retryBackoff to kick in
49+
concat(
50+
of(v),
51+
throwError(() => POLL_UNTIL_RETRY)
52+
)
53+
)
54+
)
55+
).pipe(
56+
retryBackoff({
57+
...retryBackoffConfig,
58+
shouldRetry: (error) => {
59+
if (error === POLL_UNTIL_RETRY) {
60+
logger.warn('"pollUntil" condition not met, will retry');
61+
return true;
62+
}
7663

77-
if (error instanceof InvalidStringError) {
78-
onFatalError?.(error);
79-
cancelOnFatalError$.next(true);
80-
return false;
81-
}
64+
logger.error(error);
8265

83-
return true;
84-
}
85-
}),
86-
catchError((error) => {
87-
onFatalError?.(error);
66+
if (retryBackoffConfig.shouldRetry) {
67+
const shouldRetry = retryBackoffConfig.shouldRetry(error);
68+
logger.debug(`Should retry: ${shouldRetry}`);
69+
return shouldRetry;
70+
}
8871

89-
// Re-throw the error to propagate it to the subscriber and complete the observable
90-
return throwError(() => error);
91-
})
92-
)
93-
),
94-
distinctUntilChanged(equals),
95-
takeUntil(internalCancel$)
72+
return true;
73+
}
74+
})
9675
)
97-
.subscribe(subscriber);
98-
99-
return () => {
100-
sub.unsubscribe();
101-
cancelOnFatalError$.complete();
102-
};
103-
});
76+
),
77+
distinctUntilChanged(equals),
78+
takeUntil(cancel$)
79+
);

packages/util-rxjs/test/poll.test.ts

+7-10
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ describe('poll', () => {
6060
expect(resolvedValue).toBeTruthy();
6161
});
6262

63-
it('does not retry, when sample rejects with InvalidStringError', async () => {
63+
it('does not retry, when shouldRetry returns false', async () => {
6464
const testValue = { test: 'value' };
6565
const testError = new InvalidStringError('Test invalid string error');
6666
const sample = jest
@@ -69,23 +69,23 @@ describe('poll', () => {
6969
.mockResolvedValueOnce(testValue)
7070
.mockRejectedValueOnce(testError)
7171
.mockResolvedValueOnce(testValue);
72-
const onFatalError = jest.fn();
73-
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, shouldRetry: () => true };
72+
const retryBackoffConfig: RetryBackoffConfig = {
73+
initialInterval: 1,
74+
shouldRetry: (error) => !(error instanceof InvalidStringError)
75+
};
7476
const values$ = poll({
7577
logger,
76-
onFatalError,
7778
retryBackoffConfig,
7879
sample
7980
});
8081
await expect(firstValueFrom(values$)).resolves.toBe(testValue);
81-
await expect(firstValueFrom(values$)).rejects.toThrow(EmptyError);
82+
await expect(firstValueFrom(values$)).rejects.toThrow(testError);
8283
expect(sample).toBeCalledTimes(3);
83-
expect(onFatalError).toBeCalledWith(testError);
8484
expect(logger.messages).toStrictEqual([
8585
{ level: 'error', message: [new Error(testErrorStr)] },
8686
{ level: 'debug', message: ['Should retry: true'] },
8787
{ level: 'error', message: [testError] },
88-
{ level: 'debug', message: ['Should retry: true'] }
88+
{ level: 'debug', message: ['Should retry: false'] }
8989
]);
9090
});
9191

@@ -116,19 +116,16 @@ describe('poll', () => {
116116
const sample = jest.fn().mockRejectedValue(testError);
117117
const maxRetries = 3;
118118
const retryBackoffConfig: RetryBackoffConfig = { initialInterval: 1, maxRetries };
119-
const onFatalError = jest.fn();
120119

121120
const values$ = poll({
122121
logger,
123-
onFatalError,
124122
retryBackoffConfig,
125123
sample
126124
});
127125

128126
await expect(firstValueFrom(values$)).rejects.toThrow(testError);
129127

130128
expect(sample).toBeCalledTimes(maxRetries + 1);
131-
expect(onFatalError).toBeCalledWith(expect.any(Error));
132129
expect(logger.messages).toStrictEqual([
133130
{ level: 'error', message: [testError] },
134131
{ level: 'error', message: [testError] },

packages/wallet/src/Wallets/BaseWallet.ts

+9-24
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ import {
5353
createUtxoTracker,
5454
createWalletUtil,
5555
currentEpochTracker,
56-
distinctEraSummaries
56+
distinctEraSummaries,
57+
pollProvider
5758
} from '../services';
5859
import { AddressType, Bip32Account, GroupedAddress, WitnessedTx, Witnesser, util } from '@cardano-sdk/key-management';
5960
import {
@@ -71,7 +72,7 @@ import {
7172
TxSubmitProvider,
7273
UtxoProvider
7374
} from '@cardano-sdk/core';
74-
import { BehaviorObservable, TrackerSubject, poll } from '@cardano-sdk/util-rxjs';
75+
import { BehaviorObservable, TrackerSubject } from '@cardano-sdk/util-rxjs';
7576
import {
7677
BehaviorSubject,
7778
EMPTY,
@@ -282,7 +283,6 @@ export class BaseWallet implements ObservableWallet {
282283
readonly protocolParameters$: TrackerSubject<Cardano.ProtocolParameters>;
283284
readonly genesisParameters$: TrackerSubject<Cardano.CompactGenesis>;
284285
readonly assetInfo$: TrackerSubject<Assets>;
285-
readonly fatalError$: Subject<unknown>;
286286
readonly syncStatus: SyncStatus;
287287
readonly name: string;
288288
readonly util: WalletUtil;
@@ -357,10 +357,6 @@ export class BaseWallet implements ObservableWallet {
357357

358358
this.witnesser = witnesser;
359359

360-
this.fatalError$ = new Subject();
361-
362-
const onFatalError = this.fatalError$.next.bind(this.fatalError$);
363-
364360
this.name = name;
365361
const cancel$ = connectionStatusTracker$.pipe(
366362
tap((status) => (status === ConnectionStatus.up ? 'Connection UP' : 'Connection DOWN')),
@@ -369,10 +365,9 @@ export class BaseWallet implements ObservableWallet {
369365

370366
if (isBip32PublicCredentialsManager(this.#publicCredentialsManager)) {
371367
this.#addressTracker = createAddressTracker({
372-
addressDiscovery$: poll({
368+
addressDiscovery$: pollProvider({
373369
cancel$,
374370
logger: contextLogger(this.#logger, 'addressDiscovery$'),
375-
onFatalError,
376371
retryBackoffConfig,
377372
sample: () => {
378373
const credManager = this.#publicCredentialsManager as Bip32PublicCredentialsManager;
@@ -403,10 +398,9 @@ export class BaseWallet implements ObservableWallet {
403398
logger: contextLogger(this.#logger, 'tip$'),
404399
maxPollInterval: maxInterval,
405400
minPollInterval: pollInterval,
406-
provider$: poll({
401+
provider$: pollProvider({
407402
cancel$,
408403
logger: contextLogger(this.#logger, 'tip$'),
409-
onFatalError,
410404
retryBackoffConfig,
411405
sample: this.networkInfoProvider.ledgerTip
412406
}),
@@ -426,11 +420,10 @@ export class BaseWallet implements ObservableWallet {
426420
// Era summaries
427421
const eraSummariesTrigger = new BehaviorSubject<void>(void 0);
428422
this.eraSummaries$ = new PersistentDocumentTrackerSubject(
429-
poll({
423+
pollProvider({
430424
cancel$,
431425
equals: deepEquals,
432426
logger: contextLogger(this.#logger, 'eraSummaries$'),
433-
onFatalError,
434427
retryBackoffConfig,
435428
sample: this.networkInfoProvider.eraSummaries,
436429
trigger$: eraSummariesTrigger.pipe(tap(() => 'Trigger request era summaries'))
@@ -450,23 +443,21 @@ export class BaseWallet implements ObservableWallet {
450443
tap((epoch) => this.#logger.debug(`Current epoch is ${epoch}`))
451444
);
452445
this.protocolParameters$ = new PersistentDocumentTrackerSubject(
453-
poll({
446+
pollProvider({
454447
cancel$,
455448
equals: isEqual,
456449
logger: contextLogger(this.#logger, 'protocolParameters$'),
457-
onFatalError,
458450
retryBackoffConfig,
459451
sample: this.networkInfoProvider.protocolParameters,
460452
trigger$: epoch$
461453
}),
462454
stores.protocolParameters
463455
);
464456
this.genesisParameters$ = new PersistentDocumentTrackerSubject(
465-
poll({
457+
pollProvider({
466458
cancel$,
467459
equals: isEqual,
468460
logger: contextLogger(this.#logger, 'genesisParameters$'),
469-
onFatalError,
470461
retryBackoffConfig,
471462
sample: this.networkInfoProvider.genesisParameters,
472463
trigger$: epoch$
@@ -487,7 +478,6 @@ export class BaseWallet implements ObservableWallet {
487478
inFlightTransactionsStore: stores.inFlightTransactions,
488479
logger: contextLogger(this.#logger, 'transactions'),
489480
newTransactions: this.#newTransactions,
490-
onFatalError,
491481
retryBackoffConfig,
492482
signedTransactionsStore: stores.signedTransactions,
493483
tip$: this.tip$,
@@ -521,7 +511,6 @@ export class BaseWallet implements ObservableWallet {
521511
addresses$,
522512
history$: this.transactions.history$,
523513
logger: contextLogger(this.#logger, 'utxo'),
524-
onFatalError,
525514
retryBackoffConfig,
526515
stores,
527516
transactionsInFlight$: this.transactions.outgoing.inFlight$,
@@ -546,7 +535,6 @@ export class BaseWallet implements ObservableWallet {
546535
eraSummaries$,
547536
knownAddresses$: this.addresses$,
548537
logger: contextLogger(this.#logger, 'delegation'),
549-
onFatalError,
550538
retryBackoffConfig,
551539
rewardAccountAddresses$: this.addresses$.pipe(
552540
map((addresses) => uniq(addresses.map((groupedAddress) => groupedAddress.rewardAccount)))
@@ -592,7 +580,6 @@ export class BaseWallet implements ObservableWallet {
592580
balanceTracker: this.balance,
593581
logger: contextLogger(this.#logger, 'assets$'),
594582
maxAssetInfoCacheAge,
595-
onFatalError,
596583
retryBackoffConfig,
597584
transactionsTracker: this.transactions
598585
}),
@@ -602,11 +589,10 @@ export class BaseWallet implements ObservableWallet {
602589
this.handles$ = this.handleProvider
603590
? this.initializeHandles(
604591
new PersistentDocumentTrackerSubject(
605-
poll({
592+
pollProvider({
606593
cancel$,
607594
equals: isEqual,
608595
logger: contextLogger(this.#logger, 'handles$'),
609-
onFatalError,
610596
retryBackoffConfig,
611597
sample: () => this.handleProvider.getPolicyIds()
612598
}),
@@ -798,7 +784,6 @@ export class BaseWallet implements ObservableWallet {
798784
this.currentEpoch$.complete();
799785
this.delegation.shutdown();
800786
this.assetInfo$.complete();
801-
this.fatalError$.complete();
802787
this.syncStatus.shutdown();
803788
this.#newTransactions.failedToSubmit$.complete();
804789
this.#newTransactions.pending$.complete();

0 commit comments

Comments
 (0)