|
| 1 | +package db |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "database/sql" |
| 6 | + "math" |
| 7 | + prand "math/rand" |
| 8 | + "time" |
| 9 | + |
| 10 | + "github.com/lightninglabs/lightning-terminal/db/sqlc" |
| 11 | +) |
| 12 | + |
| 13 | +var ( |
| 14 | + // DefaultStoreTimeout is the default timeout used for any interaction |
| 15 | + // with the storage/database. |
| 16 | + DefaultStoreTimeout = time.Second * 10 |
| 17 | +) |
| 18 | + |
| 19 | +const ( |
| 20 | + // DefaultNumTxRetries is the default number of times we'll retry a |
| 21 | + // transaction if it fails with an error that permits transaction |
| 22 | + // repetition. |
| 23 | + DefaultNumTxRetries = 10 |
| 24 | + |
| 25 | + // DefaultInitialRetryDelay is the default initial delay between |
| 26 | + // retries. This will be used to generate a random delay between -50% |
| 27 | + // and +50% of this value, so 20 to 60 milliseconds. The retry will be |
| 28 | + // doubled after each attempt until we reach DefaultMaxRetryDelay. We |
| 29 | + // start with a random value to avoid multiple goroutines that are |
| 30 | + // created at the same time to effectively retry at the same time. |
| 31 | + DefaultInitialRetryDelay = time.Millisecond * 40 |
| 32 | + |
| 33 | + // DefaultMaxRetryDelay is the default maximum delay between retries. |
| 34 | + DefaultMaxRetryDelay = time.Second * 3 |
| 35 | +) |
| 36 | + |
| 37 | +// TxOptions represents a set of options one can use to control what type of |
| 38 | +// database transaction is created. Transaction can wither be read or write. |
| 39 | +type TxOptions interface { |
| 40 | + // ReadOnly returns true if the transaction should be read only. |
| 41 | + ReadOnly() bool |
| 42 | +} |
| 43 | + |
| 44 | +// BatchedTx is a generic interface that represents the ability to execute |
| 45 | +// several operations to a given storage interface in a single atomic |
| 46 | +// transaction. Typically, Q here will be some subset of the main sqlc.Querier |
| 47 | +// interface allowing it to only depend on the routines it needs to implement |
| 48 | +// any additional business logic. |
| 49 | +type BatchedTx[Q any] interface { |
| 50 | + // ExecTx will execute the passed txBody, operating upon generic |
| 51 | + // parameter Q (usually a storage interface) in a single transaction. |
| 52 | + // The set of TxOptions are passed in in order to allow the caller to |
| 53 | + // specify if a transaction should be read-only and optionally what |
| 54 | + // type of concurrency control should be used. |
| 55 | + ExecTx(ctx context.Context, txOptions TxOptions, |
| 56 | + txBody func(Q) error) error |
| 57 | + |
| 58 | + // Backend returns the type of the database backend used. |
| 59 | + Backend() sqlc.BackendType |
| 60 | +} |
| 61 | + |
| 62 | +// Tx represents a database transaction that can be committed or rolled back. |
| 63 | +type Tx interface { |
| 64 | + // Commit commits the database transaction, an error should be returned |
| 65 | + // if the commit isn't possible. |
| 66 | + Commit() error |
| 67 | + |
| 68 | + // Rollback rolls back an incomplete database transaction. |
| 69 | + // Transactions that were able to be committed can still call this as a |
| 70 | + // noop. |
| 71 | + Rollback() error |
| 72 | +} |
| 73 | + |
| 74 | +// QueryCreator is a generic function that's used to create a Querier, which is |
| 75 | +// a type of interface that implements storage related methods from a database |
| 76 | +// transaction. This will be used to instantiate an object callers can use to |
| 77 | +// apply multiple modifications to an object interface in a single atomic |
| 78 | +// transaction. |
| 79 | +type QueryCreator[Q any] func(*sql.Tx) Q |
| 80 | + |
| 81 | +// BatchedQuerier is a generic interface that allows callers to create a new |
| 82 | +// database transaction based on an abstract type that implements the TxOptions |
| 83 | +// interface. |
| 84 | +type BatchedQuerier interface { |
| 85 | + // Querier is the underlying query source, this is in place so we can |
| 86 | + // pass a BatchedQuerier implementation directly into objects that |
| 87 | + // create a batched version of the normal methods they need. |
| 88 | + sqlc.Querier |
| 89 | + |
| 90 | + // BeginTx creates a new database transaction given the set of |
| 91 | + // transaction options. |
| 92 | + BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) |
| 93 | + |
| 94 | + // Backend returns the type of the database backend used. |
| 95 | + Backend() sqlc.BackendType |
| 96 | +} |
| 97 | + |
| 98 | +// txExecutorOptions is a struct that holds the options for the transaction |
| 99 | +// executor. This can be used to do things like retry a transaction due to an |
| 100 | +// error a certain amount of times. |
| 101 | +type txExecutorOptions struct { |
| 102 | + numRetries int |
| 103 | + initialRetryDelay time.Duration |
| 104 | + maxRetryDelay time.Duration |
| 105 | +} |
| 106 | + |
| 107 | +// defaultTxExecutorOptions returns the default options for the transaction |
| 108 | +// executor. |
| 109 | +func defaultTxExecutorOptions() *txExecutorOptions { |
| 110 | + return &txExecutorOptions{ |
| 111 | + numRetries: DefaultNumTxRetries, |
| 112 | + initialRetryDelay: DefaultInitialRetryDelay, |
| 113 | + maxRetryDelay: DefaultMaxRetryDelay, |
| 114 | + } |
| 115 | +} |
| 116 | + |
| 117 | +// randRetryDelay returns a random retry delay between -50% and +50% |
| 118 | +// of the configured delay that is doubled for each attempt and capped at a max |
| 119 | +// value. |
| 120 | +func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration { |
| 121 | + halfDelay := t.initialRetryDelay / 2 |
| 122 | + randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec |
| 123 | + |
| 124 | + // 50% plus 0%-100% gives us the range of 50%-150%. |
| 125 | + initialDelay := halfDelay + time.Duration(randDelay) |
| 126 | + |
| 127 | + // If this is the first attempt, we just return the initial delay. |
| 128 | + if attempt == 0 { |
| 129 | + return initialDelay |
| 130 | + } |
| 131 | + |
| 132 | + // For each subsequent delay, we double the initial delay. This still |
| 133 | + // gives us a somewhat random delay, but it still increases with each |
| 134 | + // attempt. If we double something n times, that's the same as |
| 135 | + // multiplying the value with 2^n. We limit the power to 32 to avoid |
| 136 | + // overflows. |
| 137 | + factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32))) |
| 138 | + actualDelay := initialDelay * factor |
| 139 | + |
| 140 | + // Cap the delay at the maximum configured value. |
| 141 | + if actualDelay > t.maxRetryDelay { |
| 142 | + return t.maxRetryDelay |
| 143 | + } |
| 144 | + |
| 145 | + return actualDelay |
| 146 | +} |
| 147 | + |
| 148 | +// TxExecutorOption is a functional option that allows us to pass in optional |
| 149 | +// argument when creating the executor. |
| 150 | +type TxExecutorOption func(*txExecutorOptions) |
| 151 | + |
| 152 | +// WithTxRetries is a functional option that allows us to specify the number of |
| 153 | +// times a transaction should be retried if it fails with a repeatable error. |
| 154 | +func WithTxRetries(numRetries int) TxExecutorOption { |
| 155 | + return func(o *txExecutorOptions) { |
| 156 | + o.numRetries = numRetries |
| 157 | + } |
| 158 | +} |
| 159 | + |
| 160 | +// WithTxRetryDelay is a functional option that allows us to specify the delay |
| 161 | +// to wait before a transaction is retried. |
| 162 | +func WithTxRetryDelay(delay time.Duration) TxExecutorOption { |
| 163 | + return func(o *txExecutorOptions) { |
| 164 | + o.initialRetryDelay = delay |
| 165 | + } |
| 166 | +} |
| 167 | + |
| 168 | +// TransactionExecutor is a generic struct that abstracts away from the type of |
| 169 | +// query a type needs to run under a database transaction, and also the set of |
| 170 | +// options for that transaction. The QueryCreator is used to create a query |
| 171 | +// given a database transaction created by the BatchedQuerier. |
| 172 | +type TransactionExecutor[Query any] struct { |
| 173 | + BatchedQuerier |
| 174 | + |
| 175 | + createQuery QueryCreator[Query] |
| 176 | + |
| 177 | + opts *txExecutorOptions |
| 178 | +} |
| 179 | + |
| 180 | +// NewTransactionExecutor creates a new instance of a TransactionExecutor given |
| 181 | +// a Querier query object and a concrete type for the type of transactions the |
| 182 | +// Querier understands. |
| 183 | +func NewTransactionExecutor[Querier any](db BatchedQuerier, |
| 184 | + createQuery QueryCreator[Querier], |
| 185 | + opts ...TxExecutorOption) *TransactionExecutor[Querier] { |
| 186 | + |
| 187 | + txOpts := defaultTxExecutorOptions() |
| 188 | + for _, optFunc := range opts { |
| 189 | + optFunc(txOpts) |
| 190 | + } |
| 191 | + |
| 192 | + return &TransactionExecutor[Querier]{ |
| 193 | + BatchedQuerier: db, |
| 194 | + createQuery: createQuery, |
| 195 | + opts: txOpts, |
| 196 | + } |
| 197 | +} |
| 198 | + |
| 199 | +// ExecTx is a wrapper for txBody to abstract the creation and commit of a db |
| 200 | +// transaction. The db transaction is embedded in a `*Queries` that txBody |
| 201 | +// needs to use when executing each one of the queries that need to be applied |
| 202 | +// atomically. This can be used by other storage interfaces to parameterize the |
| 203 | +// type of query and options run, in order to have access to batched operations |
| 204 | +// related to a storage object. |
| 205 | +func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, |
| 206 | + txOptions TxOptions, txBody func(Q) error) error { |
| 207 | + |
| 208 | + waitBeforeRetry := func(attemptNumber int) { |
| 209 | + retryDelay := t.opts.randRetryDelay(attemptNumber) |
| 210 | + |
| 211 | + log.Tracef("Retrying transaction due to tx serialization or "+ |
| 212 | + "deadlock error, attempt_number=%v, delay=%v", |
| 213 | + attemptNumber, retryDelay) |
| 214 | + |
| 215 | + // Before we try again, we'll wait with a random backoff based |
| 216 | + // on the retry delay. |
| 217 | + time.Sleep(retryDelay) |
| 218 | + } |
| 219 | + |
| 220 | + for i := 0; i < t.opts.numRetries; i++ { |
| 221 | + // Create the db transaction. |
| 222 | + tx, err := t.BatchedQuerier.BeginTx(ctx, txOptions) |
| 223 | + if err != nil { |
| 224 | + dbErr := MapSQLError(err) |
| 225 | + if IsSerializationOrDeadlockError(dbErr) { |
| 226 | + // Nothing to roll back here, since we didn't |
| 227 | + // even get a transaction yet. |
| 228 | + waitBeforeRetry(i) |
| 229 | + continue |
| 230 | + } |
| 231 | + |
| 232 | + return dbErr |
| 233 | + } |
| 234 | + |
| 235 | + // Rollback is safe to call even if the tx is already closed, |
| 236 | + // so if the tx commits successfully, this is a no-op. |
| 237 | + defer func() { |
| 238 | + _ = tx.Rollback() |
| 239 | + }() |
| 240 | + |
| 241 | + if err := txBody(t.createQuery(tx)); err != nil { |
| 242 | + dbErr := MapSQLError(err) |
| 243 | + if IsSerializationOrDeadlockError(dbErr) { |
| 244 | + // Roll back the transaction, then pop back up |
| 245 | + // to try once again. |
| 246 | + _ = tx.Rollback() |
| 247 | + |
| 248 | + waitBeforeRetry(i) |
| 249 | + continue |
| 250 | + } |
| 251 | + |
| 252 | + return dbErr |
| 253 | + } |
| 254 | + |
| 255 | + // Commit transaction. |
| 256 | + if err = tx.Commit(); err != nil { |
| 257 | + dbErr := MapSQLError(err) |
| 258 | + if IsSerializationOrDeadlockError(dbErr) { |
| 259 | + // Roll back the transaction, then pop back up |
| 260 | + // to try once again. |
| 261 | + _ = tx.Rollback() |
| 262 | + |
| 263 | + waitBeforeRetry(i) |
| 264 | + continue |
| 265 | + } |
| 266 | + |
| 267 | + return dbErr |
| 268 | + } |
| 269 | + |
| 270 | + return nil |
| 271 | + } |
| 272 | + |
| 273 | + // If we get to this point, then we weren't able to successfully commit |
| 274 | + // a tx given the max number of retries. |
| 275 | + return ErrRetriesExceeded |
| 276 | +} |
| 277 | + |
| 278 | +// Backend returns the type of the database backend used. |
| 279 | +func (t *TransactionExecutor[Q]) Backend() sqlc.BackendType { |
| 280 | + return t.BatchedQuerier.Backend() |
| 281 | +} |
| 282 | + |
| 283 | +// BaseDB is the base database struct that each implementation can embed to |
| 284 | +// gain some common functionality. |
| 285 | +type BaseDB struct { |
| 286 | + *sql.DB |
| 287 | + |
| 288 | + *sqlc.Queries |
| 289 | +} |
| 290 | + |
| 291 | +// BeginTx wraps the normal sql specific BeginTx method with the TxOptions |
| 292 | +// interface. This interface is then mapped to the concrete sql tx options |
| 293 | +// struct. |
| 294 | +func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) { |
| 295 | + sqlOptions := sql.TxOptions{ |
| 296 | + ReadOnly: opts.ReadOnly(), |
| 297 | + Isolation: sql.LevelSerializable, |
| 298 | + } |
| 299 | + return s.DB.BeginTx(ctx, &sqlOptions) |
| 300 | +} |
| 301 | + |
| 302 | +// Backend returns the type of the database backend used. |
| 303 | +func (s *BaseDB) Backend() sqlc.BackendType { |
| 304 | + return s.Queries.Backend() |
| 305 | +} |
| 306 | + |
| 307 | +// QueriesTxOptions defines the set of db txn options the SQLQueries |
| 308 | +// understands. |
| 309 | +type QueriesTxOptions struct { |
| 310 | + // readOnly governs if a read only transaction is needed or not. |
| 311 | + readOnly bool |
| 312 | +} |
| 313 | + |
| 314 | +// ReadOnly returns true if the transaction should be read only. |
| 315 | +// |
| 316 | +// NOTE: This implements the TxOptions. |
| 317 | +func (a *QueriesTxOptions) ReadOnly() bool { |
| 318 | + return a.readOnly |
| 319 | +} |
| 320 | + |
| 321 | +// NewQueryReadTx creates a new read transaction option set. |
| 322 | +func NewQueryReadTx() QueriesTxOptions { |
| 323 | + return QueriesTxOptions{ |
| 324 | + readOnly: true, |
| 325 | + } |
| 326 | +} |
0 commit comments