Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 3e5d6e9

Browse files
author
Michael Hueschen
committed
[CDEC-464] Switch uses of pipes to conduit
I did not replace uses of `pipes` in the `post-mortem` executable from `cardano-sl-tools`, since it does not build at present.
1 parent eab0a6a commit 3e5d6e9

File tree

10 files changed

+36
-39
lines changed

10 files changed

+36
-39
lines changed

db/cardano-sl-db.cabal

-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ library
147147
, memory
148148
, mmorph
149149
, mtl
150-
, pipes
151150
, reflection
152151
, resourcet
153152
, rocksdb-haskell-ng

db/src/Pos/DB/Block/GState/BlockExtra.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ module Pos.DB.Block.GState.BlockExtra
2020

2121
import Universum hiding (init)
2222

23+
import Data.Conduit (ConduitT, yield)
2324
import qualified Database.RocksDB as Rocks
2425
import Formatting (bprint, build, (%))
2526
import qualified Formatting.Buildable
26-
import Pipes (Producer, yield)
2727
import Serokell.Util.Text (listJson)
2828

2929
import Pos.Binary.Class (serialize')
@@ -122,7 +122,7 @@ streamBlocks
122122
=> (HeaderHash -> m (Maybe SerializedBlock))
123123
-> (HeaderHash -> m (Maybe HeaderHash))
124124
-> HeaderHash
125-
-> Producer SerializedBlock m ()
125+
-> ConduitT () SerializedBlock m ()
126126
streamBlocks loadBlock forwardLink base = do
127127
mFirst <- lift $ forwardLink base
128128
maybe (pure ()) loop mFirst

infra/src/Pos/Infra/Diffusion/Subscription/Subscriber.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Pos.Infra.Diffusion.Subscription.Subscriber
1212
import Universum
1313

1414
-- | Generate subscription targets in some monad.
15-
-- TBD any value in using a streaming solution like pipes?
15+
-- TBD any value in using a streaming solution like conduit?
1616
newtype SubscriptionTarget m target = SubscriptionTarget
1717
{ getSubscriptionTarget :: m (Maybe (target, SubscriptionTarget m target))
1818
}

lib/bench/Bench/Pos/Diffusion/BlockDownload.hs

+6-6
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@ import qualified Criterion
1919
import qualified Criterion.Main as Criterion
2020
import qualified Criterion.Main.Options as Criterion
2121
import qualified Data.ByteString.Lazy as LBS
22+
import Data.Conduit.Combinators (yieldMany)
23+
import Data.List.NonEmpty (NonEmpty ((:|)))
2224
import Data.Semigroup ((<>))
25+
import Data.Time.Units (Microsecond)
2326
import qualified Options.Applicative as Opt (execParser)
2427

25-
import Data.List.NonEmpty (NonEmpty ((:|)))
26-
import Data.Time.Units (Microsecond)
2728
import qualified Network.Broadcast.OutboundQueue as OQ
2829
import qualified Network.Broadcast.OutboundQueue.Types as OQ
2930
import Network.Transport (Transport)
3031
import qualified Network.Transport.TCP as TCP
3132
import Node (NodeId)
3233
import qualified Node
33-
import Pipes (each)
3434

3535
import Pos.Binary (serialize, serialize')
3636
import Pos.Core.Block (Block, BlockHeader, HeaderHash)
3737
import qualified Pos.Core.Block as Block (getBlockHeader)
38+
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
3839
import Pos.Core.ProtocolConstants (ProtocolConstants (..))
3940
import Pos.Core.Update (BlockVersion (..))
4041
import Pos.Crypto (ProtocolMagic (..))
@@ -52,9 +53,8 @@ import Pos.Infra.Network.Types (Bucket (..))
5253
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
5354
import Pos.Logic.Pure (pureLogic)
5455
import Pos.Logic.Types as Logic (Logic (..))
55-
56-
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
5756
import Pos.Util.Trace (noTrace, wlogTrace)
57+
5858
import Test.Pos.Block.Arbitrary.Generate (generateMainBlock)
5959

6060
-- TODO
@@ -121,7 +121,7 @@ serverLogic streamIORef arbitraryBlock arbitraryHashes arbitraryHeaders = pureLo
121121
, getTipHeader = pure (Block.getBlockHeader arbitraryBlock)
122122
, Logic.streamBlocks = \_ -> do
123123
bs <- readIORef streamIORef
124-
each $ map serializedBlock bs
124+
yieldMany $ map serializedBlock bs
125125
}
126126

127127
serializedBlock :: Block -> SerializedBlock

lib/cardano-sl.cabal

+2-3
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ library
188188
, network-transport
189189
, optparse-applicative
190190
, parsec
191-
, pipes
192191
, pvss
193192
, random
194193
, reflection
@@ -327,6 +326,7 @@ test-suite cardano-test
327326
, cardano-sl-update-test
328327
, cardano-sl-util
329328
, cardano-sl-util-test
329+
, conduit
330330
, containers
331331
, cryptonite
332332
, data-default
@@ -340,7 +340,6 @@ test-suite cardano-test
340340
, log-warper
341341
, network-transport
342342
, network-transport-inmemory
343-
, pipes
344343
, pvss
345344
, random
346345
, reflection
@@ -416,13 +415,13 @@ benchmark cardano-bench-criterion
416415
, cardano-sl-ssc
417416
, cardano-sl-util
418417
, cardano-sl-util-test
418+
, conduit
419419
, criterion
420420
, deepseq
421421
, formatting
422422
, network-transport
423423
, network-transport-tcp
424424
, optparse-applicative
425-
, pipes
426425
, time-units
427426
, universum >= 0.1.11
428427
default-language: Haskell2010

lib/src/Pos/Diffusion/Full/Block.hs

+6-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import Control.Exception (Exception (..), throwIO)
1919
import Control.Lens (to)
2020
import Control.Monad.Except (ExceptT, runExceptT, throwError)
2121
import qualified Data.ByteString.Lazy as BSL
22+
import Data.Conduit (await, runConduit, (.|))
2223
import Data.List.NonEmpty (NonEmpty ((:|)))
2324
import qualified Data.List.NonEmpty as NE
2425
import qualified Data.Map as M
@@ -27,7 +28,6 @@ import Formatting (bprint, build, int, sformat, shown, stext, (%))
2728
import qualified Formatting.Buildable as B
2829
import qualified Network.Broadcast.OutboundQueue as OQ
2930
import Node.Conversation (sendRaw)
30-
import Pipes (await, runEffect, (>->))
3131
import Serokell.Util.Text (listJson)
3232
import qualified System.Metrics.Gauge as Gauge
3333

@@ -689,7 +689,7 @@ handleStreamStart logTrace logic oq = listenerConv logTrace oq $ \__ourVerInfo n
689689
Logic.streamBlocks logic lca
690690
lift $ send conv MsgStreamEnd
691691
consumer = loop nodeId conv window
692-
runEffect $ producer >-> consumer
692+
runConduit $ producer .| consumer
693693

694694
loop nodeId conv 0 = do
695695
lift $ traceWith logTrace (Debug, "handleStreamStart:loop waiting on window update")
@@ -703,10 +703,10 @@ handleStreamStart logTrace logic oq = listenerConv logTrace oq $ \__ourVerInfo n
703703
MsgUpdate u -> do
704704
lift $ traceWith logTrace (Debug, sformat ("handleStreamStart:loop new window "%shown%" from "%build) u nodeId)
705705
loop nodeId conv (msuWindow u)
706-
loop nodeId conv window = do
707-
b <- await
708-
lift $ sendRaw conv $ serializeMsgStreamBlock $ MsgSerializedBlock b
709-
loop nodeId conv (window - 1)
706+
loop nodeId conv window =
707+
whenJustM await $ \b -> do
708+
lift $ sendRaw conv $ serializeMsgStreamBlock $ MsgSerializedBlock b
709+
loop nodeId conv (window - 1)
710710

711711
----------------------------------------------------------------------------
712712
-- Header propagation

lib/src/Pos/Logic/Full.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ module Pos.Logic.Full
99
import Universum hiding (id)
1010

1111
import Control.Lens (at, to)
12+
import Data.Conduit (ConduitT)
1213
import qualified Data.HashMap.Strict as HM
1314
import Data.Tagged (Tagged (..), tagWith)
1415
import Formatting (build, sformat, (%))
15-
import Pipes (Producer)
1616
import System.Wlog (WithLogger, logDebug)
1717

1818
import Pos.Block.Configuration (HasBlockConfiguration)
@@ -108,7 +108,7 @@ logicFull pm ourStakeholderId securityParams jsonLogTx =
108108
getSerializedBlock :: HeaderHash -> m (Maybe SerializedBlock)
109109
getSerializedBlock = DB.dbGetSerBlock
110110

111-
streamBlocks :: HeaderHash -> Producer SerializedBlock m ()
111+
streamBlocks :: HeaderHash -> ConduitT () SerializedBlock m ()
112112
streamBlocks = Block.streamBlocks DB.dbGetSerBlock Block.resolveForwardLink
113113

114114
getTip :: m Block

lib/src/Pos/Logic/Types.hs

+8-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ module Pos.Logic.Types
1212

1313
import Universum
1414

15+
import Data.Conduit (ConduitT, transPipe)
1516
import Data.Default (def)
1617
import Data.Tagged (Tagged)
17-
import Pipes (Producer)
18-
import Pipes.Internal (unsafeHoist)
1918

2019
import Pos.Communication (NodeId)
2120
import Pos.Core (StakeholderId)
@@ -38,7 +37,7 @@ data Logic m = Logic
3837
ourStakeholderId :: StakeholderId
3938
-- | Get serialized block, perhaps from a database.
4039
, getSerializedBlock :: HeaderHash -> m (Maybe SerializedBlock)
41-
, streamBlocks :: HeaderHash -> Producer SerializedBlock m ()
40+
, streamBlocks :: HeaderHash -> ConduitT () SerializedBlock m ()
4241
-- | Get a block header.
4342
, getBlockHeader :: HeaderHash -> m (Maybe BlockHeader)
4443
-- TODO CSL-2089 use conduits in this and the following methods
@@ -102,11 +101,15 @@ data Logic m = Logic
102101
, securityParams :: SecurityParams
103102
}
104103

105-
-- | We have to hoist a pipes producer, so the Monad constraint arises.
104+
-- | The Monad constraint arises due to `transPipe` from Conduit.
105+
-- The transformation function `foo :: (forall x. m x -> n x)` must be a
106+
-- *monad morphism* and not just any natural transformation. This means,
107+
-- roughly, that `foo a >> foo b` should behave the same as `foo (a >> b)`.
108+
-- `foo = flip evalState 1`, for example, does not satisfy this requirement.
106109
hoistLogic :: Monad m => (forall x . m x -> n x) -> Logic m -> Logic n
107110
hoistLogic nat logic = logic
108111
{ getSerializedBlock = nat . getSerializedBlock logic
109-
, streamBlocks = unsafeHoist nat . streamBlocks logic
112+
, streamBlocks = transPipe nat . streamBlocks logic
110113
, getBlockHeader = nat . getBlockHeader logic
111114
, getHashesRange = \a b c -> nat (getHashesRange logic a b c)
112115
, getBlockHeaders = \a b c -> nat (getBlockHeaders logic a b c)

lib/test/Test/Pos/Diffusion/BlockSpec.hs

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,26 @@ import Universum
1010

1111
import Control.DeepSeq (force)
1212
import Control.Monad.IO.Class (liftIO)
13+
import Data.Bits
1314
import qualified Data.ByteString.Lazy as LBS
15+
import Data.Conduit.Combinators (yieldMany)
16+
import Data.List.NonEmpty (NonEmpty ((:|)))
17+
import qualified Data.List.NonEmpty as NE
1418
import Data.Semigroup ((<>))
1519
import Test.Hspec (Spec, describe, it, shouldBe)
1620

17-
import Data.Bits
18-
import Data.List.NonEmpty (NonEmpty ((:|)))
19-
import qualified Data.List.NonEmpty as NE
2021
import qualified Network.Broadcast.OutboundQueue as OQ
2122
import qualified Network.Broadcast.OutboundQueue.Types as OQ
2223
import Network.Transport (Transport, closeTransport)
2324
import qualified Network.Transport.InMemory as InMemory
2425
import Node (NodeId)
2526
import qualified Node
26-
import Pipes (each)
2727

2828
import Pos.Binary.Class (serialize')
2929
import Pos.Core.Block (Block, BlockHeader, HeaderHash,
3030
blockHeaderHash)
3131
import qualified Pos.Core.Block as Block (getBlockHeader)
32+
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
3233
import Pos.Core.ProtocolConstants (ProtocolConstants (..))
3334
import Pos.Core.Update (BlockVersion (..))
3435
import Pos.Crypto (ProtocolMagic (..))
@@ -44,9 +45,8 @@ import Pos.Infra.Network.Types (Bucket (..))
4445
import Pos.Infra.Reporting.Health.Types (HealthStatus (..))
4546
import Pos.Logic.Pure (pureLogic)
4647
import Pos.Logic.Types as Logic (Logic (..))
47-
48-
import Pos.Core.Chrono (NewestFirst (..), OldestFirst (..))
4948
import Pos.Util.Trace (wlogTrace)
49+
5050
import Test.Pos.Block.Arbitrary.Generate (generateMainBlock)
5151

5252
-- HLint warning disabled since I ran into https://ghc.haskell.org/trac/ghc/ticket/13106
@@ -104,7 +104,7 @@ serverLogic streamIORef arbitraryBlock arbitraryHashes arbitraryHeaders = pureLo
104104
, getTipHeader = pure (Block.getBlockHeader arbitraryBlock)
105105
, Logic.streamBlocks = \_ -> do
106106
bs <- readIORef streamIORef
107-
each $ map serializedBlock bs
107+
yieldMany $ map serializedBlock bs
108108
}
109109

110110
serializedBlock :: Block -> SerializedBlock

pkgs/default.nix

+2-6
Original file line numberDiff line numberDiff line change
@@ -14768,7 +14768,6 @@ license = stdenv.lib.licenses.bsd3;
1476814768
, network-transport-tcp
1476914769
, optparse-applicative
1477014770
, parsec
14771-
, pipes
1477214771
, pvss
1477314772
, QuickCheck
1477414773
, random
@@ -14868,7 +14867,6 @@ network
1486814867
network-transport
1486914868
optparse-applicative
1487014869
parsec
14871-
pipes
1487214870
pvss
1487314871
QuickCheck
1487414872
random
@@ -14930,6 +14928,7 @@ cardano-sl-update
1493014928
cardano-sl-update-test
1493114929
cardano-sl-util
1493214930
cardano-sl-util-test
14931+
conduit
1493314932
containers
1493414933
cryptonite
1493514934
data-default
@@ -14943,7 +14942,6 @@ lens
1494314942
log-warper
1494414943
network-transport
1494514944
network-transport-inmemory
14946-
pipes
1494714945
pvss
1494814946
QuickCheck
1494914947
random
@@ -14972,13 +14970,13 @@ cardano-sl-ssc
1497214970
cardano-sl-txp
1497314971
cardano-sl-util
1497414972
cardano-sl-util-test
14973+
conduit
1497514974
criterion
1497614975
deepseq
1497714976
formatting
1497814977
network-transport
1497914978
network-transport-tcp
1498014979
optparse-applicative
14981-
pipes
1498214980
QuickCheck
1498314981
time-units
1498414982
universum
@@ -16106,7 +16104,6 @@ license = stdenv.lib.licenses.mit;
1610616104
, memory
1610716105
, mmorph
1610816106
, mtl
16109-
, pipes
1611016107
, reflection
1611116108
, resourcet
1611216109
, rocksdb-haskell-ng
@@ -16162,7 +16159,6 @@ lrucache
1616216159
memory
1616316160
mmorph
1616416161
mtl
16165-
pipes
1616616162
reflection
1616716163
resourcet
1616816164
rocksdb-haskell-ng

0 commit comments

Comments
 (0)