Skip to content

Commit 4de2de8

Browse files
iohk-bors[bot]Denis Shevchenko
and
Denis Shevchenko
authored
Merge #3496
3496: trace-forward: new API for acceptor's part. r=denisshevchenko a=denisshevchenko This PR changes API of the acceptor's part only, it doesn't touch the forwarder's part (used by `trace-dispatcher`): 1. `IO`-action returning `DataPointAsker` is provided. It allows avoiding `unsafePerformIO` in the acceptor application. 2. Peer error handlers were added. It allows the acceptor application to know if the connection with the node was dropped. Additionally: 1. `ekg-forward` dependency is updated. 2. `DataPoint` protocol is fixed: now there is a check if the reply with `DataPoint`s is empty. **Please note that `cardano-tracer` service, the next part of the new tracing infrastructure, depends on this PR. After it will be merged, the PR for `cardano-tracer` will be opened.** Co-authored-by: Denis Shevchenko <[email protected]>
2 parents 20db1f6 + 945a188 commit 4de2de8

File tree

4 files changed

+51
-39
lines changed

4 files changed

+51
-39
lines changed

cabal.project

+2-2
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ source-repository-package
279279
source-repository-package
280280
type: git
281281
location: https://github.com/input-output-hk/ekg-forward
282-
tag: 2adc8b698443bb10154304b24f6c1d6913bb65b9
283-
--sha256: 0cyixq3jmq43zs1yzrycqw1klyjy0zxf1vifknnr1k9d6sc3zf6b
282+
tag: 297cd9db5074339a2fb2e5ae7d0780debb670c63
283+
--sha256: 1zcwry3y5rmd9lgxy89wsb3k4kpffqji35dc7ghzbz603y1gy24g
284284

285285
source-repository-package
286286
type: git

trace-forward/src/Trace/Forward/Run/DataPoint/Acceptor.hs

+30-22
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ module Trace.Forward.Run.DataPoint.Acceptor
88
) where
99

1010
import qualified Codec.Serialise as CBOR
11+
import Control.Exception (finally)
12+
import Control.Monad (unless)
1113
import Control.Monad.Extra (ifM)
1214
import Control.Monad.STM (atomically, check)
1315
import Control.Concurrent.STM.TVar (modifyTVar', readTVar, readTVarIO)
@@ -21,46 +23,52 @@ import qualified Trace.Forward.Protocol.DataPoint.Acceptor as Acceptor
2123
import qualified Trace.Forward.Protocol.DataPoint.Codec as Acceptor
2224
import Trace.Forward.Protocol.DataPoint.Type (DataPointName)
2325
import Trace.Forward.Configuration.DataPoint (AcceptorConfiguration (..))
24-
import Trace.Forward.Utils.DataPoint (DataPointAsker (..))
26+
import Trace.Forward.Utils.DataPoint (DataPointRequestor (..))
2527

2628
acceptDataPointsInit
2729
:: AcceptorConfiguration
28-
-> DataPointAsker
30+
-> IO DataPointRequestor
31+
-> IO ()
2932
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
30-
acceptDataPointsInit config dpAsker =
31-
InitiatorProtocolOnly $ runPeerWithAsker config dpAsker
33+
acceptDataPointsInit config mkDPRequestor peerErrorHandler =
34+
InitiatorProtocolOnly $ runPeerWithRequestor config mkDPRequestor peerErrorHandler
3235

3336
acceptDataPointsResp
3437
:: AcceptorConfiguration
35-
-> DataPointAsker
38+
-> IO DataPointRequestor
39+
-> IO ()
3640
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
37-
acceptDataPointsResp config dpAsker =
38-
ResponderProtocolOnly $ runPeerWithAsker config dpAsker
41+
acceptDataPointsResp config mkDPRequestor peerErrorHandler =
42+
ResponderProtocolOnly $ runPeerWithRequestor config mkDPRequestor peerErrorHandler
3943

40-
runPeerWithAsker
44+
runPeerWithRequestor
4145
:: AcceptorConfiguration
42-
-> DataPointAsker
46+
-> IO DataPointRequestor
47+
-> IO ()
4348
-> MuxPeer LBS.ByteString IO ()
44-
runPeerWithAsker config dpAsker =
45-
MuxPeerRaw $ \channel ->
46-
runPeer
47-
(acceptorTracer config)
48-
(Acceptor.codecDataPointForward CBOR.encode CBOR.decode
49-
CBOR.encode CBOR.decode)
50-
channel
51-
(Acceptor.dataPointAcceptorPeer $ acceptorActions config dpAsker [])
49+
runPeerWithRequestor config mkDPRequestor peerErrorHandler =
50+
MuxPeerRaw $ \channel -> do
51+
dpRequestor <- mkDPRequestor
52+
runPeer
53+
(acceptorTracer config)
54+
(Acceptor.codecDataPointForward CBOR.encode CBOR.decode
55+
CBOR.encode CBOR.decode)
56+
channel
57+
(Acceptor.dataPointAcceptorPeer $ acceptorActions config dpRequestor [])
58+
`finally` peerErrorHandler
5259

5360
acceptorActions
5461
:: AcceptorConfiguration
55-
-> DataPointAsker
62+
-> DataPointRequestor
5663
-> [DataPointName]
5764
-> Acceptor.DataPointAcceptor IO ()
5865
acceptorActions config@AcceptorConfiguration{shouldWeStop}
59-
dpAsker@DataPointAsker{askDataPoints, dataPointsNames, dataPointsReply}
66+
dpRequestor@DataPointRequestor{askDataPoints, dataPointsNames, dataPointsReply}
6067
dpNames =
6168
Acceptor.SendMsgDataPointsRequest dpNames $ \replyWithDataPoints -> do
62-
-- Ok, reply with 'DataPoint's is already here, update the asker.
63-
atomically $ do
69+
-- Ok, reply with 'DataPoint's is already here, update the requestor.
70+
unless (null replyWithDataPoints) $ atomically $ do
71+
-- Store the reply for acceptor's external context.
6472
putTMVar dataPointsReply replyWithDataPoints
6573
-- To prevent new automatic request.
6674
modifyTVar' askDataPoints $ const False
@@ -72,4 +80,4 @@ acceptorActions config@AcceptorConfiguration{shouldWeStop}
7280
-- Ok, external context asked for 'DataPoint's, take their names.
7381
dpNames' <- readTVarIO dataPointsNames
7482
-- Ask.
75-
return $ acceptorActions config dpAsker dpNames'
83+
return $ acceptorActions config dpRequestor dpNames'

trace-forward/src/Trace/Forward/Run/TraceObject/Acceptor.hs

+12-8
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import Control.Concurrent.Async (race)
1313
import Control.Monad.Extra (ifM)
1414
import Control.Monad.STM (atomically, check)
1515
import Control.Concurrent.STM.TVar (TVar, readTVar, readTVarIO, registerDelay)
16-
import Control.Exception (Exception, throwIO)
16+
import Control.Exception (Exception, finally, throwIO)
1717
import qualified Data.ByteString.Lazy as LBS
1818
import Data.Typeable (Typeable)
1919
import Data.Void (Void)
@@ -34,28 +34,31 @@ acceptTraceObjectsInit
3434
Typeable lo)
3535
=> AcceptorConfiguration lo -- ^ Acceptor's configuration.
3636
-> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's.
37+
-> IO () -- ^ The handler for exceptions from 'runPeer'.
3738
-> RunMiniProtocol 'InitiatorMode LBS.ByteString IO () Void
38-
acceptTraceObjectsInit config loHandler =
39-
InitiatorProtocolOnly $ runPeerWithHandler config loHandler
39+
acceptTraceObjectsInit config loHandler peerErrorHandler =
40+
InitiatorProtocolOnly $ runPeerWithHandler config loHandler peerErrorHandler
4041

4142
acceptTraceObjectsResp
4243
:: (CBOR.Serialise lo,
4344
ShowProxy lo,
4445
Typeable lo)
4546
=> AcceptorConfiguration lo -- ^ Acceptor's configuration.
4647
-> ([lo] -> IO ()) -- ^ The handler for accepted 'TraceObject's.
48+
-> IO () -- ^ The handler for exceptions from 'runPeer'.
4749
-> RunMiniProtocol 'ResponderMode LBS.ByteString IO Void ()
48-
acceptTraceObjectsResp config loHandler =
49-
ResponderProtocolOnly $ runPeerWithHandler config loHandler
50+
acceptTraceObjectsResp config loHandler peerErrorHandler =
51+
ResponderProtocolOnly $ runPeerWithHandler config loHandler peerErrorHandler
5052

5153
runPeerWithHandler
5254
:: (CBOR.Serialise lo,
5355
ShowProxy lo,
5456
Typeable lo)
5557
=> AcceptorConfiguration lo
5658
-> ([lo] -> IO ())
59+
-> IO ()
5760
-> MuxPeer LBS.ByteString IO ()
58-
runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} loHandler =
61+
runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} loHandler peerErrorHandler =
5962
MuxPeerRaw $ \channel ->
6063
timeoutWhenStopped
6164
shouldWeStop
@@ -66,6 +69,7 @@ runPeerWithHandler config@AcceptorConfiguration{acceptorTracer, shouldWeStop} lo
6669
CBOR.encode CBOR.decode)
6770
channel
6871
(Acceptor.traceObjectAcceptorPeer $ acceptorActions config loHandler)
72+
`finally` peerErrorHandler
6973

7074
acceptorActions
7175
:: (CBOR.Serialise lo,
@@ -96,7 +100,7 @@ timeoutWhenStopped stopVar delay action =
96100
either id id <$>
97101
race action
98102
( do atomically (readTVar stopVar >>= check)
99-
v <- registerDelay delay
100-
atomically (readTVar v >>= check)
103+
v <- registerDelay delay
104+
atomically (readTVar v >>= check)
101105
throwIO Timeout
102106
)

trace-forward/src/Trace/Forward/Utils/DataPoint.hs

+7-7
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
module Trace.Forward.Utils.DataPoint
66
( DataPoint (..)
77
, DataPointStore
8-
, DataPointAsker (..)
8+
, DataPointRequestor (..)
99
, initDataPointStore
10-
, initDataPointAsker
10+
, initDataPointRequestor
1111
, writeToStore
1212
, readFromStore
1313
, askForDataPoints
@@ -72,7 +72,7 @@ readFromStore dpStore =
7272
-- | Since 'DataPointForward' protocol does not assume the stream of requests/replies,
7373
-- we use the 'TVar's to provide to acceptor's side an ability to ask 'DataPoint's
7474
-- explicitly.
75-
data DataPointAsker = DataPointAsker
75+
data DataPointRequestor = DataPointRequestor
7676
{ -- | The "ask flag": we use it to notify that we want 'DataPoint's.
7777
askDataPoints :: !(TVar Bool)
7878
-- | The names of 'DataPoint's we need.
@@ -83,18 +83,18 @@ data DataPointAsker = DataPointAsker
8383
, dataPointsReply :: !(TMVar DataPointValues)
8484
}
8585

86-
initDataPointAsker :: IO DataPointAsker
87-
initDataPointAsker = DataPointAsker
86+
initDataPointRequestor :: IO DataPointRequestor
87+
initDataPointRequestor = DataPointRequestor
8888
<$> newTVarIO False
8989
<*> newTVarIO []
9090
<*> newEmptyTMVarIO
9191

9292
askForDataPoints
93-
:: DataPointAsker
93+
:: DataPointRequestor
9494
-> [DataPointName]
9595
-> IO DataPointValues
9696
askForDataPoints _ [] = return []
97-
askForDataPoints DataPointAsker{askDataPoints, dataPointsNames, dataPointsReply} dpNames = do
97+
askForDataPoints DataPointRequestor{askDataPoints, dataPointsNames, dataPointsReply} dpNames = do
9898
atomically $ do
9999
modifyTVar' dataPointsNames $ const dpNames -- Fill the names of 'DataPoint's we need.
100100
modifyTVar' askDataPoints $ const True -- Ask them! The flag for acceptor's part

0 commit comments

Comments
 (0)