diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java index 438bed0953b..ffe0ffe9650 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java @@ -25,6 +25,7 @@ import com.datastax.oss.driver.api.core.session.throttling.Throttled; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.locks.ReentrantLock; @@ -87,6 +88,8 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) { @Override public void register(@NonNull Throttled request) { + boolean notifyReadyRequired = false; + lock.lock(); try { if (closed) { @@ -96,7 +99,7 @@ public void register(@NonNull Throttled request) { // We have capacity for one more concurrent request LOG.trace("[{}] Starting newly registered request", logPrefix); concurrentRequests += 1; - request.onThrottleReady(false); + notifyReadyRequired = true; } else if (queue.size() < maxQueueSize) { LOG.trace("[{}] Enqueuing request", logPrefix); queue.add(request); @@ -112,16 +115,26 @@ public void register(@NonNull Throttled request) { } finally { lock.unlock(); } + + // no need to hold the lock while allowing the task to progress + if (notifyReadyRequired) { + request.onThrottleReady(false); + } } @Override public void signalSuccess(@NonNull Throttled request) { + Throttled nextRequest = null; lock.lock(); try { - onRequestDone(); + nextRequest = onRequestDoneAndDequeNext(); } finally { lock.unlock(); } + + if (nextRequest != null) { + nextRequest.onThrottleReady(true); + } } @Override @@ -131,48 +144,62 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) { @Override public void signalTimeout(@NonNull Throttled request) { + Throttled nextRequest = null; lock.lock(); try { if (!closed) { if (queue.remove(request)) { // The request timed out before it was active LOG.trace("[{}] Removing timed out request from the queue", logPrefix); } else { - onRequestDone(); + nextRequest = onRequestDoneAndDequeNext(); } } } finally { lock.unlock(); } + + if (nextRequest != null) { + nextRequest.onThrottleReady(true); + } } @Override public void signalCancel(@NonNull Throttled request) { + Throttled nextRequest = null; lock.lock(); try { if (!closed) { if (queue.remove(request)) { // The request has been cancelled before it was active LOG.trace("[{}] Removing cancelled request from the queue", logPrefix); } else { - onRequestDone(); + nextRequest = onRequestDoneAndDequeNext(); } } } finally { lock.unlock(); } + + if (nextRequest != null) { + nextRequest.onThrottleReady(true); + } } @SuppressWarnings("GuardedBy") // this method is only called with the lock held - private void onRequestDone() { + @Nullable + private Throttled onRequestDoneAndDequeNext() { assert lock.isHeldByCurrentThread(); if (!closed) { if (queue.isEmpty()) { concurrentRequests -= 1; } else { LOG.trace("[{}] Starting dequeued request", logPrefix); - queue.poll().onThrottleReady(true); // don't touch concurrentRequests since we finished one but started another + return queue.poll(); } } + + // no next task was dequeued + return null; } @Override diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java index c01b26c1e9f..7eb682070cd 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java @@ -29,6 +29,7 @@ import com.datastax.oss.driver.api.core.session.throttling.Throttled; import com.datastax.oss.driver.shaded.guava.common.collect.Lists; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; import org.junit.Before; import org.junit.Test; @@ -67,7 +68,7 @@ public void should_start_immediately_when_under_capacity() { throttler.register(request); // Then - assertThatStage(request.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(request.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); assertThat(throttler.getConcurrentRequests()).isEqualTo(1); assertThat(throttler.getQueue()).isEmpty(); } @@ -98,7 +99,7 @@ private void should_allow_new_request_when_active_one_completes( // Given MockThrottled first = new MockThrottled(); throttler.register(first); - assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); for (int i = 0; i < 4; i++) { // fill to capacity throttler.register(new MockThrottled()); } @@ -113,7 +114,7 @@ private void should_allow_new_request_when_active_one_completes( throttler.register(incoming); // Then - assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); assertThat(throttler.getConcurrentRequests()).isEqualTo(5); assertThat(throttler.getQueue()).isEmpty(); } @@ -132,7 +133,7 @@ public void should_enqueue_when_over_capacity() { throttler.register(incoming); // Then - assertThatStage(incoming.started).isNotDone(); + assertThatStage(incoming.ended).isNotDone(); assertThat(throttler.getConcurrentRequests()).isEqualTo(5); assertThat(throttler.getQueue()).containsExactly(incoming); } @@ -157,20 +158,20 @@ private void should_dequeue_when_active_completes(Consumer completeCa // Given MockThrottled first = new MockThrottled(); throttler.register(first); - assertThatStage(first.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); for (int i = 0; i < 4; i++) { throttler.register(new MockThrottled()); } MockThrottled incoming = new MockThrottled(); throttler.register(incoming); - assertThatStage(incoming.started).isNotDone(); + assertThatStage(incoming.ended).isNotDone(); // When completeCallback.accept(first); // Then - assertThatStage(incoming.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); + assertThatStage(incoming.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); assertThat(throttler.getConcurrentRequests()).isEqualTo(5); assertThat(throttler.getQueue()).isEmpty(); } @@ -189,7 +190,7 @@ public void should_reject_when_queue_is_full() { throttler.register(incoming); // Then - assertThatStage(incoming.started) + assertThatStage(incoming.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } @@ -208,7 +209,7 @@ public void should_remove_timed_out_request_from_queue() { throttler.signalTimeout(queued1); // Then - assertThatStage(queued2.started).isNotDone(); + assertThatStage(queued2.ended).isNotDone(); assertThat(throttler.getConcurrentRequests()).isEqualTo(5); assertThat(throttler.getQueue()).hasSize(1); } @@ -223,7 +224,7 @@ public void should_reject_enqueued_when_closing() { for (int i = 0; i < 10; i++) { MockThrottled request = new MockThrottled(); throttler.register(request); - assertThatStage(request.started).isNotDone(); + assertThatStage(request.ended).isNotDone(); enqueued.add(request); } @@ -232,7 +233,7 @@ public void should_reject_enqueued_when_closing() { // Then for (MockThrottled request : enqueued) { - assertThatStage(request.started) + assertThatStage(request.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } @@ -241,7 +242,125 @@ public void should_reject_enqueued_when_closing() { throttler.register(request); // Then - assertThatStage(request.started) + assertThatStage(request.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } + + @Test + public void should_run_throttle_callbacks_concurrently() throws InterruptedException { + // Given + + // a task is enqueued, which when in onThrottleReady, will stall latch countDown()ed + // register() should automatically start onThrottleReady on same thread + + // start a parallel thread + CountDownLatch firstRelease = new CountDownLatch(1); + MockThrottled first = new MockThrottled(firstRelease); + Runnable r = + () -> { + throttler.register(first); + first.ended.toCompletableFuture().thenRun(() -> throttler.signalSuccess(first)); + }; + Thread t = new Thread(r); + t.start(); + + // wait for the registration threads to reach await state + assertThatStage(first.started).isSuccess(); + assertThatStage(first.ended).isNotDone(); + + // When + // we concurrently submit a second shorter task + MockThrottled second = new MockThrottled(); + // (on a second thread, so that we can join and force a timeout in case + // registration is delayed) + Thread t2 = new Thread(() -> throttler.register(second)); + t2.start(); + t2.join(1_000); + + // Then + // registration will trigger callback, should complete ~immediately + assertThatStage(second.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + // first should still be unfinished + assertThatStage(first.started).isDone(); + assertThatStage(first.ended).isNotDone(); + // now finish, and verify + firstRelease.countDown(); + assertThatStage(first.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + + t.join(1_000); + } + + @Test + public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws InterruptedException { + // Given + + // Multiple tasks are registered, up to the limit, and proceed into their + // callback + + // start five parallel threads + final int THREADS = 5; + Thread[] threads = new Thread[THREADS]; + CountDownLatch[] latches = new CountDownLatch[THREADS]; + MockThrottled[] throttled = new MockThrottled[THREADS]; + for (int i = 0; i < threads.length; i++) { + latches[i] = new CountDownLatch(1); + final MockThrottled itThrottled = new MockThrottled(latches[i]); + throttled[i] = itThrottled; + threads[i] = + new Thread( + () -> { + throttler.register(itThrottled); + itThrottled + .ended + .toCompletableFuture() + .thenRun(() -> throttler.signalSuccess(itThrottled)); + }); + threads[i].start(); + } + + // wait for the registration threads to be launched + // they are all waiting now + for (int i = 0; i < throttled.length; i++) { + assertThatStage(throttled[i].started).isSuccess(); + assertThatStage(throttled[i].ended).isNotDone(); + } + + // When + // we concurrently submit another task + MockThrottled last = new MockThrottled(); + throttler.register(last); + + // Then + // registration will enqueue the callback, and it should not + // take any time to proceed (ie: we should not be blocked) + // and there should be an element in the queue + assertThatStage(last.started).isNotDone(); + assertThatStage(last.ended).isNotDone(); + assertThat(throttler.getQueue()).containsExactly(last); + + // we still have not released, so old throttled threads should be waiting + for (int i = 0; i < throttled.length; i++) { + assertThatStage(throttled[i].started).isDone(); + assertThatStage(throttled[i].ended).isNotDone(); + } + + // now let us release .. + for (int i = 0; i < latches.length; i++) { + latches[i].countDown(); + } + + // .. and check everything finished up OK + for (int i = 0; i < latches.length; i++) { + assertThatStage(throttled[i].started).isSuccess(); + assertThatStage(throttled[i].ended).isSuccess(); + } + + // for good measure, we will also wait for the enqueued to complete + assertThatStage(last.started).isSuccess(); + assertThatStage(last.ended).isSuccess(); + + for (int i = 0; i < threads.length; i++) { + threads[i].join(1_000); + } + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java index b7cd0ee8a54..9e54e3d511f 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java @@ -19,21 +19,45 @@ import com.datastax.oss.driver.api.core.RequestThrottlingException; import com.datastax.oss.driver.api.core.session.throttling.Throttled; +import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; class MockThrottled implements Throttled { + final CompletionStage started = new CompletableFuture<>(); + final CompletionStage ended = new CompletableFuture<>(); + final CountDownLatch canRelease; - final CompletionStage started = new CompletableFuture<>(); + public MockThrottled() { + this(new CountDownLatch(0)); + } + + /* + * The releaseLatch can be provided to add some delay before the + * task readiness/fail callbacks complete. This can be used, eg, to + * imitate a slow callback. + */ + public MockThrottled(CountDownLatch releaseLatch) { + this.canRelease = releaseLatch; + } @Override public void onThrottleReady(boolean wasDelayed) { - started.toCompletableFuture().complete(wasDelayed); + started.toCompletableFuture().complete(null); + awaitRelease(); + ended.toCompletableFuture().complete(wasDelayed); } @Override public void onThrottleFailure(@NonNull RequestThrottlingException error) { - started.toCompletableFuture().completeExceptionally(error); + started.toCompletableFuture().complete(null); + awaitRelease(); + ended.toCompletableFuture().completeExceptionally(error); + } + + private void awaitRelease() { + Uninterruptibles.awaitUninterruptibly(canRelease); } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java index 0e0fe7c1c65..1e15610bf7b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java @@ -98,7 +98,7 @@ public void should_start_immediately_when_under_capacity() { throttler.register(request); // Then - assertThatStage(request.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(request.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); assertThat(throttler.getStoredPermits()).isEqualTo(4); assertThat(throttler.getQueue()).isEmpty(); } @@ -117,7 +117,7 @@ public void should_allow_new_request_when_under_rate() { throttler.register(request); // Then - assertThatStage(request.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); + assertThatStage(request.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isFalse()); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).isEmpty(); } @@ -136,7 +136,7 @@ public void should_enqueue_when_over_rate() { throttler.register(request); // Then - assertThatStage(request.started).isNotDone(); + assertThatStage(request.ended).isNotDone(); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).containsExactly(request); @@ -160,7 +160,7 @@ public void should_reject_when_queue_is_full() { throttler.register(request); // Then - assertThatStage(request.started) + assertThatStage(request.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } @@ -188,7 +188,7 @@ private void testRemoveInvalidEventFromQueue(Consumer completeCallbac completeCallback.accept(queued1); // Then - assertThatStage(queued2.started).isNotDone(); + assertThatStage(queued2.ended).isNotDone(); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).containsExactly(queued2); } @@ -202,10 +202,10 @@ public void should_dequeue_when_draining_task_runs() { MockThrottled queued1 = new MockThrottled(); throttler.register(queued1); - assertThatStage(queued1.started).isNotDone(); + assertThatStage(queued1.ended).isNotDone(); MockThrottled queued2 = new MockThrottled(); throttler.register(queued2); - assertThatStage(queued2.started).isNotDone(); + assertThatStage(queued2.ended).isNotDone(); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).hasSize(2); @@ -230,8 +230,8 @@ public void should_dequeue_when_draining_task_runs() { task.run(); // Then - assertThatStage(queued1.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); - assertThatStage(queued2.started).isNotDone(); + assertThatStage(queued1.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); + assertThatStage(queued2.ended).isNotDone(); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).containsExactly(queued2); // task reschedules itself since it did not empty the queue @@ -244,7 +244,7 @@ public void should_dequeue_when_draining_task_runs() { task.run(); // Then - assertThatStage(queued2.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); + assertThatStage(queued2.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); assertThat(throttler.getStoredPermits()).isEqualTo(0); assertThat(throttler.getQueue()).isEmpty(); assertThat(adminExecutor.nextTask()).isNull(); @@ -286,14 +286,14 @@ public void should_keep_accumulating_time_if_no_permits_created() { // Then MockThrottled queued = new MockThrottled(); throttler.register(queued); - assertThatStage(queued.started).isNotDone(); + assertThatStage(queued.ended).isNotDone(); // When clock.add(ONE_HUNDRED_MILLISECONDS); adminExecutor.nextTask().run(); // Then - assertThatStage(queued.started).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); + assertThatStage(queued.ended).isSuccess(wasDelayed -> assertThat(wasDelayed).isTrue()); } @Test @@ -306,7 +306,7 @@ public void should_reject_enqueued_when_closing() { for (int i = 0; i < 10; i++) { MockThrottled request = new MockThrottled(); throttler.register(request); - assertThatStage(request.started).isNotDone(); + assertThatStage(request.ended).isNotDone(); enqueued.add(request); } @@ -315,7 +315,7 @@ public void should_reject_enqueued_when_closing() { // Then for (MockThrottled request : enqueued) { - assertThatStage(request.started) + assertThatStage(request.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } @@ -324,7 +324,7 @@ public void should_reject_enqueued_when_closing() { throttler.register(request); // Then - assertThatStage(request.started) + assertThatStage(request.ended) .isFailed(error -> assertThat(error).isInstanceOf(RequestThrottlingException.class)); } }