26
26
import com .datastax .oss .driver .shaded .guava .common .annotations .VisibleForTesting ;
27
27
import edu .umd .cs .findbugs .annotations .NonNull ;
28
28
import edu .umd .cs .findbugs .annotations .Nullable ;
29
- import java .util .ArrayDeque ;
30
29
import java .util .Deque ;
31
- import java .util .concurrent .locks . ReentrantLock ;
32
- import net . jcip . annotations . GuardedBy ;
30
+ import java .util .concurrent .ConcurrentLinkedDeque ;
31
+ import java . util . concurrent . atomic . AtomicInteger ;
33
32
import net .jcip .annotations .ThreadSafe ;
34
33
import org .slf4j .Logger ;
35
34
import org .slf4j .LoggerFactory ;
@@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler {
61
60
private final String logPrefix ;
62
61
private final int maxConcurrentRequests ;
63
62
private final int maxQueueSize ;
64
-
65
- private final ReentrantLock lock = new ReentrantLock ();
66
-
67
- @ GuardedBy ("lock" )
68
- private int concurrentRequests ;
69
-
70
- @ GuardedBy ("lock" )
71
- private final Deque <Throttled > queue = new ArrayDeque <>();
72
-
73
- @ GuardedBy ("lock" )
74
- private boolean closed ;
63
+ private final AtomicInteger concurrentRequests = new AtomicInteger (0 );
64
+ // CLQ is not O(1) for size(), as it forces a full iteration of the queue. So, we track
65
+ // the size of the queue explicitly.
66
+ private final Deque <Throttled > queue = new ConcurrentLinkedDeque <>();
67
+ private final AtomicInteger queueSize = new AtomicInteger (0 );
68
+ private volatile boolean closed = false ;
75
69
76
70
public ConcurrencyLimitingRequestThrottler (DriverContext context ) {
77
71
this .logPrefix = context .getSessionName ();
@@ -88,50 +82,62 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {
88
82
89
83
@ Override
90
84
public void register (@ NonNull Throttled request ) {
91
- boolean notifyReadyRequired = false ;
85
+ if (closed ) {
86
+ LOG .trace ("[{}] Rejecting request after shutdown" , logPrefix );
87
+ fail (request , "The session is shutting down" );
88
+ return ;
89
+ }
92
90
93
- lock .lock ();
94
- try {
95
- if (closed ) {
96
- LOG .trace ("[{}] Rejecting request after shutdown" , logPrefix );
97
- fail (request , "The session is shutting down" );
98
- } else if (queue .isEmpty () && concurrentRequests < maxConcurrentRequests ) {
99
- // We have capacity for one more concurrent request
91
+ // Implementation note: Technically the "concurrent requests" or "queue size"
92
+ // could read transiently over the limit, but the queue itself will never grow
93
+ // beyond the limit since we always check for that condition and revert if
94
+ // over-limit. We do this instead of a CAS-loop to avoid the potential loop.
95
+
96
+ // If no backlog exists AND we get capacity, we can execute immediately
97
+ if (queueSize .get () == 0 ) {
98
+ // Take a claim first, and then check if we are OK to proceed
99
+ int newConcurrent = concurrentRequests .incrementAndGet ();
100
+ if (newConcurrent <= maxConcurrentRequests ) {
100
101
LOG .trace ("[{}] Starting newly registered request" , logPrefix );
101
- concurrentRequests += 1 ;
102
- notifyReadyRequired = true ;
103
- } else if (queue .size () < maxQueueSize ) {
104
- LOG .trace ("[{}] Enqueuing request" , logPrefix );
105
- queue .add (request );
102
+ request .onThrottleReady (false );
103
+ return ;
106
104
} else {
107
- LOG .trace ("[{}] Rejecting request because of full queue" , logPrefix );
108
- fail (
109
- request ,
110
- String .format (
111
- "The session has reached its maximum capacity "
112
- + "(concurrent requests: %d, queue size: %d)" ,
113
- maxConcurrentRequests , maxQueueSize ));
105
+ // We exceeded the limit, decrement the count and fall through to the queuing logic
106
+ concurrentRequests .decrementAndGet ();
114
107
}
115
- } finally {
116
- lock .unlock ();
117
108
}
118
109
119
- // no need to hold the lock while allowing the task to progress
120
- if (notifyReadyRequired ) {
121
- request .onThrottleReady (false );
110
+ // If we have a backlog, or we failed to claim capacity, try to enqueue
111
+ int newQueueSize = queueSize .incrementAndGet ();
112
+ if (newQueueSize <= maxQueueSize ) {
113
+ LOG .trace ("[{}] Enqueuing request" , logPrefix );
114
+ queue .offer (request );
115
+
116
+ // Double-check that we were still supposed to be enqueued; it is possible
117
+ // that the session was closed while we were enqueuing, it's also possible
118
+ // that it is right now removing the request, so we need to check both
119
+ if (closed ) {
120
+ if (queue .remove (request )) {
121
+ queueSize .decrementAndGet ();
122
+ LOG .trace ("[{}] Rejecting late request after shutdown" , logPrefix );
123
+ fail (request , "The session is shutting down" );
124
+ }
125
+ }
126
+ } else {
127
+ LOG .trace ("[{}] Rejecting request because of full queue" , logPrefix );
128
+ queueSize .decrementAndGet ();
129
+ fail (
130
+ request ,
131
+ String .format (
132
+ "The session has reached its maximum capacity "
133
+ + "(concurrent requests: %d, queue size: %d)" ,
134
+ maxConcurrentRequests , maxQueueSize ));
122
135
}
123
136
}
124
137
125
138
@ Override
126
139
public void signalSuccess (@ NonNull Throttled request ) {
127
- Throttled nextRequest = null ;
128
- lock .lock ();
129
- try {
130
- nextRequest = onRequestDoneAndDequeNext ();
131
- } finally {
132
- lock .unlock ();
133
- }
134
-
140
+ Throttled nextRequest = onRequestDoneAndDequeNext ();
135
141
if (nextRequest != null ) {
136
142
nextRequest .onThrottleReady (true );
137
143
}
@@ -145,17 +151,13 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
145
151
@ Override
146
152
public void signalTimeout (@ NonNull Throttled request ) {
147
153
Throttled nextRequest = null ;
148
- lock .lock ();
149
- try {
150
- if (!closed ) {
151
- if (queue .remove (request )) { // The request timed out before it was active
152
- LOG .trace ("[{}] Removing timed out request from the queue" , logPrefix );
153
- } else {
154
- nextRequest = onRequestDoneAndDequeNext ();
155
- }
154
+ if (!closed ) {
155
+ if (queue .remove (request )) { // The request timed out before it was active
156
+ queueSize .decrementAndGet ();
157
+ LOG .trace ("[{}] Removing timed out request from the queue" , logPrefix );
158
+ } else {
159
+ nextRequest = onRequestDoneAndDequeNext ();
156
160
}
157
- } finally {
158
- lock .unlock ();
159
161
}
160
162
161
163
if (nextRequest != null ) {
@@ -166,35 +168,30 @@ public void signalTimeout(@NonNull Throttled request) {
166
168
@ Override
167
169
public void signalCancel (@ NonNull Throttled request ) {
168
170
Throttled nextRequest = null ;
169
- lock .lock ();
170
- try {
171
- if (!closed ) {
172
- if (queue .remove (request )) { // The request has been cancelled before it was active
173
- LOG .trace ("[{}] Removing cancelled request from the queue" , logPrefix );
174
- } else {
175
- nextRequest = onRequestDoneAndDequeNext ();
176
- }
171
+ if (!closed ) {
172
+ if (queue .remove (request )) { // The request has been cancelled before it was active
173
+ queueSize .decrementAndGet ();
174
+ LOG .trace ("[{}] Removing cancelled request from the queue" , logPrefix );
175
+ } else {
176
+ nextRequest = onRequestDoneAndDequeNext ();
177
177
}
178
- } finally {
179
- lock .unlock ();
180
178
}
181
179
182
180
if (nextRequest != null ) {
183
181
nextRequest .onThrottleReady (true );
184
182
}
185
183
}
186
184
187
- @ SuppressWarnings ("GuardedBy" ) // this method is only called with the lock held
188
185
@ Nullable
189
186
private Throttled onRequestDoneAndDequeNext () {
190
- assert lock .isHeldByCurrentThread ();
191
187
if (!closed ) {
192
- if (queue .isEmpty ()) {
193
- concurrentRequests -= 1 ;
188
+ Throttled nextRequest = queue .poll ();
189
+ if (nextRequest == null ) {
190
+ concurrentRequests .decrementAndGet ();
194
191
} else {
192
+ queueSize .decrementAndGet ();
195
193
LOG .trace ("[{}] Starting dequeued request" , logPrefix );
196
- // don't touch concurrentRequests since we finished one but started another
197
- return queue .poll ();
194
+ return nextRequest ;
198
195
}
199
196
}
200
197
@@ -204,45 +201,28 @@ private Throttled onRequestDoneAndDequeNext() {
204
201
205
202
@ Override
206
203
public void close () {
207
- lock .lock ();
208
- try {
209
- closed = true ;
210
- LOG .debug ("[{}] Rejecting {} queued requests after shutdown" , logPrefix , queue .size ());
211
- for (Throttled request : queue ) {
212
- fail (request , "The session is shutting down" );
213
- }
214
- } finally {
215
- lock .unlock ();
204
+ closed = true ;
205
+
206
+ LOG .debug ("[{}] Rejecting {} queued requests after shutdown" , logPrefix , queueSize .get ());
207
+ Throttled request ;
208
+ while ((request = queue .poll ()) != null ) {
209
+ queueSize .decrementAndGet ();
210
+ fail (request , "The session is shutting down" );
216
211
}
217
212
}
218
213
219
214
public int getQueueSize () {
220
- lock .lock ();
221
- try {
222
- return queue .size ();
223
- } finally {
224
- lock .unlock ();
225
- }
215
+ return queueSize .get ();
226
216
}
227
217
228
218
@ VisibleForTesting
229
219
int getConcurrentRequests () {
230
- lock .lock ();
231
- try {
232
- return concurrentRequests ;
233
- } finally {
234
- lock .unlock ();
235
- }
220
+ return concurrentRequests .get ();
236
221
}
237
222
238
223
@ VisibleForTesting
239
224
Deque <Throttled > getQueue () {
240
- lock .lock ();
241
- try {
242
- return queue ;
243
- } finally {
244
- lock .unlock ();
245
- }
225
+ return queue ;
246
226
}
247
227
248
228
private static void fail (Throttled request , String message ) {
0 commit comments