Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit e2408a7

Browse files
Merge #3998
3998: [CDEC-658] port to release/2.0.1 r=avieth a=avieth Cherry-picked merge commit. 3994: [CDEC-658] simplify threading in block streaming r=avieth a=avieth If there is a connection problem causing the block streaming conversation to fail even to come up, then the retrieval worker will hang, because the block streaming `TBQueue` will never show `StreamEnd`: this is done in the `finally` clauses of the conversation callbacks _themselves_ rather than the conversation at large (they never started in this case). The solution in this pull request is to do the queue sourcing (`retrieveBlocks`) as well as the queue sinking (`processBlocks`) from within the conversation callback itself, by way of `concurrently`. If the conversation fails to even come up, then the exception will come through, and the retrieval worker will carry on. A bit more detail on how this arose: it required a network failure while _streaming_ blocks, rather than just fetching the next one, so it was a bit more rare than might be expected. I'd like to make a test case for this, but it's difficult, because it requires a network failure at a particular moment. The node must be able to get the header announcements, but the network needs to go down as soon as it attempts to make a connection from streaming blocks. The first two commits are not strictly related, but I had them around because I need them in order to build cardano-sl as a dependency. I can drop them if somebody really feels strongly that I should, but I'll need to merge them anyway. https://iohk.myjetbrains.com/youtrack/issue/CDEC-658 Co-authored-by: Alexander Vieth <[email protected]> ## Description <!--- A brief description of this PR and the problem is trying to solve --> ## Linked issue <!--- Put here the relevant issue from YouTrack --> Co-authored-by: iohk-bors[bot] <iohk-bors[bot]@users.noreply.github.com> Co-authored-by: Alexander Vieth <[email protected]>
2 parents 7fb31c2 + b8e8d8e commit e2408a7

File tree

11 files changed

+155
-456
lines changed

11 files changed

+155
-456
lines changed

infra/src/Pos/Infra/Diffusion/Types.hs

+22-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ module Pos.Infra.Diffusion.Types
77
, Diffusion (..)
88
, hoistDiffusion
99
, dummyDiffusionLayer
10+
, StreamBlocks (..)
11+
, hoistStreamBlocks
1012
, DiffusionHealth (..)
1113
) where
1214

@@ -29,6 +31,24 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
2931
emptySubscriptionStates)
3032
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
3133

34+
-- | How to handle a stream of blocks.
35+
data StreamBlocks block m t = StreamBlocks
36+
{ streamBlocksMore :: NonEmpty block -> m (StreamBlocks block m t)
37+
-- ^ The server gives a batch of blocks.
38+
, streamBlocksDone :: m t
39+
-- ^ The server has no more blocks.
40+
}
41+
42+
hoistStreamBlocks
43+
:: ( Functor n )
44+
=> (forall x . m x -> n x)
45+
-> StreamBlocks block m t
46+
-> StreamBlocks block n t
47+
hoistStreamBlocks nat streamBlocks = streamBlocks
48+
{ streamBlocksMore = \blks ->
49+
fmap (hoistStreamBlocks nat) (nat (streamBlocksMore streamBlocks blks))
50+
, streamBlocksDone = nat (streamBlocksDone streamBlocks)
51+
}
3252

3353
data DiffusionHealth = DiffusionHealth {
3454
dhStreamWriteQueue :: !Gauge -- Number of blocks stored in the block stream write queue
@@ -49,7 +69,7 @@ data Diffusion m = Diffusion
4969
NodeId
5070
-> HeaderHash
5171
-> [HeaderHash]
52-
-> ([Block] -> m t)
72+
-> StreamBlocks Block m t
5373
-> m (Maybe t)
5474
-- | This is needed because there's a security worker which will request
5575
-- tip-of-chain from the network if it determines it's very far behind.
@@ -108,7 +128,7 @@ hoistDiffusion
108128
-> Diffusion n
109129
hoistDiffusion nat rnat orig = Diffusion
110130
{ getBlocks = \nid bh hs -> nat $ getBlocks orig nid bh hs
111-
, streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (rnat . k)
131+
, streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (hoistStreamBlocks rnat k)
112132
, requestTip = nat $ (fmap . fmap) nat (requestTip orig)
113133
, announceBlockHeader = nat . announceBlockHeader orig
114134
, sendTx = nat . sendTx orig

lib/bench/Bench/Pos/Diffusion/BlockDownload.hs

+6-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ import Pos.Diffusion.Full (FullDiffusionConfiguration (..),
4444
FullDiffusionInternals (..),
4545
RunFullDiffusionInternals (..),
4646
diffusionLayerFullExposeInternals)
47-
import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..))
47+
import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..),
48+
StreamBlocks (..))
4849
import qualified Pos.Infra.Network.Policy as Policy
4950
import Pos.Infra.Network.Types (Bucket (..))
5051
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
@@ -226,7 +227,10 @@ blockDownloadStream serverAddress setStreamIORef client ~(blockHeader, checkpoin
226227
where
227228
numBlocks = batches * 2200
228229

229-
writeCallback !_ = return ()
230+
writeCallback = StreamBlocks
231+
{ streamBlocksMore = \(!_) -> pure writeCallback
232+
, streamBlocksDone = pure ()
233+
}
230234

231235
blockDownloadBenchmarks :: NodeId -> (Int -> IO ()) -> Diffusion IO -> [Criterion.Benchmark]
232236
blockDownloadBenchmarks serverAddress setStreamIORef client =

lib/src/Pos/Diffusion/Full.hs

+3-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
6868
emptySubscriptionStates)
6969
import Pos.Infra.Diffusion.Transport.TCP (bracketTransportTCP)
7070
import Pos.Infra.Diffusion.Types (Diffusion (..),
71-
DiffusionHealth (..), DiffusionLayer (..))
71+
DiffusionHealth (..), DiffusionLayer (..),
72+
StreamBlocks (..))
7273
import Pos.Infra.Network.Types (Bucket (..), NetworkConfig (..),
7374
NodeType, SubscriptionWorker (..), initQueue,
7475
topologyHealthStatus, topologyRunKademlia,
@@ -358,7 +359,7 @@ diffusionLayerFullExposeInternals fdconf
358359
NodeId
359360
-> HeaderHash
360361
-> [HeaderHash]
361-
-> ([Block] -> IO t)
362+
-> StreamBlocks Block IO t
362363
-> IO (Maybe t)
363364
streamBlocks = Diffusion.Block.streamBlocks logTrace diffusionHealth logic streamWindow enqueue
364365

lib/src/Pos/Diffusion/Full/Block.hs

+96-100
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ module Pos.Diffusion.Full.Block
1414

1515
import Universum
1616

17-
import Control.Concurrent (threadDelay)
18-
import Control.Concurrent.Async (cancel)
17+
import qualified Control.Concurrent.Async as Async
1918
import qualified Control.Concurrent.STM as Conc
2019
import Control.Exception (Exception (..), throwIO)
2120
import Control.Lens (to)
@@ -55,7 +54,8 @@ import Pos.Infra.Communication.Protocol (Conversation (..),
5554
MkListeners (..), MsgType (..), NodeId, Origin (..),
5655
OutSpecs, constantListeners, recvLimited,
5756
waitForConversations, waitForDequeues)
58-
import Pos.Infra.Diffusion.Types (DiffusionHealth (..))
57+
import Pos.Infra.Diffusion.Types (DiffusionHealth (..),
58+
StreamBlocks (..))
5959
import Pos.Infra.Network.Types (Bucket)
6060
import Pos.Infra.Util.TimeWarp (nodeIdToAddress)
6161
import Pos.Logic.Types (Logic)
@@ -281,78 +281,28 @@ getBlocks logTrace logic recoveryHeadersMessage enqueue nodeId tipHeaderHash che
281281
data StreamEntry = StreamEnd | StreamBlock !Block
282282

283283
-- | Stream some blocks from the network.
284-
-- Returns Nothing if streaming is disabled by the client or not supported by the peer.
284+
-- If streaming is not supported by the client or peer, you get 'Nothing'. We
285+
-- don't fall back to batching because we can't: that method requires having
286+
-- all of the header hashes for the blocks you desire.
285287
streamBlocks
286288
:: forall t .
287289
Trace IO (Severity, Text)
288290
-> Maybe DiffusionHealth
289291
-> Logic IO
290-
-> Word32
292+
-> Word32 -- ^ Size of stream window. 0 implies 'Nothing' is returned.
291293
-> EnqueueMsg
292294
-> NodeId
293295
-> HeaderHash
294296
-> [HeaderHash]
295-
-> ([Block] -> IO t)
297+
-> StreamBlocks Block IO t
296298
-> IO (Maybe t)
297-
streamBlocks _ _ _ 0 _ _ _ _ _ = return Nothing -- Fallback to batch mode
298-
streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints k = do
299-
blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
300-
let batchSize = min 64 streamWindow
301-
fallBack <- atomically $ Conc.newTVar False
302-
requestVar <- requestBlocks fallBack blockChan
303-
r <- processBlocks batchSize 0 [] blockChan `finally` (atomically $ do
304-
status <- Conc.readTVar requestVar
305-
case status of
306-
OQ.PacketAborted -> pure (pure ())
307-
OQ.PacketEnqueued -> do
308-
Conc.writeTVar requestVar OQ.PacketAborted
309-
pure (pure ())
310-
OQ.PacketDequeued asyncIO -> pure (cancel asyncIO))
311-
r' <- atomically $ Conc.readTVar fallBack
312-
if r' then pure Nothing
313-
else pure $ Just r
299+
streamBlocks _ _ _ 0 _ _ _ _ _ =
300+
return Nothing -- Fallback to batch mode
301+
streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints streamBlocksK =
302+
requestBlocks >>= Async.wait
314303
where
315304

316-
processBlocks :: Word32 -> Word32 -> [Block] -> Conc.TBQueue StreamEntry -> IO t
317-
processBlocks batchSize !n !blocks blockChan = do
318-
streamEntry <- atomically $ Conc.readTBQueue blockChan
319-
case streamEntry of
320-
StreamEnd -> k blocks
321-
StreamBlock block -> do
322-
let n' = n + 1
323-
when (n' `mod` 256 == 0) $
324-
traceWith logTrace (Debug,
325-
sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
326-
(block ^. difficultyL))
327-
case smM of
328-
Nothing -> pure ()
329-
Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm
330-
331-
if n' `mod` batchSize == 0
332-
then do
333-
_ <- k (block : blocks)
334-
processBlocks batchSize n' [] blockChan
335-
else
336-
processBlocks batchSize n' (block : blocks) blockChan
337-
338-
writeStreamEnd :: Conc.TBQueue StreamEntry -> IO ()
339-
writeStreamEnd blockChan = writeBlock 1024 blockChan StreamEnd
340-
341-
-- It is possible that the reader of the TBQueue stops unexpectedly which
342-
-- means that we we will have to use a timeout instead of blocking forever
343-
-- while attempting to write to a full queue.
344-
writeBlock :: Int -> Conc.TBQueue StreamEntry -> StreamEntry -> IO ()
345-
writeBlock delay _ _ | delay >= 4000000 = do
346-
let msg = "Error write timeout to local reader"
347-
traceWith logTrace (Warning, msg)
348-
throwM $ DialogUnexpected msg
349-
writeBlock delay blockChan b = do
350-
isFull <- atomically $ Conc.isFullTBQueue blockChan
351-
if isFull
352-
then do
353-
threadDelay delay
354-
writeBlock (delay * 2) blockChan b
355-
else atomically $ Conc.writeTBQueue blockChan b
305+
batchSize = min 64 streamWindow
356306

357307
mkStreamStart :: [HeaderHash] -> HeaderHash -> MsgStream
358308
mkStreamStart chain wantedBlock =
@@ -362,49 +312,62 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
362312
, mssWindow = streamWindow
363313
}
364314

365-
requestBlocks :: Conc.TVar Bool -> Conc.TBQueue StreamEntry -> IO (Conc.TVar (OQ.PacketStatus ()))
366-
requestBlocks fallBack blockChan = do
367-
convMap <- enqueue (MsgRequestBlocks (S.singleton nodeId))
368-
(\_ _ -> (Conversation $ \it -> requestBlocksConversation blockChan it `onException` writeStreamEnd blockChan) :|
369-
[(Conversation $ \it -> requestBatch fallBack blockChan it `finally` writeStreamEnd blockChan)]
370-
)
315+
-- Enqueue a conversation which will attempt to stream.
316+
-- This returns when the conversation is dequeued, or throws an exception
317+
-- in case it's aborted or is not enqueued.
318+
requestBlocks :: IO (Async.Async (Maybe t))
319+
requestBlocks = do
320+
convMap <- enqueue
321+
(MsgRequestBlocks (S.singleton nodeId))
322+
(\_ _ -> (Conversation $ streamBlocksConversation) :|
323+
[(Conversation $ batchConversation)]
324+
)
325+
-- Outbound queue guarantees that the map is either size 0 or 1, since
326+
-- 'S.singleton nodeId' was given to the enqueue.
371327
case M.lookup nodeId convMap of
372-
Just tvar -> pure tvar
328+
Just tvar -> atomically $ do
329+
pStatus <- Conc.readTVar tvar
330+
case pStatus of
331+
OQ.PacketEnqueued -> Conc.retry
332+
-- Somebody else arborted our call; nothing to do but
333+
-- throw.
334+
OQ.PacketAborted -> Conc.throwSTM $ DialogUnexpected $ "streamBlocks: aborted"
335+
OQ.PacketDequeued streamThread -> pure streamThread
373336
-- FIXME shouldn't have to deal with this.
374337
-- One possible solution: do the block request in response to an
375338
-- unsolicited header, so that's it's all done in one conversation,
376339
-- and so there's no need to even track the 'nodeId'.
377-
Nothing -> throwM $ DialogUnexpected $ "requestBlocks did not contact given peer"
378-
379-
requestBatch
380-
:: Conc.TVar Bool
381-
-> Conc.TBQueue StreamEntry
382-
-> ConversationActions MsgGetBlocks MsgBlock
383-
-> IO ()
384-
requestBatch fallBack _ _ = do
385-
-- The peer doesn't support streaming, we need to fall back to batching but
386-
-- the current conversation is unusable since there is no way for us to learn
387-
-- which blocks we shall fetch.
388-
-- We will always have room to write a singel StreamEnd so there is no need to
389-
-- differentiate between normal execution and when we get an expection.
390-
atomically $ writeTVar fallBack True
391-
return ()
392-
393-
requestBlocksConversation
394-
:: Conc.TBQueue StreamEntry
395-
-> ConversationActions MsgStream MsgStreamBlock
396-
-> IO ()
397-
requestBlocksConversation blockChan conv = do
340+
Nothing -> throwIO $ DialogUnexpected $ "streamBlocks: did not contact given peer"
341+
342+
-- The peer doesn't support streaming, we need to fall back to batching but
343+
-- the current conversation is unusable since there is no way for us to learn
344+
-- which blocks we shall fetch.
345+
batchConversation
346+
:: ConversationActions MsgGetBlocks MsgBlock
347+
-> IO (Maybe t)
348+
batchConversation _ = pure Nothing
349+
350+
streamBlocksConversation
351+
:: ConversationActions MsgStream MsgStreamBlock
352+
-> IO (Maybe t)
353+
streamBlocksConversation conv = do
398354
let newestHash = headerHash tipHeader
399355
traceWith logTrace (Debug,
400356
sformat ("streamBlocks: Requesting stream of blocks from "%listJson%" to "%shortHashF)
401357
checkpoints
402358
newestHash)
403359
send conv $ mkStreamStart checkpoints newestHash
404360
bvd <- Logic.getAdoptedBVData logic
405-
retrieveBlocks bvd blockChan conv streamWindow
406-
atomically $ Conc.writeTBQueue blockChan StreamEnd
407-
return ()
361+
-- Two threads are used here: one to pull in blocks, and one to
362+
-- call into the application 'StreamBlocks' value. The reason:
363+
-- the latter could do a lot of work for each batch, so having another
364+
-- thread continually pulling in with a buffer in the middle will
365+
-- smooth the traffic.
366+
blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
367+
(_, b) <- Async.concurrently
368+
(retrieveBlocks bvd blockChan conv streamWindow)
369+
(processBlocks 0 [] blockChan streamBlocksK)
370+
pure $ Just b
408371

409372
halfStreamWindow = max 1 $ streamWindow `div` 2
410373

@@ -426,15 +389,14 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
426389
else return $ window - 1
427390
block <- retrieveBlock bvd conv
428391
case block of
429-
MsgStreamNoBlock t -> do
430-
let msg = sformat ("MsgStreamNoBlock "%stext) t
431-
traceWith logTrace (Warning, msg)
392+
MsgStreamNoBlock e -> do
393+
let msg = sformat ("MsgStreamNoBlock "%stext) e
394+
traceWith logTrace (Error, msg)
432395
throwM $ DialogUnexpected msg
433396
MsgStreamEnd -> do
397+
atomically $ Conc.writeTBQueue blockChan StreamEnd
434398
traceWith logTrace (Debug, sformat ("Streaming done client-side for node"%build) nodeId)
435-
return ()
436399
MsgStreamBlock b -> do
437-
-- traceWith logTrace (Debug, sformat ("Read block "%shortHashF) (headerHash b))
438400
atomically $ Conc.writeTBQueue blockChan (StreamBlock b)
439401
case smM of
440402
Nothing -> pure ()
@@ -452,10 +414,44 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
452414
case blockE of
453415
Nothing -> do
454416
let msg = sformat ("Error retrieving blocks from peer "%build) nodeId
455-
traceWith logTrace (Warning, msg)
417+
traceWith logTrace (Error, msg)
456418
throwM $ DialogUnexpected msg
457419
Just block -> return block
458420

421+
processBlocks
422+
:: Word32
423+
-> [Block]
424+
-> Conc.TBQueue StreamEntry
425+
-> StreamBlocks Block IO t
426+
-> IO t
427+
processBlocks n !blocks blockChan k = do
428+
streamEntry <- atomically $ Conc.readTBQueue blockChan
429+
case streamEntry of
430+
StreamEnd -> case blocks of
431+
[] -> streamBlocksDone k
432+
(blk:blks) -> do
433+
k' <- streamBlocksMore k (blk :| blks)
434+
streamBlocksDone k'
435+
StreamBlock block -> do
436+
-- FIXME this logging stuff should go into the particular
437+
-- 'StreamBlocks' value rather than here.
438+
let n' = n + 1
439+
when (n' `mod` 256 == 0) $
440+
traceWith logTrace (Debug,
441+
sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
442+
(block ^. difficultyL))
443+
case smM of
444+
Nothing -> pure ()
445+
Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm
446+
447+
if n' `mod` batchSize == 0
448+
then do
449+
k' <- streamBlocksMore k (block :| blocks)
450+
processBlocks n' [] blockChan k'
451+
else
452+
processBlocks n' (block : blocks) blockChan k
453+
454+
459455
requestTip
460456
:: Trace IO (Severity, Text)
461457
-> Logic IO

lib/src/Pos/Network/Block/Retrieval.hs

+9-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import Pos.DB.Block (ClassifyHeaderRes (..), classifyNewHeader,
3131
getHeadersOlderExp)
3232
import qualified Pos.DB.BlockIndex as DB
3333
import Pos.Infra.Communication.Protocol (NodeId)
34-
import Pos.Infra.Diffusion.Types (Diffusion)
34+
import Pos.Infra.Diffusion.Types (Diffusion, StreamBlocks (..))
3535
import qualified Pos.Infra.Diffusion.Types as Diffusion
3636
import Pos.Infra.Recovery.Types (RecoveryHeaderTag)
3737
import Pos.Infra.Reporting (reportOrLogE, reportOrLogW)
@@ -374,8 +374,11 @@ streamProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints
374374
_ <- dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId
375375
return ()
376376
where
377-
writeCallback :: (TVar (Maybe Block)) -> [Block] -> m ()
378-
writeCallback _ [] = return ()
379-
writeCallback mostDifficultBlock (block:blocks) = do
380-
_ <- atomically $ swapTVar mostDifficultBlock (Just block)
381-
handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse $ block :| blocks)) diffusion
377+
writeCallback :: TVar (Maybe Block) -> StreamBlocks Block m ()
378+
writeCallback mostDifficultBlock = StreamBlocks
379+
{ streamBlocksMore = \blks -> do
380+
_ <- atomically $ swapTVar mostDifficultBlock (Just (NE.head blks))
381+
_ <- handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse $ blks)) diffusion
382+
pure (writeCallback mostDifficultBlock)
383+
, streamBlocksDone = pure ()
384+
}

0 commit comments

Comments
 (0)