-
Notifications
You must be signed in to change notification settings - Fork 631
[CBR-243] improve wallet worker start-up and exception handling #3330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ | |
module Cardano.Wallet.Kernel.Actions | ||
( WalletAction(..) | ||
, WalletActionInterp(..) | ||
, forkWalletWorker | ||
, walletWorker | ||
, withWalletWorker | ||
, WalletWorkerExpiredError(..) | ||
, interp | ||
, interpList | ||
, WalletWorkerState | ||
|
@@ -12,8 +12,9 @@ module Cardano.Wallet.Kernel.Actions | |
, isValidState | ||
) where | ||
|
||
import Control.Concurrent.Async (async, link) | ||
import Control.Concurrent.Chan | ||
import qualified Control.Concurrent.Async as Async | ||
import qualified Control.Concurrent.STM as STM | ||
import qualified Control.Exception.Safe as Ex | ||
import Control.Lens (makeLenses, (%=), (+=), (-=), (.=)) | ||
import Formatting (bprint, build, shown, (%)) | ||
import qualified Formatting.Buildable | ||
|
@@ -33,7 +34,6 @@ data WalletAction b | |
= ApplyBlocks (OldestFirst NE b) | ||
| RollbackBlocks (NewestFirst NE b) | ||
| LogMessage Text | ||
| Shutdown | ||
|
||
-- | Interface abstraction for the wallet worker. | ||
-- The caller provides these primitive wallet operations; | ||
|
@@ -118,23 +118,25 @@ interp walletInterp action = do | |
|
||
LogMessage txt -> emit txt | ||
|
||
Shutdown -> error "walletWorker: unreacheable dead code, reached!" | ||
|
||
where | ||
WalletActionInterp{..} = lifted walletInterp | ||
prependNewestFirst bs = \nf -> NewestFirst (getNewestFirst bs <> getNewestFirst nf) | ||
|
||
-- | Connect a wallet action interpreter to a channel of actions. | ||
walletWorker :: forall b. Chan (WalletAction b) -> WalletActionInterp IO b -> IO () | ||
walletWorker chan ops = do | ||
emit ops "Starting wallet worker." | ||
void $ (`evalStateT` initialWorkerState) tick | ||
emit ops "Finishing wallet worker." | ||
where | ||
tick :: StateT (WalletWorkerState b) IO () | ||
tick = lift (readChan chan) >>= \case | ||
Shutdown -> return () | ||
msg -> interp ops msg >> tick | ||
-- | Connect a wallet action interpreter to a source of actions. This function | ||
-- returns as soon as the given action returns 'Nothing'. | ||
walletWorker | ||
:: Ex.MonadMask m | ||
=> WalletActionInterp m b | ||
-> m (Maybe (WalletAction b)) | ||
-> m () | ||
walletWorker wai getWA = Ex.bracket_ | ||
(emit wai "Starting wallet worker.") | ||
(evalStateT | ||
(fix $ \next -> lift getWA >>= \case | ||
Nothing -> pure () | ||
Just wa -> interp wai wa >> next) | ||
initialWorkerState) | ||
(emit wai "Stopping wallet worker.") | ||
|
||
-- | Connect a wallet action interpreter to a stream of actions. | ||
interpList :: Monad m => WalletActionInterp m b -> [WalletAction b] -> m (WalletWorkerState b) | ||
|
@@ -147,13 +149,51 @@ initialWorkerState = WalletWorkerState | |
, _lengthPendingBlocks = 0 | ||
} | ||
|
||
-- | Start up a wallet worker; the worker will respond to actions issued over the | ||
-- returned channel. | ||
forkWalletWorker :: WalletActionInterp IO b -> IO (WalletAction b -> IO ()) | ||
forkWalletWorker ops = do | ||
c <- newChan | ||
link =<< async (walletWorker c ops) | ||
return (writeChan c) | ||
-- | Thrown by 'withWalletWorker''s continuation in case it's used outside of | ||
-- its intended scope. | ||
data WalletWorkerExpiredError = WalletWorkerExpiredError deriving (Show) | ||
instance Ex.Exception WalletWorkerExpiredError | ||
|
||
-- | Start a wallet worker in backround who will react to input provided via the | ||
-- 'STM' function, in FIFO order. | ||
-- | ||
-- After the given continuation returns (successfully or due to some exception), | ||
-- the worker will continue processing any pending input before returning, | ||
-- re-throwing the continuation's exception if any. Async exceptions from any | ||
-- source will always be prioritized. | ||
-- | ||
-- Usage of the obtained 'STM' action after the given continuation has returned | ||
-- is not possible. It will throw 'WalletWorkerExpiredError'. | ||
withWalletWorker | ||
:: (MonadIO m, Ex.MonadMask m) | ||
=> WalletActionInterp IO a | ||
-> ((WalletAction a -> STM ()) -> m b) | ||
-> m b | ||
withWalletWorker wai k = do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't quite follow the need for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, the word “some” in “some The fact that then With this perspective in mind, I think the comments inside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So why are we not using TChans? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before I chime in on whether or not some concurrency primitive is better than another one, I would like to understand which was the limitation of the previous implementation that Matt cooked up. Why is a single I think clarifying that would help us understanding the thought process behind your change here 😉 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @erikd what would be the argument for using |
||
-- 'tmq' keeps items to be processed by the worker in FIFO order. | ||
tmq :: TMQueue (WalletAction a) <- liftIO newTMQueueIO | ||
-- 'getWA' gets the next action to be processed. | ||
let getWA :: STM (Maybe (WalletAction a)) | ||
getWA = readTMQueue tmq | ||
-- 'pushWA' adds an action to queue, unless it's been closed already. | ||
let pushWA :: WalletAction a -> STM () | ||
pushWA = writeTMQueue tmq >=> \case | ||
True -> pure () | ||
False -> Ex.throwM WalletWorkerExpiredError | ||
-- Run the worker in the background, ensuring that any exceptions from it | ||
-- get thrown to the current thread. | ||
Ex.bracket | ||
(liftIO $ do | ||
as1 <- Async.async (walletWorker wai (STM.atomically getWA)) | ||
Async.link as1 | ||
pure as1) | ||
(\as1 -> liftIO $ do | ||
-- Prevent new input. | ||
STM.atomically (closeTMQueue tmq) | ||
-- Wait for the worker to finish, re-throwing any exceptions from it. | ||
Async.wait as1) | ||
(\_ -> k pushWA) | ||
|
||
|
||
-- | Check if this is the initial worker state. | ||
isInitialState :: Eq b => WalletWorkerState b -> Bool | ||
|
@@ -187,9 +227,59 @@ instance Show b => Buildable (WalletAction b) where | |
ApplyBlocks bs -> bprint ("ApplyBlocks " % shown) bs | ||
RollbackBlocks bs -> bprint ("RollbackBlocks " % shown) bs | ||
LogMessage bs -> bprint ("LogMessage " % shown) bs | ||
Shutdown -> bprint "Shutdown" | ||
|
||
instance Show b => Buildable [WalletAction b] where | ||
build was = case was of | ||
[] -> bprint "[]" | ||
(x:xs) -> bprint (build % ":" % build) x xs | ||
|
||
-------------------------------------------------------------------------------- | ||
-- STM closeable queue. | ||
|
||
-- | A FIFO queue that can be closed, preventing new input from being writen to | ||
-- it. | ||
-- | ||
-- This is similar to 'Control.Concurrent.STM.TMQueue', redefined here with some | ||
-- of its API to avoid a dependency on the 'stm-chans' library. | ||
data TMQueue a | ||
= UnsafeTMQueue !(STM.TVar TMQueueOpen) !(STM.TQueue a) | ||
-- ^ Don't use this constructor directly. It's internal. It carries the queue | ||
-- itself, and whether this 'TMQueue' is open or not. | ||
|
||
data TMQueueOpen = TMQueueOpen | TMQueueNotOpen | ||
|
||
-- | Creates a new empty and open 'TMQueue'. | ||
newTMQueueIO :: IO (TMQueue a) | ||
newTMQueueIO = UnsafeTMQueue <$> STM.newTVarIO TMQueueOpen <*> STM.newTQueueIO | ||
|
||
-- | Closes the 'TMQueue'. After this, any elements already in the 'TMQueue' | ||
-- will continue to be successfully returned by 'readTMQueue'. However, any | ||
-- new writes with 'writeTMQueue' will fail as described by its documentation. | ||
closeTMQueue :: TMQueue a -> STM () | ||
closeTMQueue (UnsafeTMQueue to _) = STM.writeTVar to TMQueueNotOpen | ||
|
||
-- | Writes a new input to the 'TMQueue', in FIFO order. | ||
-- | ||
-- It returns 'True' if the 'TMQueue' was open and it was possible to write to | ||
-- it. Otherwise, if the 'TMQueue' was closed, it returns 'False', meaning | ||
-- nothing has been writen to the queue. | ||
writeTMQueue :: TMQueue a -> a -> STM Bool | ||
writeTMQueue (UnsafeTMQueue to tq) a = do | ||
STM.readTVar to >>= \case | ||
TMQueueOpen -> STM.writeTQueue tq a >> pure True | ||
TMQueueNotOpen -> pure False | ||
|
||
-- | Read a value from the 'TMQueue', in FIFO order. | ||
-- | ||
-- If the 'TMQueue' is empty and closed, then this function returns 'Nothing'. | ||
-- Otherwise, if the 'TMQueue' is not closed, this function will block waiting | ||
-- for new input. | ||
readTMQueue :: TMQueue a -> STM (Maybe a) | ||
readTMQueue (UnsafeTMQueue to tq) = do | ||
STM.tryReadTQueue tq >>= \case | ||
Just a -> pure (Just a) | ||
Nothing -> STM.readTVar to >>= \case | ||
TMQueueOpen -> Just <$> STM.readTQueue tq | ||
TMQueueNotOpen -> pure Nothing | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the use of
fix
here? If there is some subtle memory efficiency reason, I think it would be good to spell it out.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, no, there's no memory efficiency reason involved. Just normal recursive monadic code.