Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit d1f9688

Browse files
author
Michael Hueschen
committed
[CDEC-464] Switch uses of pipes to conduit
I did not replace uses of `pipes` in the `post-mortem` executable from `cardano-sl-tools`, since it does not build at present.
1 parent 658af4f commit d1f9688

File tree

10 files changed

+36
-39
lines changed

10 files changed

+36
-39
lines changed

db/cardano-sl-db.cabal

-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ library
143143
, memory
144144
, mmorph
145145
, mtl
146-
, pipes
147146
, reflection
148147
, resourcet
149148
, rocksdb-haskell-ng

db/src/Pos/DB/Block/GState/BlockExtra.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ module Pos.DB.Block.GState.BlockExtra
2020

2121
import Universum hiding (init)
2222

23+
import Data.Conduit (ConduitT, yield)
2324
import qualified Database.RocksDB as Rocks
2425
import Formatting (bprint, build, (%))
2526
import qualified Formatting.Buildable
26-
import Pipes (Producer, yield)
2727
import Serokell.Util.Text (listJson)
2828

2929
import Pos.Binary.Class (serialize')
@@ -122,7 +122,7 @@ streamBlocks
122122
=> (HeaderHash -> m (Maybe SerializedBlock))
123123
-> (HeaderHash -> m (Maybe HeaderHash))
124124
-> HeaderHash
125-
-> Producer SerializedBlock m ()
125+
-> ConduitT () SerializedBlock m ()
126126
streamBlocks loadBlock forwardLink base = do
127127
mFirst <- lift $ forwardLink base
128128
maybe (pure ()) loop mFirst

infra/src/Pos/Infra/Diffusion/Subscription/Subscriber.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Pos.Infra.Diffusion.Subscription.Subscriber
1212
import Universum
1313

1414
-- | Generate subscription targets in some monad.
15-
-- TBD any value in using a streaming solution like pipes?
15+
-- TBD any value in using a streaming solution like conduit?
1616
newtype SubscriptionTarget m target = SubscriptionTarget
1717
{ getSubscriptionTarget :: m (Maybe (target, SubscriptionTarget m target))
1818
}

lib/bench/Bench/Pos/Diffusion/BlockDownload.hs

+6-6
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@ import qualified Criterion
1919
import qualified Criterion.Main as Criterion
2020
import qualified Criterion.Main.Options as Criterion
2121
import qualified Data.ByteString.Lazy as LBS
22+
import Data.Conduit.Combinators (yieldMany)
23+
import Data.List.NonEmpty (NonEmpty ((:|)))
2224
import Data.Semigroup ((<>))
25+
import Data.Time.Units (Microsecond)
2326
import qualified Options.Applicative as Opt (execParser)
2427

25-
import Data.List.NonEmpty (NonEmpty ((:|)))
26-
import Data.Time.Units (Microsecond)
2728
import qualified Network.Broadcast.OutboundQueue as OQ
2829
import qualified Network.Broadcast.OutboundQueue.Types as OQ
2930
import Network.Transport (Transport)
3031
import qualified Network.Transport.TCP as TCP
3132
import Node (NodeId)
3233
import qualified Node
33-
import Pipes (each)
3434

3535
import Pos.Binary (serialize, serialize')
3636
import Pos.Core.Block (Block, BlockHeader, HeaderHash)
3737
import qualified Pos.Core.Block as Block (getBlockHeader)
38+
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
3839
import Pos.Core.ProtocolConstants (ProtocolConstants (..))
3940
import Pos.Core.Update (BlockVersion (..))
4041
import Pos.Crypto (ProtocolMagic (..))
@@ -52,9 +53,8 @@ import Pos.Infra.Network.Types (Bucket (..))
5253
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
5354
import Pos.Logic.Pure (pureLogic)
5455
import Pos.Logic.Types as Logic (Logic (..))
55-
56-
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
5756
import Pos.Util.Trace (noTrace, wlogTrace)
57+
5858
import Test.Pos.Chain.Block.Arbitrary.Generate (generateMainBlock)
5959

6060
-- TODO
@@ -121,7 +121,7 @@ serverLogic streamIORef arbitraryBlock arbitraryHashes arbitraryHeaders = pureLo
121121
, getTipHeader = pure (Block.getBlockHeader arbitraryBlock)
122122
, Logic.streamBlocks = \_ -> do
123123
bs <- readIORef streamIORef
124-
each $ map serializedBlock bs
124+
yieldMany $ map serializedBlock bs
125125
}
126126

127127
serializedBlock :: Block -> SerializedBlock

lib/cardano-sl.cabal

+2-3
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ library
183183
, network-transport
184184
, optparse-applicative
185185
, parsec
186-
, pipes
187186
, pvss
188187
, random
189188
, reflection
@@ -313,6 +312,7 @@ test-suite cardano-test
313312
, cardano-sl-networking
314313
, cardano-sl-util
315314
, cardano-sl-util-test
315+
, conduit
316316
, containers
317317
, cryptonite
318318
, data-default
@@ -326,7 +326,6 @@ test-suite cardano-test
326326
, log-warper
327327
, network-transport
328328
, network-transport-inmemory
329-
, pipes
330329
, pvss
331330
, random
332331
, reflection
@@ -400,13 +399,13 @@ benchmark cardano-bench-criterion
400399
, cardano-sl-networking
401400
, cardano-sl-util
402401
, cardano-sl-util-test
402+
, conduit
403403
, criterion
404404
, deepseq
405405
, formatting
406406
, network-transport
407407
, network-transport-tcp
408408
, optparse-applicative
409-
, pipes
410409
, time-units
411410
, universum >= 0.1.11
412411
default-language: Haskell2010

lib/src/Pos/Diffusion/Full/Block.hs

+6-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import Control.Exception (Exception (..), throwIO)
1919
import Control.Lens (to)
2020
import Control.Monad.Except (ExceptT, runExceptT, throwError)
2121
import qualified Data.ByteString.Lazy as BSL
22+
import Data.Conduit (await, runConduit, (.|))
2223
import Data.List.NonEmpty (NonEmpty ((:|)))
2324
import qualified Data.List.NonEmpty as NE
2425
import qualified Data.Map as M
@@ -27,7 +28,6 @@ import Formatting (bprint, build, int, sformat, shown, stext, (%))
2728
import qualified Formatting.Buildable as B
2829
import qualified Network.Broadcast.OutboundQueue as OQ
2930
import Node.Conversation (sendRaw)
30-
import Pipes (await, runEffect, (>->))
3131
import Serokell.Util.Text (listJson)
3232
import qualified System.Metrics.Gauge as Gauge
3333

@@ -689,7 +689,7 @@ handleStreamStart logTrace logic oq = listenerConv logTrace oq $ \__ourVerInfo n
689689
Logic.streamBlocks logic lca
690690
lift $ send conv MsgStreamEnd
691691
consumer = loop nodeId conv window
692-
runEffect $ producer >-> consumer
692+
runConduit $ producer .| consumer
693693

694694
loop nodeId conv 0 = do
695695
lift $ traceWith logTrace (Debug, "handleStreamStart:loop waiting on window update")
@@ -703,10 +703,10 @@ handleStreamStart logTrace logic oq = listenerConv logTrace oq $ \__ourVerInfo n
703703
MsgUpdate u -> do
704704
lift $ traceWith logTrace (Debug, sformat ("handleStreamStart:loop new window "%shown%" from "%build) u nodeId)
705705
loop nodeId conv (msuWindow u)
706-
loop nodeId conv window = do
707-
b <- await
708-
lift $ sendRaw conv $ serializeMsgStreamBlock $ MsgSerializedBlock b
709-
loop nodeId conv (window - 1)
706+
loop nodeId conv window =
707+
whenJustM await $ \b -> do
708+
lift $ sendRaw conv $ serializeMsgStreamBlock $ MsgSerializedBlock b
709+
loop nodeId conv (window - 1)
710710

711711
----------------------------------------------------------------------------
712712
-- Header propagation

lib/src/Pos/Logic/Full.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ module Pos.Logic.Full
99
import Universum hiding (id)
1010

1111
import Control.Lens (at, to)
12+
import Data.Conduit (ConduitT)
1213
import qualified Data.HashMap.Strict as HM
1314
import Data.Tagged (Tagged (..), tagWith)
1415
import Formatting (build, sformat, (%))
15-
import Pipes (Producer)
1616
import System.Wlog (WithLogger, logDebug)
1717

1818
import Pos.Chain.Block (HasBlockConfiguration)
@@ -106,7 +106,7 @@ logicFull pm txpConfig ourStakeholderId securityParams jsonLogTx =
106106
getSerializedBlock :: HeaderHash -> m (Maybe SerializedBlock)
107107
getSerializedBlock = DB.dbGetSerBlock
108108

109-
streamBlocks :: HeaderHash -> Producer SerializedBlock m ()
109+
streamBlocks :: HeaderHash -> ConduitT () SerializedBlock m ()
110110
streamBlocks = Block.streamBlocks DB.dbGetSerBlock Block.resolveForwardLink
111111

112112
getTip :: m Block

lib/src/Pos/Logic/Types.hs

+8-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ module Pos.Logic.Types
1212

1313
import Universum
1414

15+
import Data.Conduit (ConduitT, transPipe)
1516
import Data.Default (def)
1617
import Data.Tagged (Tagged)
17-
import Pipes (Producer)
18-
import Pipes.Internal (unsafeHoist)
1918

2019
import Pos.Chain.Security (SecurityParams (..))
2120
import Pos.Chain.Ssc (MCCommitment, MCOpening, MCShares,
@@ -38,7 +37,7 @@ data Logic m = Logic
3837
ourStakeholderId :: StakeholderId
3938
-- | Get serialized block, perhaps from a database.
4039
, getSerializedBlock :: HeaderHash -> m (Maybe SerializedBlock)
41-
, streamBlocks :: HeaderHash -> Producer SerializedBlock m ()
40+
, streamBlocks :: HeaderHash -> ConduitT () SerializedBlock m ()
4241
-- | Get a block header.
4342
, getBlockHeader :: HeaderHash -> m (Maybe BlockHeader)
4443
-- TODO CSL-2089 use conduits in this and the following methods
@@ -102,11 +101,15 @@ data Logic m = Logic
102101
, securityParams :: SecurityParams
103102
}
104103

105-
-- | We have to hoist a pipes producer, so the Monad constraint arises.
104+
-- | The Monad constraint arises due to `transPipe` from Conduit.
105+
-- The transformation function `foo :: (forall x. m x -> n x)` must be a
106+
-- *monad morphism* and not just any natural transformation. This means,
107+
-- roughly, that `foo a >> foo b` should behave the same as `foo (a >> b)`.
108+
-- `foo = flip evalState 1`, for example, does not satisfy this requirement.
106109
hoistLogic :: Monad m => (forall x . m x -> n x) -> Logic m -> Logic n
107110
hoistLogic nat logic = logic
108111
{ getSerializedBlock = nat . getSerializedBlock logic
109-
, streamBlocks = unsafeHoist nat . streamBlocks logic
112+
, streamBlocks = transPipe nat . streamBlocks logic
110113
, getBlockHeader = nat . getBlockHeader logic
111114
, getHashesRange = \a b c -> nat (getHashesRange logic a b c)
112115
, getBlockHeaders = \a b c -> nat (getBlockHeaders logic a b c)

lib/test/Test/Pos/Diffusion/BlockSpec.hs

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,26 @@ import Universum
1010

1111
import Control.DeepSeq (force)
1212
import Control.Monad.IO.Class (liftIO)
13+
import Data.Bits
1314
import qualified Data.ByteString.Lazy as LBS
15+
import Data.Conduit.Combinators (yieldMany)
16+
import Data.List.NonEmpty (NonEmpty ((:|)))
17+
import qualified Data.List.NonEmpty as NE
1418
import Data.Semigroup ((<>))
1519
import Test.Hspec (Spec, describe, it, shouldBe)
1620

17-
import Data.Bits
18-
import Data.List.NonEmpty (NonEmpty ((:|)))
19-
import qualified Data.List.NonEmpty as NE
2021
import qualified Network.Broadcast.OutboundQueue as OQ
2122
import qualified Network.Broadcast.OutboundQueue.Types as OQ
2223
import Network.Transport (Transport, closeTransport)
2324
import qualified Network.Transport.InMemory as InMemory
2425
import Node (NodeId)
2526
import qualified Node
26-
import Pipes (each)
2727

2828
import Pos.Binary.Class (serialize')
2929
import Pos.Core.Block (Block, BlockHeader, HeaderHash,
3030
blockHeaderHash)
3131
import qualified Pos.Core.Block as Block (getBlockHeader)
32+
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
3233
import Pos.Core.ProtocolConstants (ProtocolConstants (..))
3334
import Pos.Core.Update (BlockVersion (..))
3435
import Pos.Crypto (ProtocolMagic (..))
@@ -44,9 +45,8 @@ import Pos.Infra.Network.Types (Bucket (..))
4445
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
4546
import Pos.Logic.Pure (pureLogic)
4647
import Pos.Logic.Types as Logic (Logic (..))
47-
48-
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
4948
import Pos.Util.Trace (wlogTrace)
49+
5050
import Test.Pos.Chain.Block.Arbitrary.Generate (generateMainBlock)
5151

5252
-- HLint warning disabled since I ran into https://ghc.haskell.org/trac/ghc/ticket/13106
@@ -104,7 +104,7 @@ serverLogic streamIORef arbitraryBlock arbitraryHashes arbitraryHeaders = pureLo
104104
, getTipHeader = pure (Block.getBlockHeader arbitraryBlock)
105105
, Logic.streamBlocks = \_ -> do
106106
bs <- readIORef streamIORef
107-
each $ map serializedBlock bs
107+
yieldMany $ map serializedBlock bs
108108
}
109109

110110
serializedBlock :: Block -> SerializedBlock

pkgs/default.nix

+2-6
Original file line numberDiff line numberDiff line change
@@ -14759,7 +14759,6 @@ license = stdenv.lib.licenses.bsd3;
1475914759
, network-transport-tcp
1476014760
, optparse-applicative
1476114761
, parsec
14762-
, pipes
1476314762
, pvss
1476414763
, QuickCheck
1476514764
, random
@@ -14854,7 +14853,6 @@ network
1485414853
network-transport
1485514854
optparse-applicative
1485614855
parsec
14857-
pipes
1485814856
pvss
1485914857
QuickCheck
1486014858
random
@@ -14907,6 +14905,7 @@ cardano-sl-infra-test
1490714905
cardano-sl-networking
1490814906
cardano-sl-util
1490914907
cardano-sl-util-test
14908+
conduit
1491014909
containers
1491114910
cryptonite
1491214911
data-default
@@ -14920,7 +14919,6 @@ lens
1492014919
log-warper
1492114920
network-transport
1492214921
network-transport-inmemory
14923-
pipes
1492414922
pvss
1492514923
QuickCheck
1492614924
random
@@ -14947,13 +14945,13 @@ cardano-sl-infra
1494714945
cardano-sl-networking
1494814946
cardano-sl-util
1494914947
cardano-sl-util-test
14948+
conduit
1495014949
criterion
1495114950
deepseq
1495214951
formatting
1495314952
network-transport
1495414953
network-transport-tcp
1495514954
optparse-applicative
14956-
pipes
1495714955
QuickCheck
1495814956
time-units
1495914957
universum
@@ -16106,7 +16104,6 @@ license = stdenv.lib.licenses.mit;
1610616104
, memory
1610716105
, mmorph
1610816106
, mtl
16109-
, pipes
1611016107
, reflection
1611116108
, resourcet
1611216109
, rocksdb-haskell-ng
@@ -16157,7 +16154,6 @@ lrucache
1615716154
memory
1615816155
mmorph
1615916156
mtl
16160-
pipes
1616116157
reflection
1616216158
resourcet
1616316159
rocksdb-haskell-ng

0 commit comments

Comments
 (0)