2
2
module Cardano.Wallet.Kernel.Actions
3
3
( WalletAction (.. )
4
4
, WalletActionInterp (.. )
5
- , forkWalletWorker
6
- , walletWorker
5
+ , withWalletWorker
6
+ , WalletWorkerExpiredError ( .. )
7
7
, interp
8
8
, interpList
9
9
, WalletWorkerState
@@ -12,8 +12,9 @@ module Cardano.Wallet.Kernel.Actions
12
12
, isValidState
13
13
) where
14
14
15
- import Control.Concurrent.Async (async , link )
16
- import Control.Concurrent.Chan
15
+ import qualified Control.Concurrent.Async as Async
16
+ import qualified Control.Concurrent.STM as STM
17
+ import qualified Control.Exception.Safe as Ex
17
18
import Control.Lens (makeLenses , (%=) , (+=) , (-=) , (.=) )
18
19
import Formatting (bprint , build , shown , (%) )
19
20
import qualified Formatting.Buildable
@@ -33,7 +34,6 @@ data WalletAction b
33
34
= ApplyBlocks (OldestFirst NE b )
34
35
| RollbackBlocks (NewestFirst NE b )
35
36
| LogMessage Text
36
- | Shutdown
37
37
38
38
-- | Interface abstraction for the wallet worker.
39
39
-- The caller provides these primitive wallet operations;
@@ -118,23 +118,25 @@ interp walletInterp action = do
118
118
119
119
LogMessage txt -> emit txt
120
120
121
- Shutdown -> error " walletWorker: unreacheable dead code, reached!"
122
-
123
121
where
124
122
WalletActionInterp {.. } = lifted walletInterp
125
123
prependNewestFirst bs = \ nf -> NewestFirst (getNewestFirst bs <> getNewestFirst nf)
126
124
127
- -- | Connect a wallet action interpreter to a channel of actions.
128
- walletWorker :: forall b . Chan (WalletAction b ) -> WalletActionInterp IO b -> IO ()
129
- walletWorker chan ops = do
130
- emit ops " Starting wallet worker."
131
- void $ (`evalStateT` initialWorkerState) tick
132
- emit ops " Finishing wallet worker."
133
- where
134
- tick :: StateT (WalletWorkerState b ) IO ()
135
- tick = lift (readChan chan) >>= \ case
136
- Shutdown -> return ()
137
- msg -> interp ops msg >> tick
125
+ -- | Connect a wallet action interpreter to a source of actions. This function
126
+ -- returns as soon as the given action returns 'Nothing'.
127
+ walletWorker
128
+ :: Ex. MonadMask m
129
+ => WalletActionInterp m b
130
+ -> m (Maybe (WalletAction b ))
131
+ -> m ()
132
+ walletWorker wai getWA = Ex. bracket_
133
+ (emit wai " Starting wallet worker." )
134
+ (evalStateT
135
+ (fix $ \ next -> lift getWA >>= \ case
136
+ Nothing -> pure ()
137
+ Just wa -> interp wai wa >> next)
138
+ initialWorkerState)
139
+ (emit wai " Stopping wallet worker." )
138
140
139
141
-- | Connect a wallet action interpreter to a stream of actions.
140
142
interpList :: Monad m => WalletActionInterp m b -> [WalletAction b ] -> m (WalletWorkerState b )
@@ -147,13 +149,51 @@ initialWorkerState = WalletWorkerState
147
149
, _lengthPendingBlocks = 0
148
150
}
149
151
150
- -- | Start up a wallet worker; the worker will respond to actions issued over the
151
- -- returned channel.
152
- forkWalletWorker :: WalletActionInterp IO b -> IO (WalletAction b -> IO () )
153
- forkWalletWorker ops = do
154
- c <- newChan
155
- link =<< async (walletWorker c ops)
156
- return (writeChan c)
152
+ -- | Thrown by 'withWalletWorker''s continuation in case it's used outside of
153
+ -- its intended scope.
154
+ data WalletWorkerExpiredError = WalletWorkerExpiredError deriving (Show )
155
+ instance Ex. Exception WalletWorkerExpiredError
156
+
157
+ -- | Start a wallet worker in backround who will react to input provided via the
158
+ -- 'STM' function, in FIFO order.
159
+ --
160
+ -- After the given continuation returns (successfully or due to some exception),
161
+ -- the worker will continue processing any pending input before returning,
162
+ -- re-throwing the continuation's exception if any. Async exceptions from any
163
+ -- source will always be prioritized.
164
+ --
165
+ -- Usage of the obtained 'STM' action after the given continuation has returned
166
+ -- is not possible. It will throw 'WalletWorkerExpiredError'.
167
+ withWalletWorker
168
+ :: (MonadIO m , Ex. MonadMask m )
169
+ => WalletActionInterp IO a
170
+ -> ((WalletAction a -> STM () ) -> m b )
171
+ -> m b
172
+ withWalletWorker wai k = do
173
+ -- 'tmq' keeps items to be processed by the worker in FIFO order.
174
+ tmq :: TMQueue (WalletAction a ) <- liftIO newTMQueueIO
175
+ -- 'getWA' gets the next action to be processed.
176
+ let getWA :: STM (Maybe (WalletAction a ))
177
+ getWA = readTMQueue tmq
178
+ -- 'pushWA' adds an action to queue, unless it's been closed already.
179
+ let pushWA :: WalletAction a -> STM ()
180
+ pushWA = writeTMQueue tmq >=> \ case
181
+ True -> pure ()
182
+ False -> Ex. throwM WalletWorkerExpiredError
183
+ -- Run the worker in the background, ensuring that any exceptions from it
184
+ -- get thrown to the current thread.
185
+ Ex. bracket
186
+ (liftIO $ do
187
+ as1 <- Async. async (walletWorker wai (STM. atomically getWA))
188
+ Async. link as1
189
+ pure as1)
190
+ (\ as1 -> liftIO $ do
191
+ -- Prevent new input.
192
+ STM. atomically (closeTMQueue tmq)
193
+ -- Wait for the worker to finish, re-throwing any exceptions from it.
194
+ Async. wait as1)
195
+ (\ _ -> k pushWA)
196
+
157
197
158
198
-- | Check if this is the initial worker state.
159
199
isInitialState :: Eq b => WalletWorkerState b -> Bool
@@ -187,9 +227,59 @@ instance Show b => Buildable (WalletAction b) where
187
227
ApplyBlocks bs -> bprint (" ApplyBlocks " % shown) bs
188
228
RollbackBlocks bs -> bprint (" RollbackBlocks " % shown) bs
189
229
LogMessage bs -> bprint (" LogMessage " % shown) bs
190
- Shutdown -> bprint " Shutdown"
191
230
192
231
instance Show b => Buildable [WalletAction b ] where
193
232
build was = case was of
194
233
[] -> bprint " []"
195
234
(x: xs) -> bprint (build % " :" % build) x xs
235
+
236
+ --------------------------------------------------------------------------------
237
+ -- STM closeable queue.
238
+
239
+ -- | A FIFO queue that can be closed, preventing new input from being writen to
240
+ -- it.
241
+ --
242
+ -- This is similar to 'Control.Concurrent.STM.TMQueue', redefined here with some
243
+ -- of its API to avoid a dependency on the 'stm-chans' library.
244
+ data TMQueue a
245
+ = UnsafeTMQueue ! (STM. TVar TMQueueOpen ) ! (STM. TQueue a )
246
+ -- ^ Don't use this constructor directly. It's internal. It carries the queue
247
+ -- itself, and whether this 'TMQueue' is open or not.
248
+
249
+ data TMQueueOpen = TMQueueOpen | TMQueueNotOpen
250
+
251
+ -- | Creates a new empty and open 'TMQueue'.
252
+ newTMQueueIO :: IO (TMQueue a )
253
+ newTMQueueIO = UnsafeTMQueue <$> STM. newTVarIO TMQueueOpen <*> STM. newTQueueIO
254
+
255
+ -- | Closes the 'TMQueue'. After this, any elements already in the 'TMQueue'
256
+ -- will continue to be successfully returned by 'readTMQueue'. However, any
257
+ -- new writes with 'writeTMQueue' will fail as described by its documentation.
258
+ closeTMQueue :: TMQueue a -> STM ()
259
+ closeTMQueue (UnsafeTMQueue to _) = STM. writeTVar to TMQueueNotOpen
260
+
261
+ -- | Writes a new input to the 'TMQueue', in FIFO order.
262
+ --
263
+ -- It returns 'True' if the 'TMQueue' was open and it was possible to write to
264
+ -- it. Otherwise, if the 'TMQueue' was closed, it returns 'False', meaning
265
+ -- nothing has been writen to the queue.
266
+ writeTMQueue :: TMQueue a -> a -> STM Bool
267
+ writeTMQueue (UnsafeTMQueue to tq) a = do
268
+ STM. readTVar to >>= \ case
269
+ TMQueueOpen -> STM. writeTQueue tq a >> pure True
270
+ TMQueueNotOpen -> pure False
271
+
272
+ -- | Read a value from the 'TMQueue', in FIFO order.
273
+ --
274
+ -- If the 'TMQueue' is empty and closed, then this function returns 'Nothing'.
275
+ -- Otherwise, if the 'TMQueue' is not closed, this function will block waiting
276
+ -- for new input.
277
+ readTMQueue :: TMQueue a -> STM (Maybe a )
278
+ readTMQueue (UnsafeTMQueue to tq) = do
279
+ STM. tryReadTQueue tq >>= \ case
280
+ Just a -> pure (Just a)
281
+ Nothing -> STM. readTVar to >>= \ case
282
+ TMQueueOpen -> Just <$> STM. readTQueue tq
283
+ TMQueueNotOpen -> pure Nothing
284
+
285
+
0 commit comments