2
2
module Cardano.Wallet.Kernel.Actions
3
3
( WalletAction (.. )
4
4
, WalletActionInterp (.. )
5
- , forkWalletWorker
6
- , walletWorker
5
+ , withWalletWorker
6
+ , Err_WalletWorkerExpired ( .. )
7
7
, interp
8
8
, interpList
9
9
, WalletWorkerState
@@ -12,8 +12,10 @@ module Cardano.Wallet.Kernel.Actions
12
12
, isValidState
13
13
) where
14
14
15
- import Control.Concurrent.Async (async , link )
16
- import Control.Concurrent.Chan
15
+ import qualified Control.Concurrent.Async as Async
16
+ import qualified Control.Concurrent.STM as STM
17
+ import qualified Control.Exception.Safe as Ex
18
+ import Control.Monad.Morph (MFunctor (hoist ))
17
19
import Control.Lens (makeLenses , (%=) , (+=) , (-=) , (.=) )
18
20
import Formatting (bprint , build , shown , (%) )
19
21
import qualified Formatting.Buildable
@@ -55,13 +57,11 @@ data WalletWorkerState b = WalletWorkerState
55
57
56
58
makeLenses ''WalletWorkerState
57
59
58
- -- A helper function for lifting a `WalletActionInterp` through a monad transformer.
59
- lifted :: (Monad m , MonadTrans t ) => WalletActionInterp m b -> WalletActionInterp (t m ) b
60
- lifted i = WalletActionInterp
61
- { applyBlocks = lift . applyBlocks i
62
- , switchToFork = \ n bs -> lift (switchToFork i n bs)
63
- , emit = lift . emit i
64
- }
60
+ instance MFunctor WalletActionInterp where
61
+ hoist nat i = WalletActionInterp
62
+ { applyBlocks = fmap nat (applyBlocks i)
63
+ , switchToFork = fmap (fmap nat) (switchToFork i)
64
+ , emit = fmap nat (emit i) }
65
65
66
66
-- | `interp` is the main interpreter for converting a wallet action to a concrete
67
67
-- transition on the wallet worker's state, perhaps combined with some effects on
@@ -121,20 +121,24 @@ interp walletInterp action = do
121
121
Shutdown -> error " walletWorker: unreacheable dead code, reached!"
122
122
123
123
where
124
- WalletActionInterp {.. } = lifted walletInterp
124
+ WalletActionInterp {.. } = hoist lift walletInterp
125
125
prependNewestFirst bs = \ nf -> NewestFirst (getNewestFirst bs <> getNewestFirst nf)
126
126
127
- -- | Connect a wallet action interpreter to a channel of actions.
128
- walletWorker :: forall b . Chan (WalletAction b ) -> WalletActionInterp IO b -> IO ()
129
- walletWorker chan ops = do
130
- emit ops " Starting wallet worker."
131
- void $ (`evalStateT` initialWorkerState) tick
132
- emit ops " Finishing wallet worker."
133
- where
134
- tick :: StateT (WalletWorkerState b ) IO ()
135
- tick = lift (readChan chan) >>= \ case
136
- Shutdown -> return ()
137
- msg -> interp ops msg >> tick
127
+ -- | Connect a wallet action interpreter to a source actions. This function
128
+ -- returns as soon as the given action returns 'Shutdown'.
129
+ walletWorker
130
+ :: Ex. MonadMask m
131
+ => WalletActionInterp m b
132
+ -> m (WalletAction b )
133
+ -> m ()
134
+ walletWorker wai getWA = Ex. bracket_
135
+ (emit wai " Starting wallet worker." )
136
+ (evalStateT
137
+ (fix $ \ next -> lift getWA >>= \ case
138
+ Shutdown -> pure ()
139
+ wa -> interp wai wa >> next)
140
+ initialWorkerState)
141
+ (emit wai " Stoping wallet worker." )
138
142
139
143
-- | Connect a wallet action interpreter to a stream of actions.
140
144
interpList :: Monad m => WalletActionInterp m b -> [WalletAction b ] -> m (WalletWorkerState b )
@@ -147,13 +151,51 @@ initialWorkerState = WalletWorkerState
147
151
, _lengthPendingBlocks = 0
148
152
}
149
153
150
- -- | Start up a wallet worker; the worker will respond to actions issued over the
151
- -- returned channel.
152
- forkWalletWorker :: WalletActionInterp IO b -> IO (WalletAction b -> IO () )
153
- forkWalletWorker ops = do
154
- c <- newChan
155
- link =<< async (walletWorker c ops)
156
- return (writeChan c)
154
+ -- | Thrown by 'withWalletWorker''s continuation in case it's used outside of
155
+ -- its intended scope.
156
+ data Err_WalletWorkerExpired = Err_WalletWorkerExpired deriving (Show )
157
+ instance Ex. Exception Err_WalletWorkerExpired
158
+
159
+ -- | Start a wallet worker in backround who will react to input provided via the
160
+ -- 'STM' function, in FIFO order.
161
+ --
162
+ -- After the given continuation returns (successfully or due to some exception),
163
+ -- the worker will continue processing any pending input before returning,
164
+ -- re-throwing the continuation's exception if any. Async exceptions from any
165
+ -- source will always be prioritized.
166
+ --
167
+ -- Usage of the obtained 'STM' action after the given continuation has returned
168
+ -- is not possible. It will throw 'Err_WalletWorkerExpired'.
169
+ withWalletWorker
170
+ :: (MonadIO m , Ex. MonadMask m )
171
+ => WalletActionInterp IO a
172
+ -> ((WalletAction a -> STM () ) -> m b )
173
+ -> m b
174
+ withWalletWorker wai k = do
175
+ -- 'tqWA' keeps items to be processed by the worker in FIFO order.
176
+ tqWA :: STM. TQueue (WalletAction a ) <- liftIO STM. newTQueueIO
177
+ -- 'tvOpen' is 'True' as long as 'tqWA' can receive new input.
178
+ tvOpen :: STM. TVar Bool <- liftIO (STM. newTVarIO True )
179
+ -- 'getWA' returns the next action to be processed. This function retries
180
+ -- unless 'tvOpen' is 'False', in which case 'Shutdown' is returned.
181
+ let getWA :: STM (WalletAction a )
182
+ getWA = STM. tryReadTQueue tqWA >>= \ case
183
+ Just wa -> pure wa
184
+ Nothing -> STM. readTVar tvOpen >>= \ case
185
+ False -> pure Shutdown
186
+ True -> STM. retry
187
+ Ex. bracket
188
+ (liftIO (Async. async (walletWorker wai (STM. atomically getWA))))
189
+ (\ a1 -> liftIO $ do
190
+ -- Prevent new input.
191
+ STM. atomically (STM. writeTVar tvOpen False )
192
+ -- Wait for the worker to finish, re-throwing any exceptions from it.
193
+ Async. wait a1)
194
+ (\ _ -> k $ \ wa -> do
195
+ -- Add a WalletAction to the queue, unless it's been closed already.
196
+ STM. readTVar tvOpen >>= \ case
197
+ True -> STM. writeTQueue tqWA wa
198
+ False -> Ex. throwM Err_WalletWorkerExpired )
157
199
158
200
-- | Check if this is the initial worker state.
159
201
isInitialState :: Eq b => WalletWorkerState b -> Bool
0 commit comments