diff --git a/infra/src/Pos/Infra/Diffusion/Types.hs b/infra/src/Pos/Infra/Diffusion/Types.hs
index f8207603591..9d08575f502 100644
--- a/infra/src/Pos/Infra/Diffusion/Types.hs
+++ b/infra/src/Pos/Infra/Diffusion/Types.hs
@@ -7,6 +7,8 @@ module Pos.Infra.Diffusion.Types
     , Diffusion (..)
     , hoistDiffusion
     , dummyDiffusionLayer
+    , StreamBlocks (..)
+    , hoistStreamBlocks
     , DiffusionHealth (..)
     ) where
 
@@ -29,6 +31,24 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
                      emptySubscriptionStates)
 import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
 
+-- | How to handle a stream of blocks.
+data StreamBlocks block m t = StreamBlocks
+    { streamBlocksMore :: NonEmpty block -> m (StreamBlocks block m t)
+      -- ^ The server gives a batch of blocks.
+    , streamBlocksDone :: m t
+      -- ^ The server has no more blocks.
+    }
+
+hoistStreamBlocks
+    :: ( Functor n )
+    => (forall x . m x -> n x)
+    -> StreamBlocks block m t
+    -> StreamBlocks block n t
+hoistStreamBlocks nat streamBlocks = streamBlocks
+    { streamBlocksMore = \blks ->
+          fmap (hoistStreamBlocks nat) (nat (streamBlocksMore streamBlocks blks))
+    , streamBlocksDone = nat (streamBlocksDone streamBlocks)
+    }
 
 data DiffusionHealth = DiffusionHealth {
     dhStreamWriteQueue :: !Gauge -- Number of blocks stored in the block stream write queue
@@ -49,7 +69,7 @@ data Diffusion m = Diffusion
           NodeId
        -> HeaderHash
        -> [HeaderHash]
-       -> ([Block] -> m t)
+       -> StreamBlocks Block m t
        -> m (Maybe t)
       -- | This is needed because there's a security worker which will request
       -- tip-of-chain from the network if it determines it's very far behind.
@@ -108,7 +128,7 @@ hoistDiffusion
     -> Diffusion n
 hoistDiffusion nat rnat orig = Diffusion
     { getBlocks = \nid bh hs -> nat $ getBlocks orig nid bh hs
-    , streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (rnat . k)
+    , streamBlocks = \nid hh hhs k -> nat $ streamBlocks orig nid hh hhs (hoistStreamBlocks rnat k)
    , requestTip = nat $ (fmap . fmap) nat (requestTip orig)
     , announceBlockHeader = nat . announceBlockHeader orig
     , sendTx = nat . sendTx orig
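The 'StreamBlocks' record above replaces the plain '[Block] -> m t' callback
with a continuation-style consumer: 'streamBlocksMore' hands back the consumer
to use for the next batch, so per-batch state threads through return values
rather than mutable references. A minimal sketch under that reading (the name
'countBlocks' is illustrative, not part of this patch):

    -- Count the blocks a peer streams to us. The running total lives in the
    -- continuation itself, so no IORef is needed.
    countBlocks :: Monad m => Int -> StreamBlocks block m Int
    countBlocks n = StreamBlocks
        { streamBlocksMore = \blks -> pure (countBlocks (n + length blks))
        , streamBlocksDone = pure n
        }

'hoistStreamBlocks' then moves such a consumer between monads, which is
exactly how 'hoistDiffusion' adapts the new 'streamBlocks' field above.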
diff --git a/lib/bench/Bench/Pos/Diffusion/BlockDownload.hs b/lib/bench/Bench/Pos/Diffusion/BlockDownload.hs
index d4b1a1f076f..3f95bb9df29 100644
--- a/lib/bench/Bench/Pos/Diffusion/BlockDownload.hs
+++ b/lib/bench/Bench/Pos/Diffusion/BlockDownload.hs
@@ -44,7 +44,8 @@ import Pos.Diffusion.Full (FullDiffusionConfiguration (..),
                      FullDiffusionInternals (..),
                      RunFullDiffusionInternals (..),
                      diffusionLayerFullExposeInternals)
-import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..))
+import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..),
+                     StreamBlocks (..))
 import qualified Pos.Infra.Network.Policy as Policy
 import Pos.Infra.Network.Types (Bucket (..))
 import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
@@ -226,7 +227,10 @@ blockDownloadStream serverAddress setStreamIORef client ~(blockHeader, checkpoin
   where
     numBlocks = batches * 2200
 
-    writeCallback !_ = return ()
+    writeCallback = StreamBlocks
+        { streamBlocksMore = \(!_) -> pure writeCallback
+        , streamBlocksDone = pure ()
+        }
 
 blockDownloadBenchmarks :: NodeId -> (Int -> IO ()) -> Diffusion IO -> [Criterion.Benchmark]
 blockDownloadBenchmarks serverAddress setStreamIORef client =
diff --git a/lib/src/Pos/Diffusion/Full.hs b/lib/src/Pos/Diffusion/Full.hs
index 67e4f0629c3..d35b4137d53 100644
--- a/lib/src/Pos/Diffusion/Full.hs
+++ b/lib/src/Pos/Diffusion/Full.hs
@@ -68,7 +68,8 @@ import Pos.Infra.Diffusion.Subscription.Status (SubscriptionStates,
                      emptySubscriptionStates)
 import Pos.Infra.Diffusion.Transport.TCP (bracketTransportTCP)
 import Pos.Infra.Diffusion.Types (Diffusion (..),
-                     DiffusionHealth (..), DiffusionLayer (..))
+                     DiffusionHealth (..), DiffusionLayer (..),
+                     StreamBlocks (..))
 import Pos.Infra.Network.Types (Bucket (..), NetworkConfig (..),
                      NodeType, SubscriptionWorker (..), initQueue,
                      topologyHealthStatus, topologyRunKademlia,
@@ -358,7 +359,7 @@ diffusionLayerFullExposeInternals fdconf
                NodeId
             -> HeaderHash
             -> [HeaderHash]
-            -> ([Block] -> IO t)
+            -> StreamBlocks Block IO t
             -> IO (Maybe t)
         streamBlocks = Diffusion.Block.streamBlocks logTrace diffusionHealth logic streamWindow enqueue
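A 'Just' result from the new 'streamBlocks' carries whatever the consumer
computed; 'Nothing' means streaming was unavailable and the caller must fall
back to batched 'getBlocks' (as 'streamProcessBlocks' does further down). A
hypothetical call site, reusing the 'countBlocks' sketch from the note above
('fetchCount' is an assumed name):

    -- Stream blocks from one peer and count them; 'Nothing' means the peer
    -- (or our own configuration) does not support streaming.
    fetchCount :: Diffusion IO -> NodeId -> HeaderHash -> [HeaderHash] -> IO (Maybe Int)
    fetchCount diffusion peer tip checkpoints =
        streamBlocks diffusion peer tip checkpoints (countBlocks 0)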
diff --git a/lib/src/Pos/Diffusion/Full/Block.hs b/lib/src/Pos/Diffusion/Full/Block.hs
index 13ff033409a..b24b374b5a8 100644
--- a/lib/src/Pos/Diffusion/Full/Block.hs
+++ b/lib/src/Pos/Diffusion/Full/Block.hs
@@ -14,8 +14,7 @@ module Pos.Diffusion.Full.Block
 
 import Universum
 
-import Control.Concurrent (threadDelay)
-import Control.Concurrent.Async (cancel)
+import qualified Control.Concurrent.Async as Async
 import qualified Control.Concurrent.STM as Conc
 import Control.Exception (Exception (..), throwIO)
 import Control.Lens (to)
@@ -55,7 +54,8 @@ import Pos.Infra.Communication.Protocol (Conversation (..),
                      MkListeners (..), MsgType (..), NodeId, Origin (..),
                      OutSpecs, constantListeners, recvLimited,
                      waitForConversations, waitForDequeues)
-import Pos.Infra.Diffusion.Types (DiffusionHealth (..))
+import Pos.Infra.Diffusion.Types (DiffusionHealth (..),
+                     StreamBlocks (..))
 import Pos.Infra.Network.Types (Bucket)
 import Pos.Infra.Util.TimeWarp (nodeIdToAddress)
 import Pos.Logic.Types (Logic)
@@ -281,78 +281,28 @@ getBlocks logTrace logic recoveryHeadersMessage enqueue nodeId tipHeaderHash che
 data StreamEntry = StreamEnd | StreamBlock !Block
 
 -- | Stream some blocks from the network.
--- Returns Nothing if streaming is disabled by the client or not supported by the peer.
+-- If streaming is not supported by the client or peer, you get 'Nothing'. We
+-- don't fall back to batching because we can't: that method requires having
+-- all of the header hashes for the blocks you desire.
 streamBlocks
     :: forall t .
        Trace IO (Severity, Text)
     -> Maybe DiffusionHealth
     -> Logic IO
-    -> Word32
+    -> Word32 -- ^ Size of stream window. 0 implies 'Nothing' is returned.
     -> EnqueueMsg
     -> NodeId
     -> HeaderHash
     -> [HeaderHash]
-    -> ([Block] -> IO t)
+    -> StreamBlocks Block IO t
     -> IO (Maybe t)
-streamBlocks _ _ _ 0 _ _ _ _ _ = return Nothing -- Fallback to batch mode
-streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints k = do
-    blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
-    let batchSize = min 64 streamWindow
-    fallBack <- atomically $ Conc.newTVar False
-    requestVar <- requestBlocks fallBack blockChan
-    r <- processBlocks batchSize 0 [] blockChan `finally` (atomically $ do
-        status <- Conc.readTVar requestVar
-        case status of
-            OQ.PacketAborted -> pure (pure ())
-            OQ.PacketEnqueued -> do
-                Conc.writeTVar requestVar OQ.PacketAborted
-                pure (pure ())
-            OQ.PacketDequeued asyncIO -> pure (cancel asyncIO))
-    r' <- atomically $ Conc.readTVar fallBack
-    if r' then pure Nothing
-          else pure $ Just r
+streamBlocks _ _ _ 0 _ _ _ _ _ =
+    return Nothing -- Fallback to batch mode
+streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoints streamBlocksK =
+    requestBlocks >>= Async.wait
   where
-    processBlocks :: Word32 -> Word32 -> [Block] -> Conc.TBQueue StreamEntry -> IO t
-    processBlocks batchSize !n !blocks blockChan = do
-        streamEntry <- atomically $ Conc.readTBQueue blockChan
-        case streamEntry of
-            StreamEnd -> k blocks
-            StreamBlock block -> do
-                let n' = n + 1
-                when (n' `mod` 256 == 0) $
-                    traceWith logTrace (Debug,
-                        sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
-                                (block ^. difficultyL))
-                case smM of
-                    Nothing -> pure ()
-                    Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm
-
-                if n' `mod` batchSize == 0
-                    then do
-                        _ <- k (block : blocks)
-                        processBlocks batchSize n' [] blockChan
-                    else
-                        processBlocks batchSize n' (block : blocks) blockChan
-
-    writeStreamEnd :: Conc.TBQueue StreamEntry -> IO ()
-    writeStreamEnd blockChan = writeBlock 1024 blockChan StreamEnd
-
-    -- It is possible that the reader of the TBQueue stops unexpectedly, which
-    -- means that we will have to use a timeout instead of blocking forever
-    -- while attempting to write to a full queue.
-    writeBlock :: Int -> Conc.TBQueue StreamEntry -> StreamEntry -> IO ()
-    writeBlock delay _ _ | delay >= 4000000 = do
-        let msg = "Error write timeout to local reader"
-        traceWith logTrace (Warning, msg)
-        throwM $ DialogUnexpected msg
-    writeBlock delay blockChan b = do
-        isFull <- atomically $ Conc.isFullTBQueue blockChan
-        if isFull
-            then do
-                threadDelay delay
-                writeBlock (delay * 2) blockChan b
-            else atomically $ Conc.writeTBQueue blockChan b
+    batchSize = min 64 streamWindow
 
     mkStreamStart :: [HeaderHash] -> HeaderHash -> MsgStream
     mkStreamStart chain wantedBlock =
@@ -362,39 +312,45 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
           , mssWindow = streamWindow
           }
 
-    requestBlocks :: Conc.TVar Bool -> Conc.TBQueue StreamEntry -> IO (Conc.TVar (OQ.PacketStatus ()))
-    requestBlocks fallBack blockChan = do
-        convMap <- enqueue (MsgRequestBlocks (S.singleton nodeId))
-            (\_ _ -> (Conversation $ \it -> requestBlocksConversation blockChan it `onException` writeStreamEnd blockChan) :|
-                     [(Conversation $ \it -> requestBatch fallBack blockChan it `finally` writeStreamEnd blockChan)]
-            )
+    -- Enqueue a conversation which will attempt to stream.
+    -- This returns when the conversation is dequeued, or throws an exception
+    -- in case it's aborted or is not enqueued.
+    requestBlocks :: IO (Async.Async (Maybe t))
+    requestBlocks = do
+        convMap <- enqueue
+            (MsgRequestBlocks (S.singleton nodeId))
+            (\_ _ -> (Conversation $ streamBlocksConversation) :|
+                    [(Conversation $ batchConversation)]
+            )
+        -- Outbound queue guarantees that the map is either size 0 or 1, since
+        -- 'S.singleton nodeId' was given to the enqueue.
         case M.lookup nodeId convMap of
-            Just tvar -> pure tvar
+            Just tvar -> atomically $ do
+                pStatus <- Conc.readTVar tvar
+                case pStatus of
+                    OQ.PacketEnqueued -> Conc.retry
+                    -- Somebody else aborted our call; nothing to do but
+                    -- throw.
+                    OQ.PacketAborted -> Conc.throwSTM $ DialogUnexpected $ "streamBlocks: aborted"
+                    OQ.PacketDequeued streamThread -> pure streamThread
            -- FIXME shouldn't have to deal with this.
             -- One possible solution: do the block request in response to an
             -- unsolicited header, so that it's all done in one conversation,
             -- and so there's no need to even track the 'nodeId'.
-            Nothing -> throwM $ DialogUnexpected $ "requestBlocks did not contact given peer"
-
-    requestBatch
-        :: Conc.TVar Bool
-        -> Conc.TBQueue StreamEntry
-        -> ConversationActions MsgGetBlocks MsgBlock
-        -> IO ()
-    requestBatch fallBack _ _ = do
-        -- The peer doesn't support streaming, so we need to fall back to batching, but
-        -- the current conversation is unusable since there is no way for us to learn
-        -- which blocks we shall fetch.
-        -- We will always have room to write a single StreamEnd so there is no need to
-        -- differentiate between normal execution and when we get an exception.
-        atomically $ writeTVar fallBack True
-        return ()
-
-    requestBlocksConversation
-        :: Conc.TBQueue StreamEntry
-        -> ConversationActions MsgStream MsgStreamBlock
-        -> IO ()
-    requestBlocksConversation blockChan conv = do
+            Nothing -> throwIO $ DialogUnexpected $ "streamBlocks: did not contact given peer"
+
+    -- The peer doesn't support streaming, so we need to fall back to batching,
+    -- but the current conversation is unusable since there is no way for us to
+    -- learn which blocks we shall fetch.
+    batchConversation
+        :: ConversationActions MsgGetBlocks MsgBlock
+        -> IO (Maybe t)
+    batchConversation _ = pure Nothing
+
+    streamBlocksConversation
+        :: ConversationActions MsgStream MsgStreamBlock
+        -> IO (Maybe t)
+    streamBlocksConversation conv = do
         let newestHash = headerHash tipHeader
         traceWith logTrace (Debug,
             sformat ("streamBlocks: Requesting stream of blocks from "%listJson%" to "%shortHashF)
                     checkpoints
                     newestHash)
         send conv $ mkStreamStart checkpoints newestHash
         bvd <- Logic.getAdoptedBVData logic
-        retrieveBlocks bvd blockChan conv streamWindow
-        atomically $ Conc.writeTBQueue blockChan StreamEnd
-        return ()
+        -- Two threads are used here: one to pull in blocks, and one to
+        -- call into the application 'StreamBlocks' value. The reason:
+        -- the latter could do a lot of work for each batch, so having another
+        -- thread continually pulling in with a buffer in the middle will
+        -- smooth the traffic.
+        blockChan <- atomically $ Conc.newTBQueue $ fromIntegral streamWindow
+        (_, b) <- Async.concurrently
+            (retrieveBlocks bvd blockChan conv streamWindow)
+            (processBlocks 0 [] blockChan streamBlocksK)
+        pure $ Just b
 
     halfStreamWindow = max 1 $ streamWindow `div` 2
 
@@ -426,15 +389,14 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
             else return $ window - 1
         block <- retrieveBlock bvd conv
         case block of
-            MsgStreamNoBlock t -> do
-                let msg = sformat ("MsgStreamNoBlock "%stext) t
-                traceWith logTrace (Warning, msg)
+            MsgStreamNoBlock e -> do
+                let msg = sformat ("MsgStreamNoBlock "%stext) e
+                traceWith logTrace (Error, msg)
                 throwM $ DialogUnexpected msg
             MsgStreamEnd -> do
+                atomically $ Conc.writeTBQueue blockChan StreamEnd
                 traceWith logTrace (Debug, sformat ("Streaming done client-side for node "%build) nodeId)
-                return ()
             MsgStreamBlock b -> do
-                -- traceWith logTrace (Debug, sformat ("Read block "%shortHashF) (headerHash b))
                 atomically $ Conc.writeTBQueue blockChan (StreamBlock b)
                 case smM of
                     Nothing -> pure ()
@@ -452,10 +414,44 @@ streamBlocks logTrace smM logic streamWindow enqueue nodeId tipHeader checkpoint
         case blockE of
             Nothing -> do
                 let msg = sformat ("Error retrieving blocks from peer "%build) nodeId
-                traceWith logTrace (Warning, msg)
+                traceWith logTrace (Error, msg)
                 throwM $ DialogUnexpected msg
             Just block -> return block
 
+    processBlocks
+        :: Word32
+        -> [Block]
+        -> Conc.TBQueue StreamEntry
+        -> StreamBlocks Block IO t
+        -> IO t
+    processBlocks n !blocks blockChan k = do
+        streamEntry <- atomically $ Conc.readTBQueue blockChan
+        case streamEntry of
+            StreamEnd -> case blocks of
+                [] -> streamBlocksDone k
+                (blk:blks) -> do
+                    k' <- streamBlocksMore k (blk :| blks)
+                    streamBlocksDone k'
+            StreamBlock block -> do
+                -- FIXME this logging stuff should go into the particular
+                -- 'StreamBlocks' value rather than here.
+                let n' = n + 1
+                when (n' `mod` 256 == 0) $
+                    traceWith logTrace (Debug,
+                        sformat ("Read block "%shortHashF%" difficulty "%int) (headerHash block)
+                                (block ^. difficultyL))
+                case smM of
+                    Nothing -> pure ()
+                    Just sm -> liftIO $ Gauge.dec $ dhStreamWriteQueue sm
+
+                if n' `mod` batchSize == 0
+                    then do
+                        k' <- streamBlocksMore k (block :| blocks)
+                        processBlocks n' [] blockChan k'
+                    else
+                        processBlocks n' (block : blocks) blockChan k
+
+
 requestTip
     :: Trace IO (Severity, Text)
     -> Logic IO
diff --git a/lib/src/Pos/Network/Block/Retrieval.hs b/lib/src/Pos/Network/Block/Retrieval.hs
index 394492a1d3e..2b6d7ba44c9 100644
--- a/lib/src/Pos/Network/Block/Retrieval.hs
+++ b/lib/src/Pos/Network/Block/Retrieval.hs
@@ -31,7 +31,7 @@ import Pos.DB.Block (ClassifyHeaderRes (..), classifyNewHeader,
                      getHeadersOlderExp)
 import qualified Pos.DB.BlockIndex as DB
 import Pos.Infra.Communication.Protocol (NodeId)
-import Pos.Infra.Diffusion.Types (Diffusion)
+import Pos.Infra.Diffusion.Types (Diffusion, StreamBlocks (..))
 import qualified Pos.Infra.Diffusion.Types as Diffusion
 import Pos.Infra.Recovery.Types (RecoveryHeaderTag)
 import Pos.Infra.Reporting (reportOrLogE, reportOrLogW)
@@ -374,8 +374,11 @@ streamProcessBlocks genesisConfig txpConfig diffusion nodeId desired checkpoints
         _ <- dropRecoveryHeaderAndRepeat genesisConfig diffusion nodeId
         return ()
   where
-    writeCallback :: (TVar (Maybe Block)) -> [Block] -> m ()
-    writeCallback _ [] = return ()
-    writeCallback mostDifficultBlock (block:blocks) = do
-        _ <- atomically $ swapTVar mostDifficultBlock (Just block)
-        handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse $ block :| blocks)) diffusion
+    writeCallback :: TVar (Maybe Block) -> StreamBlocks Block m ()
+    writeCallback mostDifficultBlock = StreamBlocks
+        { streamBlocksMore = \blks -> do
+              _ <- atomically $ swapTVar mostDifficultBlock (Just (NE.head blks))
+              _ <- handleBlocks genesisConfig txpConfig (OldestFirst (NE.reverse blks)) diffusion
+              pure (writeCallback mostDifficultBlock)
+        , streamBlocksDone = pure ()
+        }
diff --git a/lib/test/Test/Pos/Diffusion/BlockSpec.hs b/lib/test/Test/Pos/Diffusion/BlockSpec.hs
index 71434d04995..ab40c726bc1 100644
--- a/lib/test/Test/Pos/Diffusion/BlockSpec.hs
+++ b/lib/test/Test/Pos/Diffusion/BlockSpec.hs
@@ -40,7 +40,8 @@ import Pos.Diffusion.Full (FullDiffusionConfiguration (..),
                      FullDiffusionInternals (..),
                      RunFullDiffusionInternals (..),
                      diffusionLayerFullExposeInternals)
-import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..))
+import Pos.Infra.Diffusion.Types as Diffusion (Diffusion (..),
+                     StreamBlocks (..))
 import qualified Pos.Infra.Network.Policy as Policy
 import Pos.Infra.Network.Types (Bucket (..))
 import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
@@ -213,15 +214,19 @@ blockDownloadStream :: NodeId -> IORef Bool -> IORef [Block] -> (Int -> IO ()) -
 blockDownloadStream serverAddress resultIORef streamIORef setStreamIORef ~(blockHeader, checkpoints) client = do
     setStreamIORef 1
     recvIORef <- newIORef []
-    _ <- Diffusion.streamBlocks client serverAddress blockHeader checkpoints (writeCallback recvIORef)
+    _ <- Diffusion.streamBlocks client serverAddress blockHeader checkpoints (streamBlocksK recvIORef)
     expectedBlocks <- readIORef streamIORef
     recvBlocks <- readIORef recvIORef
     writeIORef resultIORef $ expectedBlocks == reverse recvBlocks
     return ()
   where
-    writeCallback recvBlocks !blocks =
-        modifyIORef' recvBlocks (\d -> blocks <> d)
+    streamBlocksK recvBlocks = StreamBlocks
+        { streamBlocksMore = \(!blocks) -> do
+              modifyIORef' recvBlocks (\d -> (NE.toList blocks) <> d)
+              pure (streamBlocksK recvBlocks)
+        , streamBlocksDone = pure ()
+        }
 
 -- Generate a list of n+1 blocks
 generateBlocks :: ProtocolMagic -> Int -> NonEmpty Block
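In 'streamBlocksConversation' above, the producer ('retrieveBlocks') and
consumer ('processBlocks') run in separate threads joined by
'Async.concurrently', with a bounded 'TBQueue' between them for back-pressure.
A self-contained sketch of that shape ('pipeline', 'produce' and 'consume' are
assumed names; a 'Nothing' end marker stands in for 'StreamEnd'):

    import qualified Control.Concurrent.Async as Async
    import qualified Control.Concurrent.STM as STM

    -- One thread fills a bounded queue while the other drains it; if either
    -- side throws, 'concurrently' cancels the other.
    pipeline :: IO [Int]
    pipeline = do
        chan <- STM.atomically (STM.newTBQueue 64)
        (_, xs) <- Async.concurrently (produce chan) (consume chan [])
        pure xs
      where
        produce chan = do
            mapM_ (STM.atomically . STM.writeTBQueue chan . Just) [1 .. 100 :: Int]
            STM.atomically (STM.writeTBQueue chan Nothing) -- like StreamEnd
        consume chan acc = do
            mx <- STM.atomically (STM.readTBQueue chan)
            case mx of
                Nothing -> pure (reverse acc)
                Just x  -> consume chan (x : acc)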
diff --git a/networking/cardano-sl-networking.cabal b/networking/cardano-sl-networking.cabal
index d364570e243..79744db20de 100644
--- a/networking/cardano-sl-networking.cabal
+++ b/networking/cardano-sl-networking.cabal
@@ -16,7 +16,6 @@ Library
   exposed-modules:      Network.QDisc.Fair
 
                         Network.Discovery.Abstract
-                        Network.Discovery.Transport.Kademlia
 
                         Network.Broadcast.OutboundQueue
                         Network.Broadcast.OutboundQueue.Types
@@ -55,7 +54,6 @@ Library
                      , formatting
                      , formatting
                      , hashable
-                     , kademlia
                      , lens
                      , mtl
                      , mtl >= 2.2.1
@@ -82,28 +80,6 @@ Library
                         OverloadedStrings
                         MonadFailDesugaring
 
-executable discovery
-  main-is:             Discovery.hs
-  build-depends:       base >= 4.8 && < 5
-                     , binary
-                     , bytestring
-                     , cardano-sl-networking
-                     , cardano-sl-util
-                     , containers
-                     , contravariant
-                     , network-transport
-                     , network-transport-tcp
-                     , random
-
-  hs-source-dirs:      examples
-  default-language:    Haskell2010
-  ghc-options:         -threaded -Wall
-  default-extensions:  DeriveDataTypeable
-                       DeriveGeneric
-                       GeneralizedNewtypeDeriving
-                       OverloadedStrings
-                       MonadFailDesugaring
-
 executable ping-pong
   main-is:             PingPong.hs
   build-depends:       base >= 4.8 && < 5
diff --git a/networking/examples/Discovery.hs b/networking/examples/Discovery.hs
deleted file mode 100644
index 10746e84c86..00000000000
--- a/networking/examples/Discovery.hs
+++ /dev/null
@@ -1,134 +0,0 @@
-{-# LANGUAGE DeriveGeneric         #-}
-{-# LANGUAGE FlexibleInstances     #-}
-{-# LANGUAGE GADTs                 #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE RankNTypes            #-}
-{-# LANGUAGE RecursiveDo           #-}
-{-# LANGUAGE ScopedTypeVariables   #-}
-{-# LANGUAGE StandaloneDeriving    #-}
-{-# LANGUAGE TypeApplications      #-}
-{-# LANGUAGE TypeFamilies          #-}
-
-module Main where
-
-import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
-import Control.Exception (finally, throwIO)
-import Control.Monad (forM, forM_, when)
-import Data.Binary
-import qualified Data.ByteString as BS
-import qualified Data.ByteString.Char8 as B8
-import Data.Functor.Contravariant (contramap)
-import qualified Data.Set as S
-import Data.Void (Void)
-import GHC.Generics (Generic)
-import Network.Discovery.Abstract
-import qualified Network.Discovery.Transport.Kademlia as K
-import Network.Transport (Transport (..))
-import qualified Network.Transport.TCP as TCP
-import Node
-import Node.Message.Binary (BinaryP, binaryPacking)
-import Pos.Util.Trace (stdoutTrace)
-import System.Environment (getArgs)
-import System.Random
-
-data Pong = Pong BS.ByteString
-deriving instance Generic Pong
-deriving instance Show Pong
-instance Binary Pong where
-
-type Packing = BinaryP
-
-worker
-    :: NodeId
-    -> StdGen
-    -> NetworkDiscovery K.KademliaDiscoveryErrorCode
-    -> Converse Packing BS.ByteString
-    -> IO ()
-worker anId generator discovery = pingWorker generator
-  where
-    pingWorker
-        :: StdGen
-        -> Converse Packing BS.ByteString
-        -> IO ()
-    pingWorker gen converse = loop gen
-      where
-        loop g = do
-            let (us, gen') = randomR (1000,2000000) g
-            threadDelay us
-            _ <- knownPeers discovery
-            _ <- discoverPeers discovery
-            peerSet <- knownPeers discovery
-            putStrLn $ show anId ++ " has peer set: " ++ show peerSet
-            forM_ (S.toList peerSet) $ \addr -> converseWith converse (NodeId addr) $
-                \_peerData -> Conversation $ \(cactions :: ConversationActions Void Pong) -> do
-                    received <- recv cactions maxBound
-                    case received of
-                        Just (Pong _) -> putStrLn $ show anId ++ " heard PONG from " ++ show addr
-                        Nothing -> error "Unexpected end of input"
-            loop gen'
-
-listeners
-    :: NodeId
-    -> BS.ByteString
-    -> [Listener Packing BS.ByteString]
-listeners anId peerData = [pongListener]
-  where
-    pongListener :: Listener Packing BS.ByteString
-    pongListener = Listener $ \_ peerId (cactions :: ConversationActions Pong Void) -> do
-        putStrLn $ show anId ++ " heard PING from " ++ show peerId ++ " with peer data " ++ B8.unpack peerData
-        send cactions (Pong "")
-
-makeNode :: Transport
-         -> Int
-         -> IO ThreadId
-makeNode transport i = do
-    let port = 3000 + i
-        host = "127.0.0.1"
-        addr = (host, fromIntegral port)
-        anId = makeId i
-        initialPeer =
-            if i == 0
-            -- First node uses itself as initial peer, else it'll panic because
-            -- its initial peer appears to be down.
-            then K.Peer host (fromIntegral port)
-            else K.Peer host (fromIntegral (port - 1))
-        kademliaConfig = K.KademliaConfiguration addr addr anId
-        prng1 = mkStdGen (2 * i)
-        prng2 = mkStdGen ((2 * i) + 1)
-    putStrLn $ "Starting node " ++ show i
-    forkIO $ node (contramap snd stdoutTrace) (simpleNodeEndPoint transport) (const noReceiveDelay) (const noReceiveDelay)
-                  prng1 binaryPacking (B8.pack "my peer data!") defaultNodeEnvironment $ \node' ->
-        NodeAction (listeners . nodeId $ node') $ \converse -> do
-            putStrLn $ "Making discovery for node " ++ show i
-            discovery <- K.kademliaDiscovery kademliaConfig initialPeer (nodeEndPointAddress node')
-            worker (nodeId node') prng2 discovery converse `finally` closeDiscovery discovery
-  where
-    makeId anId
-        | anId < 10 = B8.pack ("node_identifier_0" ++ show anId)
-        | otherwise = B8.pack ("node_identifier_" ++ show anId)
-
-main :: IO ()
-main = do
-
-    args <- getArgs
-    number <- case args of
-        [arg0] | Just number <- read arg0 -> return number
-        _ -> error "Input argument must be a number"
-
-    when (number > 99 || number < 1) $ error "Give a number in [1,99]"
-
-    let params = TCP.defaultTCPParameters { TCP.tcpCheckPeerHost = True }
-    transport <- do
-        transportOrError <-
-            TCP.createTransport (TCP.defaultTCPAddr "127.0.0.1" "10128") params
-        either throwIO return transportOrError
-
-    putStrLn $ "Spawning " ++ show number ++ " nodes"
-    nodeThreads <- forM [0..number] (makeNode transport)
-
-    putStrLn "Hit return to stop"
-    _ <- getChar
-
-    putStrLn "Stopping nodes"
-    forM_ nodeThreads killThread
-    closeTransport transport
diff --git a/networking/src/Network/Discovery/Transport/Kademlia.hs b/networking/src/Network/Discovery/Transport/Kademlia.hs
deleted file mode 100644
index b3085a2a546..00000000000
--- a/networking/src/Network/Discovery/Transport/Kademlia.hs
+++ /dev/null
@@ -1,174 +0,0 @@
-{-# LANGUAGE DeriveGeneric       #-}
-{-# LANGUAGE ScopedTypeVariables #-}
-
-module Network.Discovery.Transport.Kademlia
-    ( K.Node (..)
-    , K.Peer (..)
-    , KademliaConfiguration (..)
-    , KademliaDiscoveryErrorCode (..)
-    , kademliaDiscovery
-    ) where
-
-import qualified Control.Concurrent.STM as STM
-import qualified Control.Concurrent.STM.TVar as TVar
-import Control.Monad (forM)
-import Data.Binary (Binary, decodeOrFail, encode)
-import qualified Data.ByteString.Lazy as BL
-import qualified Data.Map.Strict as M
-import qualified Data.Set as S
-import Data.Typeable (Typeable)
-import Data.Word (Word16)
-import GHC.Generics (Generic)
-import qualified Network.Kademlia as K
-
-import Network.Discovery.Abstract
-import Network.Transport
-
--- | Wrapper which provides a 'K.Serialize' instance for any type with a
--- 'Binary' instance.
-newtype KSerialize i = KSerialize i
-    deriving (Eq, Ord, Show)
-
-instance Binary i => K.Serialize (KSerialize i) where
-    fromBS bs = case decodeOrFail (BL.fromStrict bs) of
-        Left (_, _, str) -> Left str
-        Right (unconsumed, _, i) -> Right (KSerialize i, BL.toStrict unconsumed)
-    toBS (KSerialize i) = BL.toStrict . encode $ i
-
--- | Configuration for a Kademlia node.
-data KademliaConfiguration i = KademliaConfiguration {
-      kademliaBindAddress :: (String, Word16)
-    , kademliaExternalAddress :: (String, Word16)
-    , kademliaId :: i
-      -- ^ Some value to use as the identifier for this node. To use it, it must
-      -- have a 'Binary' instance. You may want to take a random value, and
-      -- it should serialize to something long enough for your expected
-      -- network size (every node in the network needs a unique id).
-    }
-
--- | Discover peers using the Kademlia DHT. Nodes in this network will store
--- their (assumed to be TCP transport) 'EndPointAddress'es and send them
--- over the wire on request. NB there are two notions of ID here: the
--- Kademlia IDs, and the 'EndPointAddress'es which are indexed by the former.
---
--- Many side-effects here: a Kademlia instance is created, grabbing a UDP
--- socket and using it to talk to a peer, storing data in the DHT once it has
--- been joined.
-kademliaDiscovery
-    :: forall i .
-       (Binary i, Ord i, Show i)
-    => KademliaConfiguration i
-    -> K.Peer
-    -- ^ A known peer, necessary in order to join the network.
-    -- If there are no other peers in the network, use this node's id.
-    -> EndPointAddress
-    -- ^ Local endpoint address. Will store it in the DHT.
-    -> IO (NetworkDiscovery KademliaDiscoveryErrorCode)
-kademliaDiscovery configuration peer myAddress = do
-    let kid :: KSerialize i
-        kid = KSerialize (kademliaId configuration)
-    -- A Kademlia instance to do the DHT magic.
-    kademliaInst :: K.KademliaInstance (KSerialize i) (KSerialize EndPointAddress)
-        <- K.create (kademliaBindAddress configuration)
-                    (kademliaExternalAddress configuration) kid
-    -- A TVar to cache the set of known peers at the last use of 'discoverPeers'
-    peersTVar :: TVar.TVar (M.Map (K.Node (KSerialize i)) EndPointAddress)
-        <- TVar.newTVarIO $ M.empty
-    let knownPeers' = fmap (S.fromList . M.elems) . TVar.readTVarIO $ peersTVar
-    let discoverPeers' = kademliaDiscoverPeers kademliaInst peersTVar
-    let close' = K.close kademliaInst
-    -- Join the network and store the local 'EndPointAddress'.
-    _ <- kademliaJoinAndUpdate kademliaInst peersTVar peer
-    K.store kademliaInst kid (KSerialize myAddress)
-    pure $ NetworkDiscovery knownPeers' discoverPeers' close'
-
--- | Join a Kademlia network (using a given known node address) and update the
--- known peers cache.
-kademliaJoinAndUpdate
-    :: forall i .
-       ( Binary i, Ord i )
-    => K.KademliaInstance (KSerialize i) (KSerialize EndPointAddress)
-    -> TVar.TVar (M.Map (K.Node (KSerialize i)) EndPointAddress)
-    -> K.Peer
-    -> IO (Either (DiscoveryError KademliaDiscoveryErrorCode) (S.Set EndPointAddress))
-kademliaJoinAndUpdate kademliaInst peersTVar peer = do
-    result <- K.joinNetwork kademliaInst peer
-    case result of
-        K.NodeBanned -> pure $ Left (DiscoveryError KademliaNodeBanned "Node is banned by network")
-        K.IDClash -> pure $ Left (DiscoveryError KademliaIdClash "ID clash in network")
-        K.NodeDown -> pure $ Left (DiscoveryError KademliaInitialPeerDown "Initial peer is down")
-        -- [sic]
-        K.JoinSuccess -> do
-            peerList <- map fst <$> K.dumpPeers kademliaInst
-            -- We have the peers, but we do not have the 'EndPointAddress'es for
-            -- them. We must ask the network for them.
-            endPointAddresses <- fmap (M.mapMaybe id) (kademliaLookupEndPointAddresses kademliaInst M.empty peerList)
-            STM.atomically $ TVar.writeTVar peersTVar endPointAddresses
-            pure $ Right (S.fromList (M.elems endPointAddresses))
-
--- | Update the known peers cache.
---
--- FIXME: error reporting. Should perhaps give a list of all of the errors
--- which occurred.
-kademliaDiscoverPeers
-    :: forall i .
-       ( Binary i, Ord i )
-    => K.KademliaInstance (KSerialize i) (KSerialize EndPointAddress)
-    -> TVar.TVar (M.Map (K.Node (KSerialize i)) EndPointAddress)
-    -> IO (Either (DiscoveryError KademliaDiscoveryErrorCode) (S.Set EndPointAddress))
-kademliaDiscoverPeers kademliaInst peersTVar = do
-    recordedPeers <- TVar.readTVarIO peersTVar
-    currentPeers <- map fst <$> K.dumpPeers kademliaInst
-    -- The idea is to always update the TVar to the set of nodes in allPeers,
-    -- but only look up the addresses for nodes which are not in the recorded
-    -- set to begin with.
-    currentWithAddresses <- fmap (M.mapMaybe id) (kademliaLookupEndPointAddresses kademliaInst recordedPeers currentPeers)
-    STM.atomically $ TVar.writeTVar peersTVar currentWithAddresses
-    let new = currentWithAddresses `M.difference` recordedPeers
-    pure $ Right (S.fromList (M.elems new))
-
--- | Look up the 'EndPointAddress'es for a set of nodes.
--- See 'kademliaLookupEndPointAddress'.
-kademliaLookupEndPointAddresses
-    :: forall i .
-       ( Binary i, Ord i )
-    => K.KademliaInstance (KSerialize i) (KSerialize EndPointAddress)
-    -> M.Map (K.Node (KSerialize i)) EndPointAddress
-    -> [K.Node (KSerialize i)]
-    -> IO (M.Map (K.Node (KSerialize i)) (Maybe EndPointAddress))
-kademliaLookupEndPointAddresses kademliaInst recordedPeers currentPeers = do
-    -- TODO do this in parallel, as each one may induce a blocking lookup.
-    endPointAddresses <- forM currentPeers (kademliaLookupEndPointAddress kademliaInst recordedPeers)
-    let assoc :: [(K.Node (KSerialize i), Maybe EndPointAddress)]
-        assoc = zip currentPeers endPointAddresses
-    pure $ M.fromList assoc
-
--- | Look up the 'EndPointAddress' for a given node. The host and port of
--- the node are known, along with its Kademlia identifier, but the
--- 'EndPointAddress' cannot be inferred from these things. The DHT stores
--- that 'EndPointAddress' using the node's Kademlia identifier as key, so
--- we look that up in the table. Nodes for which the 'EndPointAddress' is
--- already known are not looked up.
-kademliaLookupEndPointAddress
-    :: forall i .
-       ( Binary i, Ord i )
-    => K.KademliaInstance (KSerialize i) (KSerialize EndPointAddress)
-    -> M.Map (K.Node (KSerialize i)) EndPointAddress
-    -- ^ The current set of recorded peers. We don't look up an 'EndPointAddress'
-    -- for any of these, we just use the one in the map.
-    -> K.Node (KSerialize i)
-    -> IO (Maybe EndPointAddress)
-kademliaLookupEndPointAddress kademliaInst recordedPeers peer@(K.Node _ nid) =
-    case M.lookup peer recordedPeers of
-        Nothing -> do
-            outcome <- K.lookup kademliaInst nid
-            pure $ case outcome of
-                Nothing -> Nothing
-                Just (KSerialize endPointAddress, _) -> Just endPointAddress
-        Just address' -> pure (Just address')
-
-data KademliaDiscoveryErrorCode
-    = KademliaIdClash
-    | KademliaInitialPeerDown
-    | KademliaNodeBanned
-    deriving (Show, Typeable, Generic)
diff --git a/networking/src/Ntp/Util.hs b/networking/src/Ntp/Util.hs
index 5404a41c981..e2fa7553113 100644
--- a/networking/src/Ntp/Util.hs
+++ b/networking/src/Ntp/Util.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP                 #-}
 {-# LANGUAGE DataKinds           #-}
 {-# LANGUAGE FlexibleContexts    #-}
 {-# LANGUAGE GADTs               #-}
@@ -44,10 +45,9 @@
 import Data.These (These (..))
 import Formatting (sformat, shown, (%))
 import Network.Socket (AddrInfo, AddrInfoFlag (AI_ADDRCONFIG, AI_PASSIVE),
-                     Family (AF_INET, AF_INET6), PortNumber (..),
-                     SockAddr (..), Socket, SocketOption (ReuseAddr),
-                     SocketType (Datagram), aNY_PORT, addrAddress, addrFamily,
-                     addrFlags, addrSocketType)
+                     Family (AF_INET, AF_INET6), PortNumber, SockAddr (..),
+                     Socket, SocketOption (ReuseAddr), SocketType (Datagram),
+                     addrAddress, addrFamily, addrFlags, addrSocketType)
 import qualified Network.Socket as Socket
 import qualified Network.Socket.ByteString as Socket.ByteString (sendTo)
 
@@ -219,8 +219,13 @@ udpLocalAddresses = do
     let hints = Socket.defaultHints
             { addrFlags = [AI_PASSIVE]
            , addrSocketType = Datagram }
+#if MIN_VERSION_network(2,8,0)
+        port = Socket.defaultPort
+#else
+        port = Socket.aNY_PORT
+#endif
     --                 Hints        Host    Service
-    Socket.getAddrInfo (Just hints) Nothing (Just $ show aNY_PORT)
+    Socket.getAddrInfo (Just hints) Nothing (Just $ show port)
 
 data SendToException
     = NoMatchingSocket
diff --git a/pkgs/default.nix b/pkgs/default.nix
index 73fc6461959..56aba26f41a 100644
--- a/pkgs/default.nix
+++ b/pkgs/default.nix
@@ -17120,7 +17120,6 @@ license = stdenv.lib.licenses.mit;
 , hashable
 , hspec
 , hspec-core
-, kademlia
 , lens
 , mtl
 , mwc-random
@@ -17171,7 +17170,6 @@ containers
 ekg-core
 formatting
 hashable
-kademlia
 lens
 mtl
 network
@@ -17194,7 +17192,6 @@ base
 binary
 bytestring
 cardano-sl-util
-containers
 contravariant
 network-transport
 network-transport-tcp
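The 'Ntp.Util' hunk above works around an API change: network-2.8 introduced
'defaultPort' in place of the deprecated 'aNY_PORT', and the Cabal-generated
'MIN_VERSION_network' macro picks the right name at compile time. The same
guard works in any module that enables CPP; a sketch ('defaultBindPort' is an
illustrative name, not part of this patch):

    {-# LANGUAGE CPP #-}
    import qualified Network.Socket as Socket

    -- Port 0, i.e. "let the OS choose": 'defaultPort' from network >= 2.8,
    -- 'aNY_PORT' before the rename.
    defaultBindPort :: Socket.PortNumber
    #if MIN_VERSION_network(2,8,0)
    defaultBindPort = Socket.defaultPort
    #else
    defaultBindPort = Socket.aNY_PORT
    #endif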