diff --git a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs index c163509f5b0..b26b09b90f9 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs @@ -112,16 +112,13 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = fail (T.unpack err) let (stillUnacked, acked) = L.splitAtEnd ack unAcked let newStats = stats { stsAcked = stsAcked stats + Ack ack } - traceWith bmtr $ TraceBenchTxSubServAck (getTxId . getTxBody <$> acked) + traceWith bmtr $ SubmissionClientDiscardAcknowledged (getTxId . getTxBody <$> acked) return (txSource, UnAcked stillUnacked, newStats) queueNewTxs :: [tx] -> LocalState era -> LocalState era queueNewTxs newTxs (txSource, UnAcked unAcked, stats) = (txSource, UnAcked (newTxs <> unAcked), stats) - -- Sadly, we can't just return what we want, we instead have to - -- communicate via IORefs, because.. - -- The () return type is forced by Ouroboros.Network.NodeToNode.connectTo client ::LocalState era -> ClientStIdle (GenTxId CardanoBlock) (GenTx CardanoBlock) m () client localState = ClientStIdle @@ -140,13 +137,14 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = req = Req $ fromIntegral reqNum traceWith tr $ reqIdsTrace ack req blocking stateA <- discardAcknowledged blocking ack state - + traceWith bmtr $ TraceBenchTxSubDebug "return from discard" (stateB, newTxs) <- produceNextTxs blocking req stateA + traceWith bmtr $ TraceBenchTxSubDebug "return from produceNext" let stateC@(_, UnAcked outs , stats) = queueNewTxs newTxs stateB traceWith tr $ idListTrace (ToAnnce newTxs) blocking - traceWith bmtr $ TraceBenchTxSubServAnn (getTxId . getTxBody <$> newTxs) - traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> outs) + traceWith bmtr $ SubmissionClientReplyTxIds (getTxId . getTxBody <$> newTxs) + traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs) case blocking of TokBlocking -> case NE.nonEmpty newTxs of @@ -175,8 +173,8 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = missIds = reqTxIds L.\\ uaIds traceWith tr $ TxList (length toSend) + traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> ua) traceWith bmtr $ TraceBenchTxSubServReq reqTxIds - traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> ua) unless (L.null missIds) $ traceWith bmtr $ TraceBenchTxSubServUnav missIds pure $ SendMsgReplyTxs (toGenTx <$> toSend) @@ -213,10 +211,9 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace reqIdsTrace ack req = \case TokBlocking -> ReqIdsBlocking ack req - TokNonBlocking -> ReqIdsPrompt ack req + TokNonBlocking -> ReqIdsNonBlocking ack req idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace idListTrace (ToAnnce toAnn) = \case TokBlocking -> IdsListBlocking $ length toAnn - TokNonBlocking -> IdsListPrompt $ length toAnn - + TokNonBlocking -> IdsListNonBlocking $ length toAnn diff --git a/bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs b/bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs index dc77e777beb..3ad1d5be65f 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs @@ -68,16 +68,16 @@ data TraceBenchTxSubmit txid | TraceBenchTxSubStart [txid] -- ^ The @txid@ has been submitted to `TxSubmission` -- protocol peer. - | TraceBenchTxSubServAnn [txid] + | SubmissionClientReplyTxIds [txid] -- ^ Announcing txids in response for server's request. | TraceBenchTxSubServReq [txid] -- ^ Request for @tx@ received from `TxSubmission` protocol -- peer. - | TraceBenchTxSubServAck [txid] + | SubmissionClientDiscardAcknowledged [txid] -- ^ An ack (window moved over) received for these transactions. | TraceBenchTxSubServDrop [txid] -- ^ Transactions the server implicitly dropped. - | TraceBenchTxSubServOuts [txid] + | SubmissionClientUnAcked [txid] -- ^ Transactions outstanding. | TraceBenchTxSubServUnav [txid] -- ^ Transactions requested, but unavailable in the outstanding set. @@ -113,8 +113,8 @@ instance ToJSON SubmissionSummary data NodeToNodeSubmissionTrace = ReqIdsBlocking Ack Req | IdsListBlocking Int - | ReqIdsPrompt Ack Req - | IdsListPrompt Int + | ReqIdsNonBlocking Ack Req + | IdsListNonBlocking Int | ReqTxs Int | TxList Int | EndOfProtocol diff --git a/bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs b/bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs index c1a594a74b1..4b50548ee5e 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/Script/Core.hs @@ -276,7 +276,7 @@ runBenchmarkInEra sourceWallet submitMode (ThreadName threadName) shape tps era let inToOut :: [Lovelace] -> [Lovelace] - inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputs shape) + inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputsPerTx shape) txGenerator = genTx protocolParameters (TxInsCollateralNone, []) (mkFee (auxFee shape)) metadata (KeyWitness KeyWitnessForSpending) @@ -642,4 +642,3 @@ and for which the JSON encoding is "reserved". reserved :: [String] -> ActionM () reserved _ = do throwE $ UserError "no dirty hack is implemented" - diff --git a/bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs b/bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs index 8f7a0fc0a84..b3c4cc3af6c 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs @@ -19,12 +19,13 @@ data TpsThrottle = TpsThrottle { startSending :: IO () , sendStop :: STM () , receiveBlocking :: STM Step - , receiveNoneBlocking :: STM (Maybe Step) + , receiveNonBlocking :: STM (Maybe Step) } -- TVar state :: -- empty -> Block submission --- Just n -> allow n transmissions +-- Just 0 -> illegal state +-- Just n -> allow n transmissions ( n must be >0 ) -- Nothing -> teminate transmission newTpsThrottle :: Int -> Int -> TPSRate -> IO TpsThrottle @@ -34,11 +35,8 @@ newTpsThrottle buffersize count tpsRate = do startSending = sendNTicks tpsRate buffersize count var , sendStop = putTMVar var Nothing , receiveBlocking = takeTMVar var >>= receiveAction var - , receiveNoneBlocking = do - s <- tryTakeTMVar var - case s of - Nothing -> return Nothing - Just state -> Just <$> receiveAction var state + , receiveNonBlocking = + (Just <$> (takeTMVar var >>= receiveAction var )) `orElse` return Nothing } receiveAction :: TMVar (Maybe Int) -> Maybe Int -> STM Step @@ -46,7 +44,7 @@ receiveAction var state = case state of Nothing -> do putTMVar var Nothing return Stop - Just 1 -> return Next -- leave var empty + Just 1 -> return Next -- leave var empty, i.e. block submission until sendNTicks unblocks Just n -> do -- decrease counter and let other threads transmit putTMVar var $ Just $ pred n @@ -59,7 +57,7 @@ sendNTicks (TPSRate rate) buffersize count var = do where worker 0 _ _ = return () worker n lastPreDelay lastDelay = do - atomically increaseWatermark + increaseWatermark now <- Clock.getCurrentTime let targetDelay = realToFrac $ 1.0 / rate loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay @@ -67,7 +65,7 @@ sendNTicks (TPSRate rate) buffersize count var = do threadDelay . ceiling $ (realToFrac delay * 1000000.0 :: Double) worker (pred n) now delay -- increaseWatermark can retry/block if there are already buffersize ticks in the "queue" - increaseWatermark = do + increaseWatermark = atomically $ do s <- tryTakeTMVar var case s of Nothing -> putTMVar var $ Just 1 @@ -90,19 +88,18 @@ consumeTxsNonBlocking tpsThrottle req = if req==0 then pure (Next, 0) else do - STM.atomically (receiveNoneBlocking tpsThrottle) >>= \case + STM.atomically (receiveNonBlocking tpsThrottle) >>= \case Nothing -> pure (Next, 0) Just Stop -> pure (Stop, 0) Just Next -> pure (Next, 1) - - test :: IO () test = do t <- newTpsThrottle 10 50 2 _threadId <- startThrottle t threadDelay 5000000 forM_ [1 .. 5] $ \i -> forkIO $ consumer t i + forM_ [6 .. 7] $ \i -> forkIO $ consumer2 t i putStrLn "done" where startThrottle t = forkIO $ do @@ -114,4 +111,16 @@ test = do consumer t n = do s <- atomically $ receiveBlocking t print (n, s) - if s==Next then consumer t n else putStrLn $ "Done " ++ show n + if s == Next then consumer t n else putStrLn $ "Done " ++ show n + + consumer2 :: TpsThrottle -> Int -> IO () + consumer2 t n = do + r <- atomically $ receiveNonBlocking t + case r of + Just s -> do + print (n, s) + if s == Next then consumer2 t n else putStrLn $ "Done " ++ show n + Nothing -> do + putStrLn $ "wait " ++ show n + threadDelay 100000 + consumer2 t n diff --git a/bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs b/bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs index 4fcd3fd6146..77cb333e5f1 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs @@ -129,24 +129,24 @@ instance LogFormatting (TraceBenchTxSubmit TxId) where mconcat [ "kind" .= A.String "TraceBenchTxSubStart" , "txIds" .= toJSON txIds ] - TraceBenchTxSubServAnn txIds -> - mconcat [ "kind" .= A.String "TraceBenchTxSubServAnn" + SubmissionClientReplyTxIds txIds -> + mconcat [ "kind" .= A.String "SubmissionClientReplyTxIds" , "txIds" .= toJSON txIds ] TraceBenchTxSubServReq txIds -> mconcat [ "kind" .= A.String "TraceBenchTxSubServReq" , "txIds" .= toJSON txIds ] - TraceBenchTxSubServAck txIds -> - mconcat [ "kind" .= A.String "TraceBenchTxSubServAck" + SubmissionClientDiscardAcknowledged txIds -> + mconcat [ "kind" .= A.String "SubmissionClientDiscardAcknowledged" , "txIds" .= toJSON txIds ] TraceBenchTxSubServDrop txIds -> mconcat [ "kind" .= A.String "TraceBenchTxSubServDrop" , "txIds" .= toJSON txIds ] - TraceBenchTxSubServOuts txIds -> - mconcat [ "kind" .= A.String "TraceBenchTxSubServOuts" + SubmissionClientUnAcked txIds -> + mconcat [ "kind" .= A.String "SubmissionClientUnAcked" , "txIds" .= toJSON txIds ] TraceBenchTxSubServUnav txIds -> @@ -196,12 +196,12 @@ instance LogFormatting NodeToNodeSubmissionTrace where IdsListBlocking sent -> KeyMap.fromList [ "kind" .= A.String "IdsListBlocking" , "sent" .= A.toJSON sent ] - ReqIdsPrompt (Ack ack) (Req req) -> KeyMap.fromList - [ "kind" .= A.String "ReqIdsPrompt" + ReqIdsNonBlocking (Ack ack) (Req req) -> KeyMap.fromList + [ "kind" .= A.String "ReqIdsNonBlocking" , "ack" .= A.toJSON ack , "req" .= A.toJSON req ] - IdsListPrompt sent -> KeyMap.fromList - [ "kind" .= A.String "IdsListPrompt" + IdsListNonBlocking sent -> KeyMap.fromList + [ "kind" .= A.String "IdsListNonBlocking" , "sent" .= A.toJSON sent ] EndOfProtocol -> KeyMap.fromList [ "kind" .= A.String "EndOfProtocol" ] ReqTxs req -> KeyMap.fromList