Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit ab84006

Browse files
committed
[CBR-243] improve wallet worker start-up and exception handling
1 parent 9644c97 commit ab84006

File tree

4 files changed

+135
-57
lines changed

4 files changed

+135
-57
lines changed

pkgs/default.nix

+1
Original file line numberDiff line numberDiff line change
@@ -17887,6 +17887,7 @@ servant-swagger-ui-core
1788717887
servant-swagger-ui-redoc
1788817888
sqlite-simple
1788917889
sqlite-simple-errors
17890+
stm
1789017891
swagger2
1789117892
text
1789217893
time

wallet-new/cardano-sl-wallet-new.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ library
196196
, servant-swagger-ui-redoc
197197
, sqlite-simple
198198
, sqlite-simple-errors
199+
, stm
199200
, swagger2
200201
, text
201202
, time

wallet-new/src/Cardano/Wallet/Kernel/Actions.hs

+116-26
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
module Cardano.Wallet.Kernel.Actions
33
( WalletAction(..)
44
, WalletActionInterp(..)
5-
, forkWalletWorker
6-
, walletWorker
5+
, withWalletWorker
6+
, WalletWorkerExpiredError(..)
77
, interp
88
, interpList
99
, WalletWorkerState
@@ -12,8 +12,9 @@ module Cardano.Wallet.Kernel.Actions
1212
, isValidState
1313
) where
1414

15-
import Control.Concurrent.Async (async, link)
16-
import Control.Concurrent.Chan
15+
import qualified Control.Concurrent.Async as Async
16+
import qualified Control.Concurrent.STM as STM
17+
import qualified Control.Exception.Safe as Ex
1718
import Control.Lens (makeLenses, (%=), (+=), (-=), (.=))
1819
import Formatting (bprint, build, shown, (%))
1920
import qualified Formatting.Buildable
@@ -33,7 +34,6 @@ data WalletAction b
3334
= ApplyBlocks (OldestFirst NE b)
3435
| RollbackBlocks (NewestFirst NE b)
3536
| LogMessage Text
36-
| Shutdown
3737

3838
-- | Interface abstraction for the wallet worker.
3939
-- The caller provides these primitive wallet operations;
@@ -118,23 +118,25 @@ interp walletInterp action = do
118118

119119
LogMessage txt -> emit txt
120120

121-
Shutdown -> error "walletWorker: unreacheable dead code, reached!"
122-
123121
where
124122
WalletActionInterp{..} = lifted walletInterp
125123
prependNewestFirst bs = \nf -> NewestFirst (getNewestFirst bs <> getNewestFirst nf)
126124

127-
-- | Connect a wallet action interpreter to a channel of actions.
128-
walletWorker :: forall b. Chan (WalletAction b) -> WalletActionInterp IO b -> IO ()
129-
walletWorker chan ops = do
130-
emit ops "Starting wallet worker."
131-
void $ (`evalStateT` initialWorkerState) tick
132-
emit ops "Finishing wallet worker."
133-
where
134-
tick :: StateT (WalletWorkerState b) IO ()
135-
tick = lift (readChan chan) >>= \case
136-
Shutdown -> return ()
137-
msg -> interp ops msg >> tick
125+
-- | Connect a wallet action interpreter to a source of actions. This function
126+
-- returns as soon as the given action returns 'Nothing'.
127+
walletWorker
128+
:: Ex.MonadMask m
129+
=> WalletActionInterp m b
130+
-> m (Maybe (WalletAction b))
131+
-> m ()
132+
walletWorker wai getWA = Ex.bracket_
133+
(emit wai "Starting wallet worker.")
134+
(evalStateT
135+
(fix $ \next -> lift getWA >>= \case
136+
Nothing -> pure ()
137+
Just wa -> interp wai wa >> next)
138+
initialWorkerState)
139+
(emit wai "Stopping wallet worker.")
138140

139141
-- | Connect a wallet action interpreter to a stream of actions.
140142
interpList :: Monad m => WalletActionInterp m b -> [WalletAction b] -> m (WalletWorkerState b)
@@ -147,13 +149,51 @@ initialWorkerState = WalletWorkerState
147149
, _lengthPendingBlocks = 0
148150
}
149151

150-
-- | Start up a wallet worker; the worker will respond to actions issued over the
151-
-- returned channel.
152-
forkWalletWorker :: WalletActionInterp IO b -> IO (WalletAction b -> IO ())
153-
forkWalletWorker ops = do
154-
c <- newChan
155-
link =<< async (walletWorker c ops)
156-
return (writeChan c)
152+
-- | Thrown by 'withWalletWorker''s continuation in case it's used outside of
153+
-- its intended scope.
154+
data WalletWorkerExpiredError = WalletWorkerExpiredError deriving (Show)
155+
instance Ex.Exception WalletWorkerExpiredError
156+
157+
-- | Start a wallet worker in backround who will react to input provided via the
158+
-- 'STM' function, in FIFO order.
159+
--
160+
-- After the given continuation returns (successfully or due to some exception),
161+
-- the worker will continue processing any pending input before returning,
162+
-- re-throwing the continuation's exception if any. Async exceptions from any
163+
-- source will always be prioritized.
164+
--
165+
-- Usage of the obtained 'STM' action after the given continuation has returned
166+
-- is not possible. It will throw 'WalletWorkerExpiredError'.
167+
withWalletWorker
168+
:: (MonadIO m, Ex.MonadMask m)
169+
=> WalletActionInterp IO a
170+
-> ((WalletAction a -> STM ()) -> m b)
171+
-> m b
172+
withWalletWorker wai k = do
173+
-- 'tmq' keeps items to be processed by the worker in FIFO order.
174+
tmq :: TMQueue (WalletAction a) <- liftIO newTMQueueIO
175+
-- 'getWA' gets the next action to be processed.
176+
let getWA :: STM (Maybe (WalletAction a))
177+
getWA = readTMQueue tmq
178+
-- 'pushWA' adds an action to queue, unless it's been closed already.
179+
let pushWA :: WalletAction a -> STM ()
180+
pushWA = \wa -> writeTMQueue tmq wa >>= \case
181+
True -> pure ()
182+
False -> Ex.throwM WalletWorkerExpiredError
183+
-- Run the worker in the background, ensuring that any exceptions from it
184+
-- get thrown to the current thread.
185+
Ex.bracket
186+
(liftIO $ do
187+
as1 <- Async.async (walletWorker wai (STM.atomically getWA))
188+
Async.link as1
189+
pure as1)
190+
(\as1 -> liftIO $ do
191+
-- Prevent new input.
192+
STM.atomically (closeTMQueue tmq)
193+
-- Wait for the worker to finish, re-throwing any exceptions from it.
194+
Async.wait as1)
195+
(\_ -> k pushWA)
196+
157197

158198
-- | Check if this is the initial worker state.
159199
isInitialState :: Eq b => WalletWorkerState b -> Bool
@@ -187,9 +227,59 @@ instance Show b => Buildable (WalletAction b) where
187227
ApplyBlocks bs -> bprint ("ApplyBlocks " % shown) bs
188228
RollbackBlocks bs -> bprint ("RollbackBlocks " % shown) bs
189229
LogMessage bs -> bprint ("LogMessage " % shown) bs
190-
Shutdown -> bprint "Shutdown"
191230

192231
instance Show b => Buildable [WalletAction b] where
193232
build was = case was of
194233
[] -> bprint "[]"
195234
(x:xs) -> bprint (build % ":" % build) x xs
235+
236+
--------------------------------------------------------------------------------
237+
-- STM closeable queue.
238+
239+
-- | A FIFO queue that can be closed, preventing new input from being writen to
240+
-- it.
241+
--
242+
-- This is similar to 'Control.Concurrent.STM.TMQueue', redefined here with some
243+
-- of its API to avoid a dependency on the 'stm-chans' library.
244+
data TMQueue a
245+
= UnsafeTMQueue !(STM.TVar TMQueueOpen) !(STM.TQueue a)
246+
-- ^ Don't use this constructor directly. It's internal. It carries the queue
247+
-- itself, and whether this 'TMQueue' is open or not.
248+
249+
data TMQueueOpen = TMQueueOpen | TMQueueNotOpen
250+
251+
-- | Creates a new empty and open 'TMQueue'.
252+
newTMQueueIO :: IO (TMQueue a)
253+
newTMQueueIO = UnsafeTMQueue <$> STM.newTVarIO TMQueueOpen <*> STM.newTQueueIO
254+
255+
-- | Closes the 'TMQueue'. After this, any elements already in the 'TMQueue'
256+
-- will continue to be successfully returned by 'readTMQueue'. However, any
257+
-- new writes with 'writeTMQueue' will fail as described by its documentation.
258+
closeTMQueue :: TMQueue a -> STM ()
259+
closeTMQueue (UnsafeTMQueue to _) = STM.writeTVar to TMQueueNotOpen
260+
261+
-- | Writes a new input to the 'TMQueue', in FIFO order.
262+
--
263+
-- It returns 'True' if the 'TMQueue' was open and it was possible to write to
264+
-- it. Otherwise, if the 'TMQueue' was closed, it returns 'False', meaning
265+
-- nothing has been writen to the queue.
266+
writeTMQueue :: TMQueue a -> a -> STM Bool
267+
writeTMQueue (UnsafeTMQueue to tq) a = do
268+
STM.readTVar to >>= \case
269+
TMQueueOpen -> STM.writeTQueue tq a >> pure True
270+
TMQueueNotOpen -> pure False
271+
272+
-- | Read a value from the 'TMQueue', in FIFO order.
273+
--
274+
-- If the 'TMQueue' is empty and closed, then this function returns 'Nothing'.
275+
-- Otherwise, if the 'TMQueue' is not closed, this function will block waiting
276+
-- for new input.
277+
readTMQueue :: TMQueue a -> STM (Maybe a)
278+
readTMQueue (UnsafeTMQueue to tq) = do
279+
STM.tryReadTQueue tq >>= \case
280+
Just a -> pure (Just a)
281+
Nothing -> STM.readTVar to >>= \case
282+
TMQueueOpen -> Just <$> STM.readTQueue tq
283+
TMQueueNotOpen -> pure Nothing
284+
285+

wallet-new/src/Cardano/Wallet/WalletLayer/Kernel.hs

+17-31
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ module Cardano.Wallet.WalletLayer.Kernel
88

99
import Universum
1010

11+
import qualified Control.Concurrent.STM as STM
1112
import Control.Lens (to)
1213
import Data.Coerce (coerce)
13-
import Data.Default (def)
1414
import Data.Maybe (fromJust)
1515
import Data.Time.Units (Second)
1616
import Formatting (build, sformat)
@@ -41,7 +41,6 @@ import Cardano.Wallet.Kernel.CoinSelection.FromGeneric
4141
(CoinSelectionOptions (..), ExpenseRegulation,
4242
InputGrouping, newOptions)
4343

44-
import qualified Cardano.Wallet.Kernel.BIP39 as BIP39
4544
import Pos.Core (Address, Coin, decodeTextAddress, mkCoin)
4645
import qualified Pos.Core as Core
4746
import Pos.Core.Chrono (OldestFirst (..))
@@ -66,38 +65,25 @@ bracketPassiveWallet
6665
-> (PassiveWalletLayer n -> Kernel.PassiveWallet -> m a) -> m a
6766
bracketPassiveWallet logFunction keystore rocksDB f =
6867
Kernel.bracketPassiveWallet logFunction keystore rocksDB $ \w -> do
69-
70-
-- Create the wallet worker and its communication endpoint `invoke`.
71-
bracket (liftIO $ Actions.forkWalletWorker $ Actions.WalletActionInterp
72-
{ Actions.applyBlocks = \blunds ->
73-
Kernel.applyBlocks w $
74-
OldestFirst (mapMaybe blundToResolvedBlock (toList (getOldestFirst blunds)))
75-
, Actions.switchToFork = \_ _ -> logFunction Debug "<switchToFork>"
76-
, Actions.emit = logFunction Debug
77-
}
78-
) (\invoke -> liftIO (invoke Actions.Shutdown))
79-
$ \invoke -> do
80-
-- TODO (temporary): build a sample wallet from a backup phrase
81-
_ <- liftIO $ do
82-
Kernel.createHdWallet w
83-
(def @(BIP39.Mnemonic 12))
84-
emptyPassphrase
85-
assuranceLevel
86-
walletName
87-
88-
f (passiveWalletLayer w invoke) w
89-
68+
let wai = Actions.WalletActionInterp
69+
{ Actions.applyBlocks = \blunds ->
70+
Kernel.applyBlocks w
71+
(OldestFirst (mapMaybe blundToResolvedBlock
72+
(toList (getOldestFirst blunds))))
73+
, Actions.switchToFork = \_ _ ->
74+
logFunction Debug "<switchToFork>"
75+
, Actions.emit = logFunction Debug }
76+
Actions.withWalletWorker wai $ \invoke -> do
77+
f (passiveWalletLayer w invoke) w
9078
where
91-
-- TODO consider defaults
92-
walletName = HD.WalletName "(new wallet)"
93-
assuranceLevel = HD.AssuranceLevelNormal
94-
9579
-- | TODO(ks): Currently not implemented!
9680
passiveWalletLayer :: Kernel.PassiveWallet
97-
-> (Actions.WalletAction Blund -> IO ())
81+
-> (Actions.WalletAction Blund -> STM ())
9882
-> PassiveWalletLayer n
9983
passiveWalletLayer wallet invoke =
100-
PassiveWalletLayer
84+
let invokeIO :: forall m'. MonadIO m' => Actions.WalletAction Blund -> m' ()
85+
invokeIO = liftIO . STM.atomically . invoke
86+
in PassiveWalletLayer
10187
{ _pwlCreateWallet =
10288
\(V1.NewWallet (V1.BackupPhrase mnemonic) mbSpendingPassword v1AssuranceLevel v1WalletName operation) -> do
10389
liftIO $ limitExecutionTimeTo (30 :: Second) CreateWalletTimeLimitReached $ do
@@ -166,8 +152,8 @@ bracketPassiveWallet logFunction keystore rocksDB f =
166152
Left err -> return (Left $ CreateAddressError err)
167153
, _pwlGetAddresses = error "Not implemented!"
168154

169-
, _pwlApplyBlocks = liftIO . invoke . Actions.ApplyBlocks
170-
, _pwlRollbackBlocks = liftIO . invoke . Actions.RollbackBlocks
155+
, _pwlApplyBlocks = invokeIO . Actions.ApplyBlocks
156+
, _pwlRollbackBlocks = invokeIO . Actions.RollbackBlocks
171157
}
172158

173159
-- The use of the unsafe constructor 'UnsafeRawResolvedBlock' is justified

0 commit comments

Comments
 (0)