@@ -3,6 +3,7 @@ module Cardano.Wallet.Kernel.Actions
3
3
( WalletAction (.. )
4
4
, WalletActionInterp (.. )
5
5
, withWalletWorker
6
+ , Err_WalletWorkerExpired (.. )
6
7
, interp
7
8
, interpList
8
9
, WalletWorkerState
@@ -11,7 +12,7 @@ module Cardano.Wallet.Kernel.Actions
11
12
, isValidState
12
13
) where
13
14
14
- import Control.Concurrent ( forkFinally )
15
+ import qualified Control.Concurrent.Async as Async
15
16
import qualified Control.Concurrent.STM as STM
16
17
import qualified Control.Exception.Safe as Ex
17
18
import Control.Monad.Morph (MFunctor (hoist ))
@@ -150,51 +151,51 @@ initialWorkerState = WalletWorkerState
150
151
, _lengthPendingBlocks = 0
151
152
}
152
153
154
+ -- | Thrown by 'withWalletWorker''s continuation in case it's used outside of
155
+ -- its intended scope.
156
+ data Err_WalletWorkerExpired = Err_WalletWorkerExpired deriving (Show )
157
+ instance Ex. Exception Err_WalletWorkerExpired
153
158
154
159
-- | Start a wallet worker in backround who will react to input provided via the
155
160
-- 'STM' function, in FIFO order.
156
161
--
157
162
-- After the given continuation returns (successfully or due to some exception),
158
163
-- the worker will continue processing any pending input before returning,
159
- -- re-throwing the continuation's exception if any.
164
+ -- re-throwing the continuation's exception if any. Async exceptions from any
165
+ -- source will always be prioritized.
160
166
--
161
167
-- Usage of the obtained 'STM' action after the given continuation has returned
162
- -- will fail with an exception .
168
+ -- is not possible. It will throw 'Err_WalletWorkerExpired' .
163
169
withWalletWorker
164
170
:: (MonadIO m , Ex. MonadMask m )
165
171
=> WalletActionInterp IO a
166
172
-> ((WalletAction a -> STM () ) -> m b )
167
173
-> m b
168
174
withWalletWorker wai k = do
169
- -- 'mDone' is full if the worker finished.
170
- mDone :: MVar (Either Ex. SomeException () ) <- liftIO newEmptyMVar
171
- -- 'tqWA' keeps items to be processed by the worker.
175
+ -- 'tqWA' keeps items to be processed by the worker in FIFO order.
172
176
tqWA :: STM. TQueue (WalletAction a ) <- liftIO STM. newTQueueIO
173
177
-- 'tvOpen' is 'True' as long as 'tqWA' can receive new input.
174
178
tvOpen :: STM. TVar Bool <- liftIO (STM. newTVarIO True )
175
- -- 'getWA' returns the next action to be processed. This function blocks
179
+ -- 'getWA' returns the next action to be processed. This function retries
176
180
-- unless 'tvOpen' is 'False', in which case 'Shutdown' is returned.
177
181
let getWA :: STM (WalletAction a )
178
182
getWA = STM. tryReadTQueue tqWA >>= \ case
179
183
Just wa -> pure wa
180
184
Nothing -> STM. readTVar tvOpen >>= \ case
181
185
False -> pure Shutdown
182
186
True -> STM. retry
183
- -- 'pushWA' adds an action to be executed by the worker, in FIFO order. It
184
- -- will throw 'BlockedIndefinitelyOnSTM' if used after `k` returns.
185
- let pushWA :: WalletAction a -> STM ()
186
- pushWA = \ wa -> do STM. check =<< STM. readTVar tvOpen
187
- STM. writeTQueue tqWA wa
188
- liftIO $ void $ forkFinally
189
- (walletWorker wai (STM. atomically getWA))
190
- (putMVar mDone)
191
- Ex. finally
192
- (k pushWA)
193
- (liftIO $ do
194
- -- Prevent new input.
195
- STM. atomically (STM. writeTVar tvOpen False )
196
- -- Wait for the worker to finish.
197
- either Ex. throwM pure =<< takeMVar mDone)
187
+ Ex. bracket
188
+ (liftIO (Async. async (walletWorker wai (STM. atomically getWA))))
189
+ (\ a1 -> liftIO $ do
190
+ -- Prevent new input.
191
+ STM. atomically (STM. writeTVar tvOpen False )
192
+ -- Wait for the worker to finish, re-throwing any exceptions from it.
193
+ Async. wait a1)
194
+ (\ _ -> k $ \ wa -> do
195
+ -- Add a WalletAction to the queue, unless it's been closed already.
196
+ STM. readTVar tvOpen >>= \ case
197
+ True -> STM. writeTQueue tqWA wa
198
+ False -> Ex. throwM Err_WalletWorkerExpired )
198
199
199
200
-- | Check if this is the initial worker state.
200
201
isInitialState :: Eq b => WalletWorkerState b -> Bool
0 commit comments