From b149815d880fbca0caa0e2393439d90f41eea60c Mon Sep 17 00:00:00 2001 From: Renzo Carbonara Date: Tue, 31 Jul 2018 16:15:58 +0300 Subject: [PATCH] [CBR-243] improve wallet worker start-up and exception handling --- pkgs/default.nix | 1 + wallet-new/cardano-sl-wallet-new.cabal | 1 + .../src/Cardano/Wallet/Kernel/Actions.hs | 142 ++++++++++++++---- .../src/Cardano/Wallet/WalletLayer/Kernel.hs | 51 +++---- .../test/unit/Test/Spec/WalletWorker.hs | 1 - 5 files changed, 135 insertions(+), 61 deletions(-) diff --git a/pkgs/default.nix b/pkgs/default.nix index 60bd18c0a64..b06091960bd 100644 --- a/pkgs/default.nix +++ b/pkgs/default.nix @@ -17886,6 +17886,7 @@ servant-swagger-ui-core servant-swagger-ui-redoc sqlite-simple sqlite-simple-errors +stm swagger2 text time diff --git a/wallet-new/cardano-sl-wallet-new.cabal b/wallet-new/cardano-sl-wallet-new.cabal index 7333db2df9d..318b6f903b2 100755 --- a/wallet-new/cardano-sl-wallet-new.cabal +++ b/wallet-new/cardano-sl-wallet-new.cabal @@ -203,6 +203,7 @@ library , servant-swagger-ui-redoc , sqlite-simple , sqlite-simple-errors + , stm , swagger2 , text , time diff --git a/wallet-new/src/Cardano/Wallet/Kernel/Actions.hs b/wallet-new/src/Cardano/Wallet/Kernel/Actions.hs index d4a285b0ef7..5121a440cdc 100644 --- a/wallet-new/src/Cardano/Wallet/Kernel/Actions.hs +++ b/wallet-new/src/Cardano/Wallet/Kernel/Actions.hs @@ -2,8 +2,8 @@ module Cardano.Wallet.Kernel.Actions ( WalletAction(..) , WalletActionInterp(..) - , forkWalletWorker - , walletWorker + , withWalletWorker + , WalletWorkerExpiredError(..) , interp , interpList , WalletWorkerState @@ -12,8 +12,9 @@ module Cardano.Wallet.Kernel.Actions , isValidState ) where -import Control.Concurrent.Async (async, link) -import Control.Concurrent.Chan +import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.STM as STM +import qualified Control.Exception.Safe as Ex import Control.Lens (makeLenses, (%=), (+=), (-=), (.=)) import Formatting (bprint, build, shown, (%)) import qualified Formatting.Buildable @@ -33,7 +34,6 @@ data WalletAction b = ApplyBlocks (OldestFirst NE b) | RollbackBlocks (NewestFirst NE b) | LogMessage Text - | Shutdown -- | Interface abstraction for the wallet worker. -- The caller provides these primitive wallet operations; @@ -118,23 +118,25 @@ interp walletInterp action = do LogMessage txt -> emit txt - Shutdown -> error "walletWorker: unreacheable dead code, reached!" - where WalletActionInterp{..} = lifted walletInterp prependNewestFirst bs = \nf -> NewestFirst (getNewestFirst bs <> getNewestFirst nf) --- | Connect a wallet action interpreter to a channel of actions. -walletWorker :: forall b. Chan (WalletAction b) -> WalletActionInterp IO b -> IO () -walletWorker chan ops = do - emit ops "Starting wallet worker." - void $ (`evalStateT` initialWorkerState) tick - emit ops "Finishing wallet worker." - where - tick :: StateT (WalletWorkerState b) IO () - tick = lift (readChan chan) >>= \case - Shutdown -> return () - msg -> interp ops msg >> tick +-- | Connect a wallet action interpreter to a source of actions. This function +-- returns as soon as the given action returns 'Nothing'. +walletWorker + :: Ex.MonadMask m + => WalletActionInterp m b + -> m (Maybe (WalletAction b)) + -> m () +walletWorker wai getWA = Ex.bracket_ + (emit wai "Starting wallet worker.") + (evalStateT + (fix $ \next -> lift getWA >>= \case + Nothing -> pure () + Just wa -> interp wai wa >> next) + initialWorkerState) + (emit wai "Stopping wallet worker.") -- | Connect a wallet action interpreter to a stream of actions. interpList :: Monad m => WalletActionInterp m b -> [WalletAction b] -> m (WalletWorkerState b) @@ -147,13 +149,51 @@ initialWorkerState = WalletWorkerState , _lengthPendingBlocks = 0 } --- | Start up a wallet worker; the worker will respond to actions issued over the --- returned channel. -forkWalletWorker :: WalletActionInterp IO b -> IO (WalletAction b -> IO ()) -forkWalletWorker ops = do - c <- newChan - link =<< async (walletWorker c ops) - return (writeChan c) +-- | Thrown by 'withWalletWorker''s continuation in case it's used outside of +-- its intended scope. +data WalletWorkerExpiredError = WalletWorkerExpiredError deriving (Show) +instance Ex.Exception WalletWorkerExpiredError + +-- | Start a wallet worker in backround who will react to input provided via the +-- 'STM' function, in FIFO order. +-- +-- After the given continuation returns (successfully or due to some exception), +-- the worker will continue processing any pending input before returning, +-- re-throwing the continuation's exception if any. Async exceptions from any +-- source will always be prioritized. +-- +-- Usage of the obtained 'STM' action after the given continuation has returned +-- is not possible. It will throw 'WalletWorkerExpiredError'. +withWalletWorker + :: (MonadIO m, Ex.MonadMask m) + => WalletActionInterp IO a + -> ((WalletAction a -> STM ()) -> m b) + -> m b +withWalletWorker wai k = do + -- 'tmq' keeps items to be processed by the worker in FIFO order. + tmq :: TMQueue (WalletAction a) <- liftIO newTMQueueIO + -- 'getWA' gets the next action to be processed. + let getWA :: STM (Maybe (WalletAction a)) + getWA = readTMQueue tmq + -- 'pushWA' adds an action to queue, unless it's been closed already. + let pushWA :: WalletAction a -> STM () + pushWA = writeTMQueue tmq >=> \case + True -> pure () + False -> Ex.throwM WalletWorkerExpiredError + -- Run the worker in the background, ensuring that any exceptions from it + -- get thrown to the current thread. + Ex.bracket + (liftIO $ do + as1 <- Async.async (walletWorker wai (STM.atomically getWA)) + Async.link as1 + pure as1) + (\as1 -> liftIO $ do + -- Prevent new input. + STM.atomically (closeTMQueue tmq) + -- Wait for the worker to finish, re-throwing any exceptions from it. + Async.wait as1) + (\_ -> k pushWA) + -- | Check if this is the initial worker state. isInitialState :: Eq b => WalletWorkerState b -> Bool @@ -187,9 +227,59 @@ instance Show b => Buildable (WalletAction b) where ApplyBlocks bs -> bprint ("ApplyBlocks " % shown) bs RollbackBlocks bs -> bprint ("RollbackBlocks " % shown) bs LogMessage bs -> bprint ("LogMessage " % shown) bs - Shutdown -> bprint "Shutdown" instance Show b => Buildable [WalletAction b] where build was = case was of [] -> bprint "[]" (x:xs) -> bprint (build % ":" % build) x xs + +-------------------------------------------------------------------------------- +-- STM closeable queue. + +-- | A FIFO queue that can be closed, preventing new input from being writen to +-- it. +-- +-- This is similar to 'Control.Concurrent.STM.TMQueue', redefined here with some +-- of its API to avoid a dependency on the 'stm-chans' library. +data TMQueue a + = UnsafeTMQueue !(STM.TVar TMQueueOpen) !(STM.TQueue a) + -- ^ Don't use this constructor directly. It's internal. It carries the queue + -- itself, and whether this 'TMQueue' is open or not. + +data TMQueueOpen = TMQueueOpen | TMQueueNotOpen + +-- | Creates a new empty and open 'TMQueue'. +newTMQueueIO :: IO (TMQueue a) +newTMQueueIO = UnsafeTMQueue <$> STM.newTVarIO TMQueueOpen <*> STM.newTQueueIO + +-- | Closes the 'TMQueue'. After this, any elements already in the 'TMQueue' +-- will continue to be successfully returned by 'readTMQueue'. However, any +-- new writes with 'writeTMQueue' will fail as described by its documentation. +closeTMQueue :: TMQueue a -> STM () +closeTMQueue (UnsafeTMQueue to _) = STM.writeTVar to TMQueueNotOpen + +-- | Writes a new input to the 'TMQueue', in FIFO order. +-- +-- It returns 'True' if the 'TMQueue' was open and it was possible to write to +-- it. Otherwise, if the 'TMQueue' was closed, it returns 'False', meaning +-- nothing has been writen to the queue. +writeTMQueue :: TMQueue a -> a -> STM Bool +writeTMQueue (UnsafeTMQueue to tq) a = do + STM.readTVar to >>= \case + TMQueueOpen -> STM.writeTQueue tq a >> pure True + TMQueueNotOpen -> pure False + +-- | Read a value from the 'TMQueue', in FIFO order. +-- +-- If the 'TMQueue' is empty and closed, then this function returns 'Nothing'. +-- Otherwise, if the 'TMQueue' is not closed, this function will block waiting +-- for new input. +readTMQueue :: TMQueue a -> STM (Maybe a) +readTMQueue (UnsafeTMQueue to tq) = do + STM.tryReadTQueue tq >>= \case + Just a -> pure (Just a) + Nothing -> STM.readTVar to >>= \case + TMQueueOpen -> Just <$> STM.readTQueue tq + TMQueueNotOpen -> pure Nothing + + diff --git a/wallet-new/src/Cardano/Wallet/WalletLayer/Kernel.hs b/wallet-new/src/Cardano/Wallet/WalletLayer/Kernel.hs index 2b98ad5fa89..0e4fa424b12 100644 --- a/wallet-new/src/Cardano/Wallet/WalletLayer/Kernel.hs +++ b/wallet-new/src/Cardano/Wallet/WalletLayer/Kernel.hs @@ -8,7 +8,7 @@ module Cardano.Wallet.WalletLayer.Kernel import Universum -import Data.Default (def) +import qualified Control.Concurrent.STM as STM import Data.Maybe (fromJust) import Data.Time.Units (Second) import System.Wlog (Severity (Debug)) @@ -17,7 +17,6 @@ import Pos.Chain.Block (Blund, Undo (..)) import qualified Cardano.Wallet.Kernel as Kernel import qualified Cardano.Wallet.Kernel.Transactions as Kernel -import qualified Cardano.Wallet.Kernel.Wallets as Kernel import qualified Cardano.Wallet.WalletLayer.Kernel.Accounts as Accounts import qualified Cardano.Wallet.WalletLayer.Kernel.Addresses as Addresses import qualified Cardano.Wallet.WalletLayer.Kernel.Wallets as Wallets @@ -39,14 +38,12 @@ import Cardano.Wallet.Kernel.CoinSelection.FromGeneric (CoinSelectionOptions (..), ExpenseRegulation, InputGrouping, newOptions) -import qualified Cardano.Wallet.Kernel.BIP39 as BIP39 import Pos.Core (Address, Coin) import qualified Pos.Core as Core import Pos.Core.Chrono (OldestFirst (..)) import qualified Cardano.Wallet.Kernel.Actions as Actions import Cardano.Wallet.Kernel.MonadDBReadAdaptor (MonadDBReadAdaptor) -import Pos.Crypto.Signing import Cardano.Wallet.API.V1.Types (Payment (..), PaymentDistribution (..), PaymentSource (..), @@ -62,40 +59,26 @@ bracketPassiveWallet -> (PassiveWalletLayer n -> Kernel.PassiveWallet -> m a) -> m a bracketPassiveWallet logFunction keystore rocksDB f = Kernel.bracketPassiveWallet logFunction keystore rocksDB $ \w -> do - - -- Create the wallet worker and its communication endpoint `invoke`. - bracket (liftIO $ Actions.forkWalletWorker $ Actions.WalletActionInterp - { Actions.applyBlocks = \blunds -> - Kernel.applyBlocks w $ - OldestFirst (mapMaybe blundToResolvedBlock (toList (getOldestFirst blunds))) - , Actions.switchToFork = \_ _ -> logFunction Debug "" - , Actions.emit = logFunction Debug - } - ) (\invoke -> liftIO (invoke Actions.Shutdown)) - $ \invoke -> do - -- TODO (temporary): build a sample wallet from a backup phrase - _ <- liftIO $ do - Kernel.createHdWallet w - (def @(BIP39.Mnemonic 12)) - emptyPassphrase - assuranceLevel - walletName - - f (passiveWalletLayer w invoke) w - + let wai = Actions.WalletActionInterp + { Actions.applyBlocks = \blunds -> + Kernel.applyBlocks w + (OldestFirst (mapMaybe blundToResolvedBlock + (toList (getOldestFirst blunds)))) + , Actions.switchToFork = \_ _ -> + logFunction Debug "" + , Actions.emit = logFunction Debug } + Actions.withWalletWorker wai $ \invoke -> do + f (passiveWalletLayer w invoke) w where - -- TODO consider defaults - walletName = HD.WalletName "(new wallet)" - assuranceLevel = HD.AssuranceLevelNormal - -- | TODO(ks): Currently not implemented! passiveWalletLayer :: Kernel.PassiveWallet - -> (Actions.WalletAction Blund -> IO ()) + -> (Actions.WalletAction Blund -> STM ()) -> PassiveWalletLayer n passiveWalletLayer wallet invoke = - PassiveWalletLayer + let invokeIO :: forall m'. MonadIO m' => Actions.WalletAction Blund -> m' () + invokeIO = liftIO . STM.atomically . invoke + in PassiveWalletLayer { _pwlCreateWallet = Wallets.createWallet wallet - , _pwlGetWalletIds = error "Not implemented!" , _pwlGetWallet = error "Not implemented!" , _pwlUpdateWallet = error "Not implemented!" @@ -116,8 +99,8 @@ bracketPassiveWallet logFunction keystore rocksDB f = , _pwlCreateAddress = Addresses.createAddress wallet , _pwlGetAddresses = error "Not implemented!" - , _pwlApplyBlocks = liftIO . invoke . Actions.ApplyBlocks - , _pwlRollbackBlocks = liftIO . invoke . Actions.RollbackBlocks + , _pwlApplyBlocks = invokeIO . Actions.ApplyBlocks + , _pwlRollbackBlocks = invokeIO . Actions.RollbackBlocks } -- The use of the unsafe constructor 'UnsafeRawResolvedBlock' is justified diff --git a/wallet-new/test/unit/Test/Spec/WalletWorker.hs b/wallet-new/test/unit/Test/Spec/WalletWorker.hs index e5fae6f3690..b47d45a842d 100644 --- a/wallet-new/test/unit/Test/Spec/WalletWorker.hs +++ b/wallet-new/test/unit/Test/Spec/WalletWorker.hs @@ -122,7 +122,6 @@ actionToStackOp = \case Actions.ApplyBlocks bs -> mapM_ push bs Actions.RollbackBlocks bs -> mapM_ (const pop) bs Actions.LogMessage _ -> return () - Actions.Shutdown -> return () where push = interpStackOp . Push pop = interpStackOp Pop