@@ -38,9 +38,7 @@ import { distinctBlock, signedTxsEquals, transactionsEquals, txInEquals } from '
38
38
import { WitnessedTx } from '@cardano-sdk/key-management' ;
39
39
import { newAndStoredMulticast } from './util/newAndStoredMulticast' ;
40
40
import chunk from 'lodash/chunk.js' ;
41
- import intersectionBy from 'lodash/intersectionBy.js' ;
42
41
import sortBy from 'lodash/sortBy.js' ;
43
- import unionBy from 'lodash/unionBy.js' ;
44
42
45
43
export interface TransactionsTrackerProps {
46
44
chainHistoryProvider : ChainHistoryProvider ;
@@ -107,15 +105,159 @@ const allTransactionsByAddresses = async (
107
105
return response ;
108
106
} ;
109
107
110
- export const createAddressTransactionsProvider = ( {
108
+ const getLastTransactionsAtBlock = (
109
+ transactions : Cardano . HydratedTx [ ] ,
110
+ blockNo : Cardano . BlockNo
111
+ ) : Cardano . HydratedTx [ ] => {
112
+ const txsFromSameBlock = [ ] ;
113
+
114
+ for ( let i = transactions . length - 1 ; i >= 0 ; -- i ) {
115
+ const tx = transactions [ i ] ;
116
+ if ( tx . blockHeader . blockNo === blockNo ) {
117
+ txsFromSameBlock . push ( tx ) ;
118
+ } else {
119
+ break ;
120
+ }
121
+ }
122
+
123
+ return txsFromSameBlock ;
124
+ } ;
125
+
126
+ export const revertLastBlock = (
127
+ localTransactions : Cardano . HydratedTx [ ] ,
128
+ blockNo : Cardano . BlockNo ,
129
+ rollback$ : Subject < Cardano . HydratedTx > ,
130
+ newTransactions : Cardano . HydratedTx [ ] ,
131
+ logger : Logger
132
+ ) => {
133
+ const result = [ ...localTransactions ] ;
134
+
135
+ while ( result . length > 0 ) {
136
+ const lastKnownTx = result [ result . length - 1 ] ;
137
+
138
+ if ( lastKnownTx . blockHeader . blockNo === blockNo ) {
139
+ // only emit if the tx is also not present in the new transactions to be added
140
+ if ( newTransactions . findIndex ( ( tx ) => tx . id === lastKnownTx . id ) === - 1 ) {
141
+ logger . debug ( `Transaction ${ lastKnownTx . id } was rolled back` ) ;
142
+ rollback$ . next ( lastKnownTx ) ;
143
+ }
144
+
145
+ result . pop ( ) ;
146
+ } else {
147
+ break ;
148
+ }
149
+ }
150
+
151
+ return result ;
152
+ } ;
153
+
154
+ const findIntersectionAndUpdateTxStore = ( {
111
155
chainHistoryProvider,
112
- addresses$,
156
+ logger,
157
+ store,
113
158
retryBackoffConfig,
159
+ onFatalError,
114
160
tipBlockHeight$,
115
- store,
116
- logger,
117
- onFatalError
118
- } : TransactionsTrackerInternalsProps ) : TransactionsTrackerInternals => {
161
+ rollback$,
162
+ localTransactions,
163
+ addresses
164
+ } : Pick <
165
+ TransactionsTrackerInternalsProps ,
166
+ 'chainHistoryProvider' | 'logger' | 'store' | 'retryBackoffConfig' | 'onFatalError' | 'tipBlockHeight$'
167
+ > & {
168
+ localTransactions : Cardano . HydratedTx [ ] ;
169
+ rollback$ : Subject < Cardano . HydratedTx > ;
170
+ addresses : Cardano . PaymentAddress [ ] ;
171
+ } ) =>
172
+ coldObservableProvider ( {
173
+ // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
174
+ // It should also help when using poor internet connection.
175
+ // Caveat is that local transactions might get out of date...
176
+ combinator : exhaustMap ,
177
+ equals : transactionsEquals ,
178
+ onFatalError,
179
+ // eslint-disable-next-line sonarjs/cognitive-complexity
180
+ provider : async ( ) => {
181
+ let rollbackOcurred = false ;
182
+ // eslint-disable-next-line no-constant-condition
183
+ while ( true ) {
184
+ const lastStoredTransaction : Cardano . HydratedTx | undefined = localTransactions [ localTransactions . length - 1 ] ;
185
+
186
+ lastStoredTransaction &&
187
+ logger . debug (
188
+ `Last stored tx: ${ lastStoredTransaction ?. id } block:${ lastStoredTransaction ?. blockHeader . blockNo } `
189
+ ) ;
190
+
191
+ const lowerBound = lastStoredTransaction ?. blockHeader . blockNo ;
192
+ const newTransactions = await allTransactionsByAddresses ( chainHistoryProvider , {
193
+ addresses,
194
+ blockRange : { lowerBound }
195
+ } ) ;
196
+
197
+ logger . debug (
198
+ `chainHistoryProvider returned ${ newTransactions . length } transactions` ,
199
+ lowerBound !== undefined && `since block ${ lowerBound } `
200
+ ) ;
201
+
202
+ // Fetching transactions from scratch, nothing else to do here.
203
+ if ( lowerBound === undefined ) {
204
+ if ( newTransactions . length > 0 ) {
205
+ localTransactions = newTransactions ;
206
+ store . setAll ( newTransactions ) ;
207
+ }
208
+
209
+ return newTransactions ;
210
+ }
211
+
212
+ // If no transactions found from that block range, it means the last known block has been rolled back.
213
+ if ( newTransactions . length === 0 ) {
214
+ localTransactions = revertLastBlock ( localTransactions , lowerBound , rollback$ , newTransactions , logger ) ;
215
+ rollbackOcurred = true ;
216
+
217
+ continue ;
218
+ }
219
+
220
+ const localTxsFromSameBlock = getLastTransactionsAtBlock ( localTransactions , lowerBound ) ;
221
+ const firstSegmentOfNewTransactions = newTransactions . slice ( 0 , localTxsFromSameBlock . length ) ;
222
+
223
+ // The first segment of new transaction should match exactly (same txs and same order) our last know TXs. Otherwise
224
+ // roll them back and re-apply in new order.
225
+ const sameTxAndOrder = localTxsFromSameBlock . every (
226
+ ( tx , index ) => tx . id === firstSegmentOfNewTransactions [ index ] . id
227
+ ) ;
228
+
229
+ if ( ! sameTxAndOrder ) {
230
+ localTransactions = revertLastBlock ( localTransactions , lowerBound , rollback$ , newTransactions , logger ) ;
231
+ rollbackOcurred = true ;
232
+
233
+ continue ;
234
+ }
235
+
236
+ // No rollbacks, if they overlap 100% do nothing, otherwise add the difference.
237
+ const areTransactionsSame =
238
+ newTransactions . length === localTxsFromSameBlock . length &&
239
+ localTxsFromSameBlock . every ( ( tx , index ) => tx . id === newTransactions [ index ] . id ) ;
240
+
241
+ if ( ! areTransactionsSame ) {
242
+ // Skip overlapping transactions to avoid duplicates
243
+ localTransactions = [ ...localTransactions , ...newTransactions . slice ( localTxsFromSameBlock . length ) ] ;
244
+ store . setAll ( localTransactions ) ;
245
+ } else if ( rollbackOcurred ) {
246
+ // This case handles rollbacks without new additions
247
+ store . setAll ( localTransactions ) ;
248
+ }
249
+
250
+ return localTransactions ;
251
+ }
252
+ } ,
253
+ retryBackoffConfig,
254
+ trigger$ : tipBlockHeight$
255
+ } ) ;
256
+
257
+ export const createAddressTransactionsProvider = (
258
+ props : TransactionsTrackerInternalsProps
259
+ ) : TransactionsTrackerInternals => {
260
+ const { addresses$, store, logger } = props ;
119
261
const rollback$ = new Subject < Cardano . HydratedTx > ( ) ;
120
262
const storedTransactions$ = store . getAll ( ) . pipe ( share ( ) ) ;
121
263
return {
@@ -127,61 +269,14 @@ export const createAddressTransactionsProvider = ({
127
269
)
128
270
) ,
129
271
combineLatest ( [ addresses$ , storedTransactions$ . pipe ( defaultIfEmpty ( [ ] as Cardano . HydratedTx [ ] ) ) ] ) . pipe (
130
- switchMap ( ( [ addresses , storedTransactions ] ) => {
131
- let localTransactions : Cardano . HydratedTx [ ] = [ ...storedTransactions ] ;
132
-
133
- return coldObservableProvider ( {
134
- // Do not re-fetch transactions twice on load when tipBlockHeight$ loads from storage first
135
- // It should also help when using poor internet connection.
136
- // Caveat is that local transactions might get out of date...
137
- combinator : exhaustMap ,
138
- equals : transactionsEquals ,
139
- onFatalError,
140
- provider : async ( ) => {
141
- // eslint-disable-next-line no-constant-condition
142
- while ( true ) {
143
- const lastStoredTransaction : Cardano . HydratedTx | undefined =
144
- localTransactions [ localTransactions . length - 1 ] ;
145
-
146
- lastStoredTransaction &&
147
- logger . debug (
148
- `Last stored tx: ${ lastStoredTransaction ?. id } block:${ lastStoredTransaction ?. blockHeader . blockNo } `
149
- ) ;
150
-
151
- const lowerBound = lastStoredTransaction ?. blockHeader . blockNo ;
152
- const newTransactions = await allTransactionsByAddresses ( chainHistoryProvider , {
153
- addresses,
154
- blockRange : { lowerBound }
155
- } ) ;
156
-
157
- logger . debug (
158
- `chainHistoryProvider returned ${ newTransactions . length } transactions` ,
159
- lowerBound !== undefined && `since block ${ lowerBound } `
160
- ) ;
161
- const duplicateTransactions =
162
- lastStoredTransaction && intersectionBy ( localTransactions , newTransactions , ( tx ) => tx . id ) ;
163
- if ( typeof duplicateTransactions !== 'undefined' && duplicateTransactions . length === 0 ) {
164
- const rollbackTransactions = localTransactions . filter (
165
- ( { blockHeader : { blockNo } } ) => blockNo >= lowerBound
166
- ) ;
167
-
168
- from ( rollbackTransactions )
169
- . pipe ( tap ( ( tx ) => logger . debug ( `Transaction ${ tx . id } was rolled back` ) ) )
170
- . subscribe ( ( v ) => rollback$ . next ( v ) ) ;
171
-
172
- // Rollback by 1 block, try again in next loop iteration
173
- localTransactions = localTransactions . filter ( ( { blockHeader : { blockNo } } ) => blockNo < lowerBound ) ;
174
- } else {
175
- localTransactions = unionBy ( localTransactions , newTransactions , ( tx ) => tx . id ) ;
176
- store . setAll ( localTransactions ) ;
177
- return localTransactions ;
178
- }
179
- }
180
- } ,
181
- retryBackoffConfig,
182
- trigger$ : tipBlockHeight$
183
- } ) ;
184
- } )
272
+ switchMap ( ( [ addresses , storedTransactions ] ) =>
273
+ findIntersectionAndUpdateTxStore ( {
274
+ addresses,
275
+ localTransactions : [ ...storedTransactions ] ,
276
+ rollback$,
277
+ ...props
278
+ } )
279
+ )
185
280
)
186
281
)
187
282
} ;
@@ -247,7 +342,9 @@ export const createTransactionsTracker = (
247
342
248
343
const transactionsSource$ = new TrackerSubject ( txSource$ ) ;
249
344
250
- const historicalTransactions$ = createHistoricalTransactionsTrackerSubject ( transactionsSource$ ) ;
345
+ const historicalTransactions$ = createHistoricalTransactionsTrackerSubject ( transactionsSource$ ) . pipe (
346
+ tap ( ( transactions ) => logger . debug ( `History transactions count: ${ transactions ?. length || 0 } ` ) )
347
+ ) ;
251
348
252
349
const [ onChainNewTxPhase2Failed$ , onChainNewTxSuccess$ ] = partition (
253
350
newTransactions$ ( historicalTransactions$ ) . pipe ( share ( ) ) ,
0 commit comments