Skip to content

Commit 6d3ba47

Browse files
jasonk000tolbertam
authored andcommitted
Reduce lock held duration in ConcurrencyLimitingRequestThrottler
It might take some (small) time for callback handling when the throttler request proceeds to submission. Before this change, the throttler proceed request will happen while holding the lock, preventing other tasks from proceeding when there is spare capacity and even preventing tasks from enqueuing until the callback completes. By tracking the expected outcome, we can perform the callback outside of the lock. This means that request registration and submission can proceed even when a long callback is being processed. patch by Jason Koch; Reviewed by Andy Tolbert and Chris Lohfink for CASSANDRA-19922
1 parent 4fb5108 commit 6d3ba47

File tree

4 files changed

+206
-36
lines changed

4 files changed

+206
-36
lines changed

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java

+33-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
2626
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
2727
import edu.umd.cs.findbugs.annotations.NonNull;
28+
import edu.umd.cs.findbugs.annotations.Nullable;
2829
import java.util.ArrayDeque;
2930
import java.util.Deque;
3031
import java.util.concurrent.locks.ReentrantLock;
@@ -87,6 +88,8 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {
8788

8889
@Override
8990
public void register(@NonNull Throttled request) {
91+
boolean notifyReadyRequired = false;
92+
9093
lock.lock();
9194
try {
9295
if (closed) {
@@ -96,7 +99,7 @@ public void register(@NonNull Throttled request) {
9699
// We have capacity for one more concurrent request
97100
LOG.trace("[{}] Starting newly registered request", logPrefix);
98101
concurrentRequests += 1;
99-
request.onThrottleReady(false);
102+
notifyReadyRequired = true;
100103
} else if (queue.size() < maxQueueSize) {
101104
LOG.trace("[{}] Enqueuing request", logPrefix);
102105
queue.add(request);
@@ -112,16 +115,26 @@ public void register(@NonNull Throttled request) {
112115
} finally {
113116
lock.unlock();
114117
}
118+
119+
// no need to hold the lock while allowing the task to progress
120+
if (notifyReadyRequired) {
121+
request.onThrottleReady(false);
122+
}
115123
}
116124

117125
@Override
118126
public void signalSuccess(@NonNull Throttled request) {
127+
Throttled nextRequest = null;
119128
lock.lock();
120129
try {
121-
onRequestDone();
130+
nextRequest = onRequestDoneAndDequeNext();
122131
} finally {
123132
lock.unlock();
124133
}
134+
135+
if (nextRequest != null) {
136+
nextRequest.onThrottleReady(true);
137+
}
125138
}
126139

127140
@Override
@@ -131,48 +144,62 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
131144

132145
@Override
133146
public void signalTimeout(@NonNull Throttled request) {
147+
Throttled nextRequest = null;
134148
lock.lock();
135149
try {
136150
if (!closed) {
137151
if (queue.remove(request)) { // The request timed out before it was active
138152
LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
139153
} else {
140-
onRequestDone();
154+
nextRequest = onRequestDoneAndDequeNext();
141155
}
142156
}
143157
} finally {
144158
lock.unlock();
145159
}
160+
161+
if (nextRequest != null) {
162+
nextRequest.onThrottleReady(true);
163+
}
146164
}
147165

148166
@Override
149167
public void signalCancel(@NonNull Throttled request) {
168+
Throttled nextRequest = null;
150169
lock.lock();
151170
try {
152171
if (!closed) {
153172
if (queue.remove(request)) { // The request has been cancelled before it was active
154173
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
155174
} else {
156-
onRequestDone();
175+
nextRequest = onRequestDoneAndDequeNext();
157176
}
158177
}
159178
} finally {
160179
lock.unlock();
161180
}
181+
182+
if (nextRequest != null) {
183+
nextRequest.onThrottleReady(true);
184+
}
162185
}
163186

164187
@SuppressWarnings("GuardedBy") // this method is only called with the lock held
165-
private void onRequestDone() {
188+
@Nullable
189+
private Throttled onRequestDoneAndDequeNext() {
166190
assert lock.isHeldByCurrentThread();
167191
if (!closed) {
168192
if (queue.isEmpty()) {
169193
concurrentRequests -= 1;
170194
} else {
171195
LOG.trace("[{}] Starting dequeued request", logPrefix);
172-
queue.poll().onThrottleReady(true);
173196
// don't touch concurrentRequests since we finished one but started another
197+
return queue.poll();
174198
}
175199
}
200+
201+
// no next task was dequeued
202+
return null;
176203
}
177204

178205
@Override

Diff for: core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java

+131-12
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
3030
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
3131
import java.util.List;
32+
import java.util.concurrent.CountDownLatch;
3233
import java.util.function.Consumer;
3334
import org.junit.Before;
3435
import org.junit.Test;
@@ -67,7 +68,7 @@ public void should_start_immediately_when_under_capacity() {
6768
throttler.register(request);
6869

6970
// Then
70-
assertThatStage(request.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
71+
assertThatStage(request.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
7172
assertThat(throttler.getConcurrentRequests()).isEqualTo(1);
7273
assertThat(throttler.getQueue()).isEmpty();
7374
}
@@ -98,7 +99,7 @@ private void should_allow_new_request_when_active_one_completes(
9899
// Given
99100
MockThrottled first = new MockThrottled();
100101
throttler.register(first);
101-
assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
102+
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
102103
for (int i = 0; i < 4; i++) { // fill to capacity
103104
throttler.register(new MockThrottled());
104105
}
@@ -113,7 +114,7 @@ private void should_allow_new_request_when_active_one_completes(
113114
throttler.register(incoming);
114115

115116
// Then
116-
assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
117+
assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
117118
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
118119
assertThat(throttler.getQueue()).isEmpty();
119120
}
@@ -132,7 +133,7 @@ public void should_enqueue_when_over_capacity() {
132133
throttler.register(incoming);
133134

134135
// Then
135-
assertThatStage(incoming.started).isNotDone();
136+
assertThatStage(incoming.ended).isNotDone();
136137
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
137138
assertThat(throttler.getQueue()).containsExactly(incoming);
138139
}
@@ -157,20 +158,20 @@ private void should_dequeue_when_active_completes(Consumer<Throttled> completeCa
157158
// Given
158159
MockThrottled first = new MockThrottled();
159160
throttler.register(first);
160-
assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
161+
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
161162
for (int i = 0; i < 4; i++) {
162163
throttler.register(new MockThrottled());
163164
}
164165

165166
MockThrottled incoming = new MockThrottled();
166167
throttler.register(incoming);
167-
assertThatStage(incoming.started).isNotDone();
168+
assertThatStage(incoming.ended).isNotDone();
168169

169170
// When
170171
completeCallback.accept(first);
171172

172173
// Then
173-
assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue());
174+
assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue());
174175
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
175176
assertThat(throttler.getQueue()).isEmpty();
176177
}
@@ -189,7 +190,7 @@ public void should_reject_when_queue_is_full() {
189190
throttler.register(incoming);
190191

191192
// Then
192-
assertThatStage(incoming.started)
193+
assertThatStage(incoming.ended)
193194
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
194195
}
195196

@@ -208,7 +209,7 @@ public void should_remove_timed_out_request_from_queue() {
208209
throttler.signalTimeout(queued1);
209210

210211
// Then
211-
assertThatStage(queued2.started).isNotDone();
212+
assertThatStage(queued2.ended).isNotDone();
212213
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
213214
assertThat(throttler.getQueue()).hasSize(1);
214215
}
@@ -223,7 +224,7 @@ public void should_reject_enqueued_when_closing() {
223224
for (int i = 0; i < 10; i++) {
224225
MockThrottled request = new MockThrottled();
225226
throttler.register(request);
226-
assertThatStage(request.started).isNotDone();
227+
assertThatStage(request.ended).isNotDone();
227228
enqueued.add(request);
228229
}
229230

@@ -232,7 +233,7 @@ public void should_reject_enqueued_when_closing() {
232233

233234
// Then
234235
for (MockThrottled request : enqueued) {
235-
assertThatStage(request.started)
236+
assertThatStage(request.ended)
236237
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
237238
}
238239

@@ -241,7 +242,125 @@ public void should_reject_enqueued_when_closing() {
241242
throttler.register(request);
242243

243244
// Then
244-
assertThatStage(request.started)
245+
assertThatStage(request.ended)
245246
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
246247
}
248+
249+
@Test
250+
public void should_run_throttle_callbacks_concurrently() throws InterruptedException {
251+
// Given
252+
253+
// a task is enqueued, which when in onThrottleReady, will stall latch countDown()ed
254+
// register() should automatically start onThrottleReady on same thread
255+
256+
// start a parallel thread
257+
CountDownLatch firstRelease = new CountDownLatch(1);
258+
MockThrottled first = new MockThrottled(firstRelease);
259+
Runnable r =
260+
() -> {
261+
throttler.register(first);
262+
first.ended.toCompletableFuture().thenRun(() -> throttler.signalSuccess(first));
263+
};
264+
Thread t = new Thread(r);
265+
t.start();
266+
267+
// wait for the registration threads to reach await state
268+
assertThatStage(first.started).isSuccess();
269+
assertThatStage(first.ended).isNotDone();
270+
271+
// When
272+
// we concurrently submit a second shorter task
273+
MockThrottled second = new MockThrottled();
274+
// (on a second thread, so that we can join and force a timeout in case
275+
// registration is delayed)
276+
Thread t2 = new Thread(() -> throttler.register(second));
277+
t2.start();
278+
t2.join(1_000);
279+
280+
// Then
281+
// registration will trigger callback, should complete ~immediately
282+
assertThatStage(second.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
283+
// first should still be unfinished
284+
assertThatStage(first.started).isDone();
285+
assertThatStage(first.ended).isNotDone();
286+
// now finish, and verify
287+
firstRelease.countDown();
288+
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
289+
290+
t.join(1_000);
291+
}
292+
293+
@Test
294+
public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws InterruptedException {
295+
// Given
296+
297+
// Multiple tasks are registered, up to the limit, and proceed into their
298+
// callback
299+
300+
// start five parallel threads
301+
final int THREADS = 5;
302+
Thread[] threads = new Thread[THREADS];
303+
CountDownLatch[] latches = new CountDownLatch[THREADS];
304+
MockThrottled[] throttled = new MockThrottled[THREADS];
305+
for (int i = 0; i < threads.length; i++) {
306+
latches[i] = new CountDownLatch(1);
307+
final MockThrottled itThrottled = new MockThrottled(latches[i]);
308+
throttled[i] = itThrottled;
309+
threads[i] =
310+
new Thread(
311+
() -> {
312+
throttler.register(itThrottled);
313+
itThrottled
314+
.ended
315+
.toCompletableFuture()
316+
.thenRun(() -> throttler.signalSuccess(itThrottled));
317+
});
318+
threads[i].start();
319+
}
320+
321+
// wait for the registration threads to be launched
322+
// they are all waiting now
323+
for (int i = 0; i < throttled.length; i++) {
324+
assertThatStage(throttled[i].started).isSuccess();
325+
assertThatStage(throttled[i].ended).isNotDone();
326+
}
327+
328+
// When
329+
// we concurrently submit another task
330+
MockThrottled last = new MockThrottled();
331+
throttler.register(last);
332+
333+
// Then
334+
// registration will enqueue the callback, and it should not
335+
// take any time to proceed (ie: we should not be blocked)
336+
// and there should be an element in the queue
337+
assertThatStage(last.started).isNotDone();
338+
assertThatStage(last.ended).isNotDone();
339+
assertThat(throttler.getQueue()).containsExactly(last);
340+
341+
// we still have not released, so old throttled threads should be waiting
342+
for (int i = 0; i < throttled.length; i++) {
343+
assertThatStage(throttled[i].started).isDone();
344+
assertThatStage(throttled[i].ended).isNotDone();
345+
}
346+
347+
// now let us release ..
348+
for (int i = 0; i < latches.length; i++) {
349+
latches[i].countDown();
350+
}
351+
352+
// .. and check everything finished up OK
353+
for (int i = 0; i < latches.length; i++) {
354+
assertThatStage(throttled[i].started).isSuccess();
355+
assertThatStage(throttled[i].ended).isSuccess();
356+
}
357+
358+
// for good measure, we will also wait for the enqueued to complete
359+
assertThatStage(last.started).isSuccess();
360+
assertThatStage(last.ended).isSuccess();
361+
362+
for (int i = 0; i < threads.length; i++) {
363+
threads[i].join(1_000);
364+
}
365+
}
247366
}

0 commit comments

Comments
 (0)