@@ -19,12 +19,13 @@ data TpsThrottle = TpsThrottle {
19
19
startSending :: IO ()
20
20
, sendStop :: STM ()
21
21
, receiveBlocking :: STM Step
22
- , receiveNoneBlocking :: STM (Maybe Step )
22
+ , receiveNonBlocking :: STM (Maybe Step )
23
23
}
24
24
25
25
-- TVar state ::
26
26
-- empty -> Block submission
27
- -- Just n -> allow n transmissions
27
+ -- Just 0 -> illegal state
28
+ -- Just n -> allow n transmissions ( n must be >0 )
28
29
-- Nothing -> teminate transmission
29
30
30
31
newTpsThrottle :: Int -> Int -> TPSRate -> IO TpsThrottle
@@ -34,19 +35,16 @@ newTpsThrottle buffersize count tpsRate = do
34
35
startSending = sendNTicks tpsRate buffersize count var
35
36
, sendStop = putTMVar var Nothing
36
37
, receiveBlocking = takeTMVar var >>= receiveAction var
37
- , receiveNoneBlocking = do
38
- s <- tryTakeTMVar var
39
- case s of
40
- Nothing -> return Nothing
41
- Just state -> Just <$> receiveAction var state
38
+ , receiveNonBlocking =
39
+ (Just <$> (takeTMVar var >>= receiveAction var )) `orElse` return Nothing
42
40
}
43
41
44
42
receiveAction :: TMVar (Maybe Int ) -> Maybe Int -> STM Step
45
43
receiveAction var state = case state of
46
44
Nothing -> do
47
45
putTMVar var Nothing
48
46
return Stop
49
- Just 1 -> return Next -- leave var empty
47
+ Just 1 -> return Next -- leave var empty, i.e. block submission until sendNTicks unblocks
50
48
Just n -> do
51
49
-- decrease counter and let other threads transmit
52
50
putTMVar var $ Just $ pred n
@@ -59,15 +57,15 @@ sendNTicks (TPSRate rate) buffersize count var = do
59
57
where
60
58
worker 0 _ _ = return ()
61
59
worker n lastPreDelay lastDelay = do
62
- atomically increaseWatermark
60
+ increaseWatermark
63
61
now <- Clock. getCurrentTime
64
62
let targetDelay = realToFrac $ 1.0 / rate
65
63
loopCost = (now `Clock.diffUTCTime` lastPreDelay) - lastDelay
66
64
delay = targetDelay - loopCost
67
65
threadDelay . ceiling $ (realToFrac delay * 1000000.0 :: Double )
68
66
worker (pred n) now delay
69
67
-- increaseWatermark can retry/block if there are already buffersize ticks in the "queue"
70
- increaseWatermark = do
68
+ increaseWatermark = atomically $ do
71
69
s <- tryTakeTMVar var
72
70
case s of
73
71
Nothing -> putTMVar var $ Just 1
@@ -90,19 +88,18 @@ consumeTxsNonBlocking tpsThrottle req
90
88
= if req== 0
91
89
then pure (Next , 0 )
92
90
else do
93
- STM. atomically (receiveNoneBlocking tpsThrottle) >>= \ case
91
+ STM. atomically (receiveNonBlocking tpsThrottle) >>= \ case
94
92
Nothing -> pure (Next , 0 )
95
93
Just Stop -> pure (Stop , 0 )
96
94
Just Next -> pure (Next , 1 )
97
95
98
-
99
-
100
96
test :: IO ()
101
97
test = do
102
98
t <- newTpsThrottle 10 50 2
103
99
_threadId <- startThrottle t
104
100
threadDelay 5000000
105
101
forM_ [1 .. 5 ] $ \ i -> forkIO $ consumer t i
102
+ forM_ [6 .. 7 ] $ \ i -> forkIO $ consumer2 t i
106
103
putStrLn " done"
107
104
where
108
105
startThrottle t = forkIO $ do
@@ -114,4 +111,16 @@ test = do
114
111
consumer t n = do
115
112
s <- atomically $ receiveBlocking t
116
113
print (n, s)
117
- if s== Next then consumer t n else putStrLn $ " Done " ++ show n
114
+ if s == Next then consumer t n else putStrLn $ " Done " ++ show n
115
+
116
+ consumer2 :: TpsThrottle -> Int -> IO ()
117
+ consumer2 t n = do
118
+ r <- atomically $ receiveNonBlocking t
119
+ case r of
120
+ Just s -> do
121
+ print (n, s)
122
+ if s == Next then consumer2 t n else putStrLn $ " Done " ++ show n
123
+ Nothing -> do
124
+ putStrLn $ " wait " ++ show n
125
+ threadDelay 100000
126
+ consumer2 t n
0 commit comments