Skip to content

tx-generator: fix a bug #4239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,13 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
let newStats = stats { stsAcked = stsAcked stats + Ack ack }
traceWith bmtr $ TraceBenchTxSubServAck (getTxId . getTxBody <$> acked)
traceWith bmtr $ SubmissionClientDiscardAcknowledged (getTxId . getTxBody <$> acked)
return (txSource, UnAcked stillUnacked, newStats)

queueNewTxs :: [tx] -> LocalState era -> LocalState era
queueNewTxs newTxs (txSource, UnAcked unAcked, stats)
= (txSource, UnAcked (newTxs <> unAcked), stats)

-- Sadly, we can't just return what we want, we instead have to
-- communicate via IORefs, because..
-- The () return type is forced by Ouroboros.Network.NodeToNode.connectTo
client ::LocalState era -> ClientStIdle (GenTxId CardanoBlock) (GenTx CardanoBlock) m ()

client localState = ClientStIdle
Expand All @@ -140,13 +137,14 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
req = Req $ fromIntegral reqNum
traceWith tr $ reqIdsTrace ack req blocking
stateA <- discardAcknowledged blocking ack state

traceWith bmtr $ TraceBenchTxSubDebug "return from discard"
(stateB, newTxs) <- produceNextTxs blocking req stateA
traceWith bmtr $ TraceBenchTxSubDebug "return from produceNext"
let stateC@(_, UnAcked outs , stats) = queueNewTxs newTxs stateB

traceWith tr $ idListTrace (ToAnnce newTxs) blocking
traceWith bmtr $ TraceBenchTxSubServAnn (getTxId . getTxBody <$> newTxs)
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> outs)
traceWith bmtr $ SubmissionClientReplyTxIds (getTxId . getTxBody <$> newTxs)
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
Expand Down Expand Up @@ -175,8 +173,8 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
missIds = reqTxIds L.\\ uaIds

traceWith tr $ TxList (length toSend)
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> ua)
traceWith bmtr $ TraceBenchTxSubServReq reqTxIds
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> ua)
unless (L.null missIds) $
traceWith bmtr $ TraceBenchTxSubServUnav missIds
pure $ SendMsgReplyTxs (toGenTx <$> toSend)
Expand Down Expand Up @@ -213,10 +211,9 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsPrompt ack req
TokNonBlocking -> ReqIdsNonBlocking ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListPrompt $ length toAnn

TokNonBlocking -> IdsListNonBlocking $ length toAnn
10 changes: 5 additions & 5 deletions bench/tx-generator/src/Cardano/Benchmarking/LogTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ data TraceBenchTxSubmit txid
| TraceBenchTxSubStart [txid]
-- ^ The @txid@ has been submitted to `TxSubmission`
-- protocol peer.
| TraceBenchTxSubServAnn [txid]
| SubmissionClientReplyTxIds [txid]
-- ^ Announcing txids in response for server's request.
| TraceBenchTxSubServReq [txid]
-- ^ Request for @tx@ received from `TxSubmission` protocol
-- peer.
| TraceBenchTxSubServAck [txid]
| SubmissionClientDiscardAcknowledged [txid]
-- ^ An ack (window moved over) received for these transactions.
| TraceBenchTxSubServDrop [txid]
-- ^ Transactions the server implicitly dropped.
| TraceBenchTxSubServOuts [txid]
| SubmissionClientUnAcked [txid]
-- ^ Transactions outstanding.
| TraceBenchTxSubServUnav [txid]
-- ^ Transactions requested, but unavailable in the outstanding set.
Expand Down Expand Up @@ -113,8 +113,8 @@ instance ToJSON SubmissionSummary
data NodeToNodeSubmissionTrace
= ReqIdsBlocking Ack Req
| IdsListBlocking Int
| ReqIdsPrompt Ack Req
| IdsListPrompt Int
| ReqIdsNonBlocking Ack Req
| IdsListNonBlocking Int
| ReqTxs Int
| TxList Int
| EndOfProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ runBenchmarkInEra sourceWallet submitMode (ThreadName threadName) shape tps era

let
inToOut :: [Lovelace] -> [Lovelace]
inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputs shape)
inToOut = FundSet.inputsToOutputsWithFee (auxFee shape) (auxOutputsPerTx shape)

txGenerator = genTx protocolParameters (TxInsCollateralNone, []) (mkFee (auxFee shape)) metadata (KeyWitness KeyWitnessForSpending)

Expand Down Expand Up @@ -642,4 +642,3 @@ and for which the JSON encoding is "reserved".
reserved :: [String] -> ActionM ()
reserved _ = do
throwE $ UserError "no dirty hack is implemented"

37 changes: 23 additions & 14 deletions bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ data TpsThrottle = TpsThrottle {
startSending :: IO ()
, sendStop :: STM ()
, receiveBlocking :: STM Step
, receiveNoneBlocking :: STM (Maybe Step)
, receiveNonBlocking :: STM (Maybe Step)
}

-- TVar state ::
-- empty -> Block submission
-- Just n -> allow n transmissions
-- Just 0 -> illegal state
-- Just n -> allow n transmissions ( n must be >0 )
-- Nothing -> teminate transmission

newTpsThrottle :: Int -> Int -> TPSRate -> IO TpsThrottle
Expand All @@ -34,19 +35,16 @@ newTpsThrottle buffersize count tpsRate = do
startSending = sendNTicks tpsRate buffersize count var
, sendStop = putTMVar var Nothing
, receiveBlocking = takeTMVar var >>= receiveAction var
, receiveNoneBlocking = do
s <- tryTakeTMVar var
case s of
Nothing -> return Nothing
Just state -> Just <$> receiveAction var state
, receiveNonBlocking =
(Just <$> (takeTMVar var >>= receiveAction var )) `orElse` return Nothing
}

receiveAction :: TMVar (Maybe Int) -> Maybe Int -> STM Step
receiveAction var state = case state of
Nothing -> do
putTMVar var Nothing
return Stop
Just 1 -> return Next -- leave var empty
Just 1 -> return Next -- leave var empty, i.e. block submission until sendNTicks unblocks
Just n -> do
-- decrease counter and let other threads transmit
putTMVar var $ Just $ pred n
Expand All @@ -59,15 +57,15 @@ sendNTicks (TPSRate rate) buffersize count var = do
where
worker 0 _ _ = return ()
worker n lastPreDelay lastDelay = do
atomically increaseWatermark
increaseWatermark
now <- Clock.getCurrentTime
let targetDelay = realToFrac $ 1.0 / rate
loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay
delay = targetDelay - loopCost
threadDelay . ceiling $ (realToFrac delay * 1000000.0 :: Double)
worker (pred n) now delay
-- increaseWatermark can retry/block if there are already buffersize ticks in the "queue"
increaseWatermark = do
increaseWatermark = atomically $ do
s <- tryTakeTMVar var
case s of
Nothing -> putTMVar var $ Just 1
Expand All @@ -90,19 +88,18 @@ consumeTxsNonBlocking tpsThrottle req
= if req==0
then pure (Next, 0)
else do
STM.atomically (receiveNoneBlocking tpsThrottle) >>= \case
STM.atomically (receiveNonBlocking tpsThrottle) >>= \case
Nothing -> pure (Next, 0)
Just Stop -> pure (Stop, 0)
Just Next -> pure (Next, 1)



test :: IO ()
test = do
t <- newTpsThrottle 10 50 2
_threadId <- startThrottle t
threadDelay 5000000
forM_ [1 .. 5] $ \i -> forkIO $ consumer t i
forM_ [6 .. 7] $ \i -> forkIO $ consumer2 t i
putStrLn "done"
where
startThrottle t = forkIO $ do
Expand All @@ -114,4 +111,16 @@ test = do
consumer t n = do
s <- atomically $ receiveBlocking t
print (n, s)
if s==Next then consumer t n else putStrLn $ "Done " ++ show n
if s == Next then consumer t n else putStrLn $ "Done " ++ show n

consumer2 :: TpsThrottle -> Int -> IO ()
consumer2 t n = do
r <- atomically $ receiveNonBlocking t
case r of
Just s -> do
print (n, s)
if s == Next then consumer2 t n else putStrLn $ "Done " ++ show n
Nothing -> do
putStrLn $ "wait " ++ show n
threadDelay 100000
consumer2 t n
20 changes: 10 additions & 10 deletions bench/tx-generator/src/Cardano/Benchmarking/Tracer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -129,24 +129,24 @@ instance LogFormatting (TraceBenchTxSubmit TxId) where
mconcat [ "kind" .= A.String "TraceBenchTxSubStart"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServAnn txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServAnn"
SubmissionClientReplyTxIds txIds ->
mconcat [ "kind" .= A.String "SubmissionClientReplyTxIds"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServReq txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServReq"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServAck txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServAck"
SubmissionClientDiscardAcknowledged txIds ->
mconcat [ "kind" .= A.String "SubmissionClientDiscardAcknowledged"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServDrop txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServDrop"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServOuts txIds ->
mconcat [ "kind" .= A.String "TraceBenchTxSubServOuts"
SubmissionClientUnAcked txIds ->
mconcat [ "kind" .= A.String "SubmissionClientUnAcked"
, "txIds" .= toJSON txIds
]
TraceBenchTxSubServUnav txIds ->
Expand Down Expand Up @@ -196,12 +196,12 @@ instance LogFormatting NodeToNodeSubmissionTrace where
IdsListBlocking sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListBlocking"
, "sent" .= A.toJSON sent ]
ReqIdsPrompt (Ack ack) (Req req) -> KeyMap.fromList
[ "kind" .= A.String "ReqIdsPrompt"
ReqIdsNonBlocking (Ack ack) (Req req) -> KeyMap.fromList
[ "kind" .= A.String "ReqIdsNonBlocking"
, "ack" .= A.toJSON ack
, "req" .= A.toJSON req ]
IdsListPrompt sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListPrompt"
IdsListNonBlocking sent -> KeyMap.fromList
[ "kind" .= A.String "IdsListNonBlocking"
, "sent" .= A.toJSON sent ]
EndOfProtocol -> KeyMap.fromList [ "kind" .= A.String "EndOfProtocol" ]
ReqTxs req -> KeyMap.fromList
Expand Down