This repository was archived by the owner on Aug 18, 2020. It is now read-only.

[CDEC-658] port to release/2.0.1 #3998

Merged · 4 commits · Jan 15, 2019
Changes from all commits
24 changes: 22 additions & 2 deletions infra/src/Pos/Infra/Diffusion/Types.hs
@@ -7,6 +7,8 @@ module Pos.Infra.Diffusion.Types
, Diffusion (..)
, hoistDiffusion
, dummyDiffusionLayer
, StreamBlocks (..)
, hoistStreamBlocks
, DiffusionHealth (..)
) where

@@ -29,6 +31,24 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
emptySubscriptionStates)
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))

-- | How to handle a stream of blocks.
data StreamBlocks block m t = StreamBlocks
{ streamBlocksMore :: NonEmpty block -> m (StreamBlocks block m t)
-- ^ The server gives a batch of blocks.
, streamBlocksDone :: m t
-- ^ The server has no more blocks.
}

hoistStreamBlocks
:: ( Functor n )
=> (forall x . m x -> n x)
-> StreamBlocks block m t
-> StreamBlocks block n t
hoistStreamBlocks nat streamBlocks = streamBlocks
{ streamBlocksMore = \blks ->
fmap (hoistStreamBlocks nat) (nat (streamBlocksMore streamBlocks blks))
, streamBlocksDone = nat (streamBlocksDone streamBlocks)
}

data DiffusionHealth = DiffusionHealth {
dhStreamWriteQueue :: !Gauge -- Number of blocks stored in the block stream write queue
@@ -49,7 +69,7 @@ data Diffusion m = Diffusion
NodeId
-> HeaderHash
-> [HeaderHash]
-> ([Block] -> m t)
-> StreamBlocks Block m t
-> m (Maybe t)
-- | This is needed because there's a security worker which will request
-- tip-of-chain from the network if it determines it's very far behind.
@@ -108,7 +128,7 @@ hoistDiffusion
-> Diffusion n
hoistDiffusion nat rnat orig = Diffusion
{ getBlocks = \nid bh hs -> nat $ getBlocks orig nid bh hs
, streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (rnat . k)
, streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (hoistStreamBlocks rnat k)
, requestTip = nat $ (fmap . fmap) nat (requestTip orig)
, announceBlockHeader = nat . announceBlockHeader orig
, sendTx = nat . sendTx orig
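A minimal sketch of a consumer of the new interface, for review context (hypothetical code, not part of this diff; it assumes only the 'StreamBlocks' type added above): a handler that counts blocks as batches arrive and yields the total when the server signals completion.

-- Hypothetical example: count streamed blocks. Per-batch state is
-- threaded by returning a new 'StreamBlocks' value from each call
-- to 'streamBlocksMore'.
countBlocks :: Applicative m => Word -> StreamBlocks block m Word
countBlocks !n = StreamBlocks
    { streamBlocksMore = \blks ->
          pure (countBlocks (n + fromIntegral (length blks)))
    , streamBlocksDone = pure n
    }

Because the handler runs in its own monad, 'hoistStreamBlocks' (also added above) is enough to move such a value into another monad, e.g. from 'IO' into a transformer stack.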
8 changes: 6 additions & 2 deletions lib/bench/Bench/Pos/Diffusion/BlockDownload.hs
@@ -44,7 +44,8 @@ import Pos.Diffusion.Full (FullDiffusionConfiguration (..),
FullDiffusionInternals (..),
RunFullDiffusionInternals (..),
diffusionLayerFullExposeInternals)
import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..))
import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..),
StreamBlocks (..))
import qualified Pos.Infra.Network.Policy as Policy
import Pos.Infra.Network.Types (Bucket (..))
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
Expand Down Expand Up @@ -226,7 +227,10 @@ blockDownloadStream serverAddress setStreamIORef client ~(blockHeader, checkpoin
where
numBlocks = batches * 2200

writeCallback !_ = return ()
writeCallback = StreamBlocks
{ streamBlocksMore = \(!_) -> pure writeCallback
, streamBlocksDone = pure ()
}

blockDownloadBenchmarks :: NodeId -> (Int -> IO ()) -> Diffusion IO -> [Criterion.Benchmark]
blockDownloadBenchmarks serverAddress setStreamIORef client =
5 changes: 3 additions & 2 deletions lib/src/Pos/Diffusion/Full.hs
@@ -68,7 +68,8 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
emptySubscriptionStates)
import Pos.Infra.Diffusion.Transport.TCP (bracketTransportTCP)
import Pos.Infra.Diffusion.Types (Diffusion (..),
DiffusionHealth (..), DiffusionLayer (..))
DiffusionHealth (..), DiffusionLayer (..),
StreamBlocks (..))
import Pos.Infra.Network.Types (Bucket (..), NetworkConfig (..),
NodeType, SubscriptionWorker (..), initQueue,
topologyHealthStatus, topologyRunKademlia,
@@ -358,7 +359,7 @@ diffusionLayerFullExposeInternals fdconf
NodeId
-> HeaderHash
-> [HeaderHash]
-> ([Block] -> IO t)
-> StreamBlocks Block IO t
-> IO (Maybe t)
streamBlocks = Diffusion.Block.streamBlocks logTrace diffusionHealth logic streamWindow enqueue

196 changes: 96 additions & 100 deletions lib/src/Pos/Diffusion/Full/Block.hs
@@ -14,8 +14,7 @@ module Pos.Diffusion.Full.Block

import Universum

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (cancel)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as Conc
import Control.Exception (Exception (..), throwIO)
import Control.Lens (to)
@@ -55,7 +54,8 @@ import Pos.Infra.Communication.Protocol (Conversation (..),
MkListeners (..), MsgType (..), NodeId, Origin (..),
OutSpecs, constantListeners, recvLimited,
waitForConversations, waitForDequeues)
import Pos.Infra.Diffusion.Types (DiffusionHealth (..))
import Pos.Infra.Diffusion.Types (DiffusionHealth (..),
StreamBlocks (..))
import Pos.Infra.Network.Types (Bucket)
import Pos.Infra.Util.TimeWarp (nodeIdToAddress)
import Pos.Logic.Types (Logic)
@@ -281,78 +281,28 @@ getBlocks logTrace logic recoveryHeadersMessage enqueue nodeId tipHeaderHash che
data StreamEntry = StreamEnd | StreamBlock !Block

-- | Stream some blocks from the network.
-- Returns Nothing if streaming is disabled by the client or not supported by the peer.
-- If streaming is not supported by the client or peer, you get 'Nothing'. We
-- don't fall back to batching because we can't: that method requires having
-- all of the header hashes for the blocks you desire.
streamBlocks
:: forall t .
Trace IO (Severity, Text)
-> Maybe DiffusionHealth
-> Logic IO
-> Word32
-> Word32 -- ^ Size of stream window. 0 implies 'Nothing' is returned.
-> EnqueueMsg
-> NodeId
-> HeaderHash
-> [HeaderHash]
-> ([Block] -> IO t)
-> StreamBlocks Block IO t
-> IO (Maybe t)
streamBlocks _ _ _ 0 _ _ _ _ _ = return Nothing -- Fallback to batch mode
streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints k = do
blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
let batchSize = min 64 streamWindow
fallBack <- atomically $ Conc.newTVar False
requestVar <- requestBlocks fallBack blockChan
r <- processBlocks batchSize 0 [] blockChan `finally` (atomically $ do
status <- Conc.readTVar requestVar
case status of
OQ.PacketAborted -> pure (pure ())
OQ.PacketEnqueued -> do
Conc.writeTVar requestVar OQ.PacketAborted
pure (pure ())
OQ.PacketDequeued asyncIO -> pure (cancel asyncIO))
r' <- atomically $ Conc.readTVar fallBack
if r' then pure Nothing
else pure $ Just r
streamBlocks _ _ _ 0 _ _ _ _ _ =
return Nothing -- Fallback to batch mode
streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints streamBlocksK =
requestBlocks >>= Async.wait
where

processBlocks :: Word32 -> Word32 -> [Block] -> Conc.TBQueue StreamEntry -> IO t
processBlocks batchSize !n !blocks blockChan = do
streamEntry <- atomically $ Conc.readTBQueue blockChan
case streamEntry of
StreamEnd -> k blocks
StreamBlock block -> do
let n' = n + 1
when (n' `mod` 256 == 0) $
traceWith logTrace (Debug,
sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
(block ^. difficultyL))
case smM of
Nothing -> pure ()
Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm

if n' `mod` batchSize == 0
then do
_ <- k (block : blocks)
processBlocks batchSize n' [] blockChan
else
processBlocks batchSize n' (block : blocks) blockChan

writeStreamEnd :: Conc.TBQueue StreamEntry -> IO ()
writeStreamEnd blockChan = writeBlock 1024 blockChan StreamEnd

-- It is possible that the reader of the TBQueue stops unexpectedly which
-- means that we will have to use a timeout instead of blocking forever
-- while attempting to write to a full queue.
writeBlock :: Int -> Conc.TBQueue StreamEntry -> StreamEntry -> IO ()
writeBlock delay _ _ | delay >= 4000000 = do
let msg = "Error write timeout to local reader"
traceWith logTrace (Warning, msg)
throwM $ DialogUnexpected msg
writeBlock delay blockChan b = do
isFull <- atomically $ Conc.isFullTBQueue blockChan
if isFull
then do
threadDelay delay
writeBlock (delay * 2) blockChan b
else atomically $ Conc.writeTBQueue blockChan b
batchSize = min 64 streamWindow

mkStreamStart :: [HeaderHash] -> HeaderHash -> MsgStream
mkStreamStart chain wantedBlock =
@@ -362,49 +312,62 @@
, mssWindow = streamWindow
}

requestBlocks :: Conc.TVar Bool -> Conc.TBQueue StreamEntry -> IO (Conc.TVar (OQ.PacketStatus ()))
requestBlocks fallBack blockChan = do
convMap <- enqueue (MsgRequestBlocks (S.singleton nodeId))
(\_ _ -> (Conversation $ \it -> requestBlocksConversation blockChan it `onException` writeStreamEnd blockChan) :|
[(Conversation $ \it -> requestBatch fallBack blockChan it `finally` writeStreamEnd blockChan)]
)
-- Enqueue a conversation which will attempt to stream.
-- This returns when the conversation is dequeued, or throws an exception
-- in case it's aborted or is not enqueued.
requestBlocks :: IO (Async.Async (Maybe t))
requestBlocks = do
convMap <- enqueue
(MsgRequestBlocks (S.singleton nodeId))
(\_ _ -> (Conversation $ streamBlocksConversation) :|
[(Conversation $ batchConversation)]
)
-- Outbound queue guarantees that the map is either size 0 or 1, since
-- 'S.singleton nodeId' was given to the enqueue.
case M.lookup nodeId convMap of
Just tvar -> pure tvar
Just tvar -> atomically $ do
pStatus <- Conc.readTVar tvar
case pStatus of
OQ.PacketEnqueued -> Conc.retry
-- Somebody else aborted our call; nothing to do but
-- throw.
OQ.PacketAborted -> Conc.throwSTM $ DialogUnexpected $ "streamBlocks: aborted"
OQ.PacketDequeued streamThread -> pure streamThread
-- FIXME shouldn't have to deal with this.
-- One possible solution: do the block request in response to an
-- unsolicited header, so that it's all done in one conversation,
-- and so there's no need to even track the 'nodeId'.
Nothing -> throwM $ DialogUnexpected $ "requestBlocks did not contact given peer"

requestBatch
:: Conc.TVar Bool
-> Conc.TBQueue StreamEntry
-> ConversationActions MsgGetBlocks MsgBlock
-> IO ()
requestBatch fallBack _ _ = do
-- The peer doesn't support streaming; we need to fall back to batching but
-- the current conversation is unusable since there is no way for us to learn
-- which blocks we shall fetch.
-- We will always have room to write a single StreamEnd so there is no need to
-- differentiate between normal execution and when we get an exception.
atomically $ writeTVar fallBack True
return ()

requestBlocksConversation
:: Conc.TBQueue StreamEntry
-> ConversationActions MsgStream MsgStreamBlock
-> IO ()
requestBlocksConversation blockChan conv = do
Nothing -> throwIO $ DialogUnexpected $ "streamBlocks: did not contact given peer"

-- The peer doesn't support streaming; we need to fall back to batching but
-- the current conversation is unusable since there is no way for us to learn
-- which blocks we shall fetch.
batchConversation
:: ConversationActions MsgGetBlocks MsgBlock
-> IO (Maybe t)
batchConversation _ = pure Nothing

streamBlocksConversation
:: ConversationActions MsgStream MsgStreamBlock
-> IO (Maybe t)
streamBlocksConversation conv = do
let newestHash = headerHash tipHeader
traceWith logTrace (Debug,
sformat ("streamBlocks: Requesting stream of blocks from "%listJson%" to "%shortHashF)
checkpoints
newestHash)
send conv $ mkStreamStart checkpoints newestHash
bvd <- Logic.getAdoptedBVData logic
retrieveBlocks bvd blockChan conv streamWindow
atomically $ Conc.writeTBQueue blockChan StreamEnd
return ()
-- Two threads are used here: one to pull in blocks, and one to
-- call into the application 'StreamBlocks' value. The reason:
-- the latter could do a lot of work for each batch, so having another
-- thread continually pulling blocks in, with a buffer in the middle, will
-- smooth the traffic.
blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
(_, b) <- Async.concurrently
(retrieveBlocks bvd blockChan conv streamWindow)
(processBlocks 0 [] blockChan streamBlocksK)
pure $ Just b

halfStreamWindow = max 1 $ streamWindow `div` 2

@@ -426,15 +389,14 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
else return $ window - 1
block <- retrieveBlock bvd conv
case block of
MsgStreamNoBlock t -> do
let msg = sformat ("MsgStreamNoBlock "%stext) t
traceWith logTrace (Warning, msg)
MsgStreamNoBlock e -> do
let msg = sformat ("MsgStreamNoBlock "%stext) e
traceWith logTrace (Error, msg)
throwM $ DialogUnexpected msg
MsgStreamEnd -> do
atomically $ Conc.writeTBQueue blockChan StreamEnd
traceWith logTrace (Debug, sformat ("Streaming done client-side for node"%build) nodeId)
return ()
MsgStreamBlock b -> do
-- traceWith logTrace (Debug, sformat ("Read block "%shortHashF) (headerHash b))
atomically $ Conc.writeTBQueue blockChan (StreamBlock b)
case smM of
Nothing -> pure ()
@@ -452,10 +414,44 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
case blockE of
Nothing -> do
let msg = sformat ("Error retrieving blocks from peer "%build) nodeId
traceWith logTrace (Warning, msg)
traceWith logTrace (Error, msg)
throwM $ DialogUnexpected msg
Just block -> return block

processBlocks
:: Word32
-> [Block]
-> Conc.TBQueue StreamEntry
-> StreamBlocks Block IO t
-> IO t
processBlocks n !blocks blockChan k = do
streamEntry <- atomically $ Conc.readTBQueue blockChan
case streamEntry of
StreamEnd -> case blocks of
[] -> streamBlocksDone k
(blk:blks) -> do
k' <- streamBlocksMore k (blk :| blks)
streamBlocksDone k'
StreamBlock block -> do
-- FIXME this logging stuff should go into the particular
-- 'StreamBlocks' value rather than here.
let n' = n + 1
when (n' `mod` 256 == 0) $
traceWith logTrace (Debug,
sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
(block ^. difficultyL))
case smM of
Nothing -> pure ()
Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm

if n' `mod` batchSize == 0
then do
k' <- streamBlocksMore k (block :| blocks)
processBlocks n' [] blockChan k'
else
processBlocks n' (block : blocks) blockChan k


requestTip
:: Trace IO (Severity, Text)
-> Logic IO
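For review context, a hedged sketch of the new calling convention from the consumer side (hypothetical code, not part of this diff; 'demoStream' and 'printBatches' are made-up names): 'streamBlocks' yields 'Nothing' when the stream window is 0 or the peer only speaks the batch protocol, so callers must be prepared to fall back to batch retrieval. Note also that 'processBlocks' builds each batch by consing, so every 'NonEmpty' handed to 'streamBlocksMore' arrives newest-first.

-- Hypothetical usage sketch. Assumes a 'Diffusion IO' value as defined
-- in Pos.Infra.Diffusion.Types, imported qualified as 'Diffusion'.
demoStream :: Diffusion IO -> NodeId -> HeaderHash -> [HeaderHash] -> IO ()
demoStream diffusion nodeId tip checkpoints = do
    mRes <- Diffusion.streamBlocks diffusion nodeId tip checkpoints printBatches
    case mRes of
        Nothing -> putStrLn "peer cannot stream; fall back to getBlocks"
        Just () -> putStrLn "streaming finished"
  where
    printBatches :: StreamBlocks Block IO ()
    printBatches = StreamBlocks
        { streamBlocksMore = \blks -> do
            -- Batches arrive newest-first (see 'processBlocks' above),
            -- so reverse to log in chain order.
            mapM_ (print . headerHash) (NE.reverse blks)
            pure printBatches
        , streamBlocksDone = pure ()
        }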
15 changes: 9 additions & 6 deletions lib/src/Pos/Network/Block/Retrieval.hs
@@ -31,7 +31,7 @@ import Pos.DB.Block (ClassifyHeaderRes (..), classifyNewHeader,
getHeadersOlderExp)
import qualified Pos.DB.BlockIndex as DB
import Pos.Infra.Communication.Protocol (NodeId)
import Pos.Infra.Diffusion.Types (Diffusion)
import Pos.Infra.Diffusion.Types (Diffusion, StreamBlocks (..))
import qualified Pos.Infra.Diffusion.Types as Diffusion
import Pos.Infra.Recovery.Types (RecoveryHeaderTag)
import Pos.Infra.Reporting (reportOrLogE, reportOrLogW)
@@ -374,8 +374,11 @@ streamProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints
_ <- dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId
return ()
where
writeCallback :: (TVar (Maybe Block)) -> [Block] -> m ()
writeCallback _ [] = return ()
writeCallback mostDifficultBlock (block:blocks) = do
_ <- atomically $ swapTVar mostDifficultBlock (Just block)
handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse $ block :| blocks)) diffusion
writeCallback :: TVar (Maybe Block) -> StreamBlocks Block m ()
writeCallback mostDifficultBlock = StreamBlocks
{ streamBlocksMore = \blks -> do
_ <- atomically $ swapTVar mostDifficultBlock (Just (NE.head blks))
_ <- handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse $ blks)) diffusion
pure (writeCallback mostDifficultBlock)
, streamBlocksDone = pure ()
}
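One ordering note on the hunk above (illustration only, not part of this diff): because 'processBlocks' delivers each batch newest-first, 'NE.head blks' is the most difficult block in the batch, and 'NE.reverse' restores the oldest-first order that 'handleBlocks' expects.

-- Hypothetical sanity check of that assumption (uses the 'difficultyL'
-- lens seen in Block.hs): the head of each batch should be at least as
-- difficult as every other block in it.
newestFirst :: NonEmpty Block -> Bool
newestFirst blks =
    all (\b -> b ^. difficultyL <= NE.head blks ^. difficultyL) blks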