Skip to content

Commit 3ce536b

Browse files
jutarodeepfire
authored andcommitted
trace-forward: Fix write to sink
1 parent a22e464 commit 3ce536b

File tree

1 file changed

+40
-22
lines changed

1 file changed

+40
-22
lines changed

trace-forward/src/Trace/Forward/Utils/TraceObject.hs

+40-22
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ module Trace.Forward.Utils.TraceObject
1414
import Control.Concurrent.STM (STM, atomically, retry)
1515
import Control.Concurrent.STM.TBQueue
1616
import Control.Concurrent.STM.TVar
17-
import Control.Monad (unless)
17+
import Control.Monad (unless, (<$!>))
1818
import Control.Monad.Extra (whenM)
1919
import qualified Data.List.NonEmpty as NE
2020
import Data.Word (Word16)
2121
import System.IO
2222

2323
import Trace.Forward.Configuration.TraceObject
24-
import Trace.Forward.Protocol.TraceObject.Type
2524
import qualified Trace.Forward.Protocol.TraceObject.Forwarder as Forwarder
25+
import Trace.Forward.Protocol.TraceObject.Type
26+
2627

2728
data ForwardSink lo = ForwardSink
2829
{ forwardQueue :: !(TVar (TBQueue lo))
@@ -58,40 +59,57 @@ writeToSink
5859
-> lo
5960
-> IO ()
6061
writeToSink ForwardSink{forwardQueue, disconnectedSize, connectedSize, wasUsed} traceObject = do
61-
q <- readTVarIO forwardQueue
62-
atomically ((,) <$> isFullTBQueue q
63-
<*> isEmptyTBQueue q) >>= \case
64-
(True, _) -> maybeFlushQueueToStdout q
65-
(_, True) -> checkIfSinkWasUsed q
66-
(_, _) -> return ()
67-
atomically $ readTVar forwardQueue >>= flip writeTBQueue traceObject
62+
condToFlush <- atomically $ do
63+
q <- readTVar forwardQueue
64+
((,) <$> isFullTBQueue q
65+
<*> isEmptyTBQueue q) >>= \case
66+
(True, _) -> do
67+
res <- maybeFlushQueueToStdout q
68+
q' <- readTVar forwardQueue
69+
writeTBQueue q' traceObject
70+
pure res
71+
(_, True) -> do
72+
maybeShrinkQueue q
73+
q' <- readTVar forwardQueue
74+
writeTBQueue q' traceObject
75+
pure Nothing
76+
(_, _) -> do
77+
writeTBQueue q traceObject
78+
pure Nothing
79+
case condToFlush of
80+
Nothing -> pure ()
81+
Just li -> do
82+
mapM_ print li
83+
hFlush stdout
6884
where
6985
-- The queue is full, but if it's a small queue, we can switch it
7086
-- to a big one and give a chance not to flush items to stdout yet.
7187
maybeFlushQueueToStdout q = do
72-
qLen <- atomically $ lengthTBQueue q
88+
qLen <- lengthTBQueue q
7389
if fromIntegral qLen == connectedSize
74-
then atomically $ do
90+
then do
7591
-- The small queue is full, so we have to switch to a big one and
7692
-- then flush collected items from the small queue and store them in
7793
-- a big one.
78-
acceptedItems <- flushTBQueue q
94+
95+
acceptedItems <- -- trace ("growQueue disconnected" ++ show disconnectedSize) $
96+
flushTBQueue q
7997
switchQueue disconnectedSize
8098
bigQ <- readTVar forwardQueue
8199
mapM_ (writeTBQueue bigQ) acceptedItems
100+
pure Nothing
82101
else do
83102
-- The big queue is full, we have to flush it to stdout.
84-
atomically (flushTBQueue q) >>= mapM_ print
85-
hFlush stdout
86-
87-
checkIfSinkWasUsed q = atomically $
88-
whenM (readTVar wasUsed) $ switchToAnotherQueue q
103+
Just <$!> flushTBQueue q
89104

90-
switchToAnotherQueue q = do
91-
qLen <- lengthTBQueue q
92-
if fromIntegral qLen == disconnectedSize
93-
then switchQueue connectedSize
94-
else switchQueue disconnectedSize
105+
-- if the sink was used and it
106+
maybeShrinkQueue q = do
107+
whenM (readTVar wasUsed) $ do
108+
qLen <- lengthTBQueue q
109+
if fromIntegral qLen == disconnectedSize
110+
then -- trace ("shrinkQueue connected " ++ show connectedSize) $
111+
switchQueue connectedSize
112+
else pure ()
95113

96114
switchQueue size =
97115
newTBQueue (fromIntegral size) >>= modifyTVar' forwardQueue . const

0 commit comments

Comments
 (0)