Skip to content

Reduce lock held duration in ConcurrencyLimitingRequestThrottler #1957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -87,6 +88,8 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {

@Override
public void register(@NonNull Throttled request) {
boolean notifyReadyRequired = false;

lock.lock();
try {
if (closed) {
Expand All @@ -96,7 +99,7 @@ public void register(@NonNull Throttled request) {
// We have capacity for one more concurrent request
LOG.trace("[{}] Starting newly registered request", logPrefix);
concurrentRequests += 1;
request.onThrottleReady(false);
notifyReadyRequired = true;
} else if (queue.size() < maxQueueSize) {
LOG.trace("[{}] Enqueuing request", logPrefix);
queue.add(request);
Expand All @@ -112,16 +115,26 @@ public void register(@NonNull Throttled request) {
} finally {
lock.unlock();
}

// no need to hold the lock while allowing the task to progress
if (notifyReadyRequired) {
request.onThrottleReady(false);
}
}

@Override
public void signalSuccess(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
onRequestDone();
nextRequest = onRequestDoneAndDequeNext();
} finally {
lock.unlock();
}

if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
}

@Override
Expand All @@ -131,48 +144,62 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {

@Override
public void signalTimeout(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
if (!closed) {
if (queue.remove(request)) { // The request timed out before it was active
LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
} else {
onRequestDone();
nextRequest = onRequestDoneAndDequeNext();
}
}
} finally {
lock.unlock();
}

if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
}

@Override
public void signalCancel(@NonNull Throttled request) {
Throttled nextRequest = null;
lock.lock();
try {
if (!closed) {
if (queue.remove(request)) { // The request has been cancelled before it was active
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
} else {
onRequestDone();
nextRequest = onRequestDoneAndDequeNext();
}
}
} finally {
lock.unlock();
}

if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
}

@SuppressWarnings("GuardedBy") // this method is only called with the lock held
private void onRequestDone() {
@Nullable
private Throttled onRequestDoneAndDequeNext() {
assert lock.isHeldByCurrentThread();
if (!closed) {
if (queue.isEmpty()) {
concurrentRequests -= 1;
} else {
LOG.trace("[{}] Starting dequeued request", logPrefix);
queue.poll().onThrottleReady(true);
// don't touch concurrentRequests since we finished one but started another
return queue.poll();
}
}

// no next task was dequeued
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void should_start_immediately_when_under_capacity() {
throttler.register(request);

// Then
assertThatStage(request.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThatStage(request.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThat(throttler.getConcurrentRequests()).isEqualTo(1);
assertThat(throttler.getQueue()).isEmpty();
}
Expand Down Expand Up @@ -98,7 +99,7 @@ private void should_allow_new_request_when_active_one_completes(
// Given
MockThrottled first = new MockThrottled();
throttler.register(first);
assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
for (int i = 0; i < 4; i++) { // fill to capacity
throttler.register(new MockThrottled());
}
Expand All @@ -113,7 +114,7 @@ private void should_allow_new_request_when_active_one_completes(
throttler.register(incoming);

// Then
assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
assertThat(throttler.getQueue()).isEmpty();
}
Expand All @@ -132,7 +133,7 @@ public void should_enqueue_when_over_capacity() {
throttler.register(incoming);

// Then
assertThatStage(incoming.started).isNotDone();
assertThatStage(incoming.ended).isNotDone();
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
assertThat(throttler.getQueue()).containsExactly(incoming);
}
Expand All @@ -157,20 +158,20 @@ private void should_dequeue_when_active_completes(Consumer<Throttled> completeCa
// Given
MockThrottled first = new MockThrottled();
throttler.register(first);
assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
for (int i = 0; i < 4; i++) {
throttler.register(new MockThrottled());
}

MockThrottled incoming = new MockThrottled();
throttler.register(incoming);
assertThatStage(incoming.started).isNotDone();
assertThatStage(incoming.ended).isNotDone();

// When
completeCallback.accept(first);

// Then
assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue());
assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue());
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
assertThat(throttler.getQueue()).isEmpty();
}
Expand All @@ -189,7 +190,7 @@ public void should_reject_when_queue_is_full() {
throttler.register(incoming);

// Then
assertThatStage(incoming.started)
assertThatStage(incoming.ended)
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
}

Expand All @@ -208,7 +209,7 @@ public void should_remove_timed_out_request_from_queue() {
throttler.signalTimeout(queued1);

// Then
assertThatStage(queued2.started).isNotDone();
assertThatStage(queued2.ended).isNotDone();
assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
assertThat(throttler.getQueue()).hasSize(1);
}
Expand All @@ -223,7 +224,7 @@ public void should_reject_enqueued_when_closing() {
for (int i = 0; i < 10; i++) {
MockThrottled request = new MockThrottled();
throttler.register(request);
assertThatStage(request.started).isNotDone();
assertThatStage(request.ended).isNotDone();
enqueued.add(request);
}

Expand All @@ -232,7 +233,7 @@ public void should_reject_enqueued_when_closing() {

// Then
for (MockThrottled request : enqueued) {
assertThatStage(request.started)
assertThatStage(request.ended)
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
}

Expand All @@ -241,7 +242,125 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);

// Then
assertThatStage(request.started)
assertThatStage(request.ended)
.isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class));
}

@Test
public void should_run_throttle_callbacks_concurrently() throws InterruptedException {
// Given

// a task is enqueued, which when in onThrottleReady, will stall latch countDown()ed
// register() should automatically start onThrottleReady on same thread

// start a parallel thread
CountDownLatch firstRelease = new CountDownLatch(1);
MockThrottled first = new MockThrottled(firstRelease);
Runnable r =
() -> {
throttler.register(first);
first.ended.toCompletableFuture().thenRun(() -> throttler.signalSuccess(first));
};
Thread t = new Thread(r);
t.start();

// wait for the registration threads to reach await state
assertThatStage(first.started).isSuccess();
assertThatStage(first.ended).isNotDone();

// When
// we concurrently submit a second shorter task
MockThrottled second = new MockThrottled();
// (on a second thread, so that we can join and force a timeout in case
// registration is delayed)
Thread t2 = new Thread(() -> throttler.register(second));
t2.start();
t2.join(1_000);

// Then
// registration will trigger callback, should complete ~immediately
assertThatStage(second.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());
// first should still be unfinished
assertThatStage(first.started).isDone();
assertThatStage(first.ended).isNotDone();
// now finish, and verify
firstRelease.countDown();
assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse());

t.join(1_000);
}

@Test
public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws InterruptedException {
// Given

// Multiple tasks are registered, up to the limit, and proceed into their
// callback

// start five parallel threads
final int THREADS = 5;
Thread[] threads = new Thread[THREADS];
CountDownLatch[] latches = new CountDownLatch[THREADS];
MockThrottled[] throttled = new MockThrottled[THREADS];
for (int i = 0; i < threads.length; i++) {
latches[i] = new CountDownLatch(1);
final MockThrottled itThrottled = new MockThrottled(latches[i]);
throttled[i] = itThrottled;
threads[i] =
new Thread(
() -> {
throttler.register(itThrottled);
itThrottled
.ended
.toCompletableFuture()
.thenRun(() -> throttler.signalSuccess(itThrottled));
});
threads[i].start();
}

// wait for the registration threads to be launched
// they are all waiting now
for (int i = 0; i < throttled.length; i++) {
assertThatStage(throttled[i].started).isSuccess();
assertThatStage(throttled[i].ended).isNotDone();
}

// When
// we concurrently submit another task
MockThrottled last = new MockThrottled();
throttler.register(last);

// Then
// registration will enqueue the callback, and it should not
// take any time to proceed (ie: we should not be blocked)
// and there should be an element in the queue
assertThatStage(last.started).isNotDone();
assertThatStage(last.ended).isNotDone();
assertThat(throttler.getQueue()).containsExactly(last);

// we still have not released, so old throttled threads should be waiting
for (int i = 0; i < throttled.length; i++) {
assertThatStage(throttled[i].started).isDone();
assertThatStage(throttled[i].ended).isNotDone();
}

// now let us release ..
for (int i = 0; i < latches.length; i++) {
latches[i].countDown();
}

// .. and check everything finished up OK
for (int i = 0; i < latches.length; i++) {
assertThatStage(throttled[i].started).isSuccess();
assertThatStage(throttled[i].ended).isSuccess();
}

// for good measure, we will also wait for the enqueued to complete
assertThatStage(last.started).isSuccess();
assertThatStage(last.ended).isSuccess();

for (int i = 0; i < threads.length; i++) {
threads[i].join(1_000);
}
}
}
Loading