diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java index 5e087d3093b8b..c7345aa3b6368 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; import java.util.concurrent.ExecutionException; @@ -30,8 +31,14 @@ public class FutureUtils { + /** + * Cancel execution of this future without interrupting a running thread. See {@link Future#cancel(boolean)} for details. + * + * @param toCancel the future to cancel + * @return false if the future could not be cancelled, otherwise true + */ @SuppressForbidden(reason = "Future#cancel()") - public static boolean cancel(Future toCancel) { + public static boolean cancel(@Nullable final Future toCancel) { if (toCancel != null) { return toCancel.cancel(false); // this method is a forbidden API since it interrupts threads } diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 825a8a8a48354..df93a935b62f2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -21,13 +21,19 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.LinkedHashMap; 
+import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -45,38 +51,43 @@ public class GlobalCheckpointListeners implements Closeable { public interface GlobalCheckpointListener { /** * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint - * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the - * global checkpoint is updated, the exception will be null. + * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an + * instance of {@link IndexShardClosedException}. If the listener timed out waiting for notification then the exception will be + * non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null. * * @param globalCheckpoint the updated global checkpoint - * @param e if non-null, the shard is closed + * @param e if non-null, the shard is closed or the listener timed out */ - void accept(long globalCheckpoint, IndexShardClosedException e); + void accept(long globalCheckpoint, Exception e); } // guarded by this private boolean closed; - private volatile List listeners; + private volatile Map> listeners; private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO; private final ShardId shardId; private final Executor executor; + private final ScheduledExecutorService scheduler; private final Logger logger; /** * Construct a global checkpoint listeners collection. 
* - * @param shardId the shard ID on which global checkpoint updates can be listened to - * @param executor the executor for listener notifications - * @param logger a shard-level logger + * @param shardId the shard ID on which global checkpoint updates can be listened to + * @param executor the executor for listener notifications + * @param scheduler the executor used for scheduling timeouts + * @param logger a shard-level logger */ GlobalCheckpointListeners( final ShardId shardId, final Executor executor, + final ScheduledExecutorService scheduler, final Logger logger) { - this.shardId = Objects.requireNonNull(shardId); - this.executor = Objects.requireNonNull(executor); - this.logger = Objects.requireNonNull(logger); + this.shardId = Objects.requireNonNull(shardId, "shardId"); + this.executor = Objects.requireNonNull(executor, "executor"); + this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); + this.logger = Objects.requireNonNull(logger, "logger"); } /** @@ -84,12 +95,15 @@ public interface GlobalCheckpointListener { * listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard - * is closed. A listener must re-register after one of these events to receive subsequent events. + * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be + * notified if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for + * the timeout means no timeout will be associated with the listener. 
* * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener + * @param timeout the listener timeout, or null if no timeout */ - synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) { + synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener, final TimeValue timeout) { if (closed) { executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId))); return; @@ -99,9 +113,41 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null)); } else { if (listeners == null) { - listeners = new ArrayList<>(); + listeners = new LinkedHashMap<>(); + } + if (timeout == null) { + listeners.put(listener, null); + } else { + listeners.put( + listener, + scheduler.schedule( + () -> { + final boolean removed; + synchronized (this) { + /* + * Note that the listeners map can be null if a notification nulled out the map reference when + * notifying listeners, and then our scheduled execution occurred before we could be cancelled by + * the notification. In this case, we would have blocked waiting for access to this critical + * section. + * + * What is more, we know that this listener has a timeout associated with it (otherwise we would + * not be here) so the return value from remove being null is an indication that we are not in the + * map. This can happen if a notification nulled out the listeners, and then our scheduled execution + * occurred before we could be cancelled by the notification, and then another thread added a + * listener causing the listeners map reference to be non-null again. In this case, our listener + * here would not be in the map and we should not fire the timeout logic. 
+ */ + removed = listeners != null && listeners.remove(listener) != null; + } + if (removed) { + final TimeoutException e = new TimeoutException(timeout.getStringRep()); + logger.trace("global checkpoint listener timed out", e); + executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); + } + }, + timeout.nanos(), + TimeUnit.NANOSECONDS)); } - listeners.add(listener); } } @@ -111,10 +157,25 @@ public synchronized void close() throws IOException { notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)); } + /** + * The number of listeners currently pending for notification. + * + * @return the number of listeners pending notification + */ synchronized int pendingListeners() { return listeners == null ? 0 : listeners.size(); } + /** + * The scheduled future for a listener that has a timeout associated with it, otherwise null. + * + * @param listener the listener to get the scheduled future for + * @return a scheduled future representing the timeout future for the listener, otherwise null + */ + synchronized ScheduledFuture getTimeoutFuture(final GlobalCheckpointListener listener) { + return listeners.get(listener); + } + /** * Invoke to notify all registered listeners of an updated global checkpoint. 
* @@ -134,19 +195,24 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null); if (listeners != null) { // capture the current listeners - final List currentListeners = listeners; + final Map> currentListeners = listeners; listeners = null; if (currentListeners != null) { executor.execute(() -> { - for (final GlobalCheckpointListener listener : currentListeners) { - notifyListener(listener, globalCheckpoint, e); + for (final Map.Entry> listener : currentListeners.entrySet()) { + /* + * We do not want to interrupt any timeouts that fired, these will detect that the listener has been notified and + * not trigger the timeout. + */ + FutureUtils.cancel(listener.getValue()); + notifyListener(listener.getKey(), globalCheckpoint, e); } }); } } } - private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) { + private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final Exception e) { try { listener.accept(globalCheckpoint, e); } catch (final Exception caught) { @@ -156,8 +222,11 @@ private void notifyListener(final GlobalCheckpointListener listener, final long "error notifying global checkpoint listener of updated global checkpoint [{}]", globalCheckpoint), caught); - } else { + } else if (e instanceof IndexShardClosedException) { logger.warn("error notifying global checkpoint listener of closed shard", caught); + } else { + assert e instanceof TimeoutException : e; + logger.warn("error notifying global checkpoint listener of timeout", caught); } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4bb56c8b0d3b5..91d87b0008214 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ 
b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -302,7 +302,8 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); - this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger); + this.globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger); this.replicationTracker = new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated); @@ -1781,15 +1782,18 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long /** * Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the - * listener will fire immediately on the calling thread. + * listener will fire immediately on the calling thread. If the specified timeout elapses before the listener is notified, the listener + * will be notified with a {@link TimeoutException}. A caller may pass null to specify no timeout. 
* * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener + * @param timeout the timeout */ public void addGlobalCheckpointListener( final long currentGlobalCheckpoint, - final GlobalCheckpointListeners.GlobalCheckpointListener listener) { - this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener); + final GlobalCheckpointListeners.GlobalCheckpointListener listener, + final TimeValue timeout) { + this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener, timeout); } /** diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/FutureUtilsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/FutureUtilsTests.java new file mode 100644 index 0000000000000..fb1265dd4d223 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/FutureUtilsTests.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.Future; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class FutureUtilsTests extends ESTestCase { + + public void testCancellingNullFutureOkay() { + FutureUtils.cancel(null); + } + + public void testRunningFutureNotInterrupted() { + final Future future = mock(Future.class); + FutureUtils.cancel(future); + verify(future).cancel(false); + } + +} \ No newline at end of file diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index bb3a691a702c8..e5e2453682fc8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -21,8 +21,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; +import org.junit.After; import org.mockito.ArgumentCaptor; import java.io.IOException; @@ -35,14 +39,20 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static 
org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -50,10 +60,18 @@ public class GlobalCheckpointListenersTests extends ESTestCase { - final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + private final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); + private final ScheduledThreadPoolExecutor scheduler = + new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(Settings.EMPTY, "scheduler")); + + @After + public void shutdownScheduler() { + scheduler.shutdown(); + } public void testGlobalCheckpointUpdated() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final long[] globalCheckpoints = new long[numberOfListeners]; @@ -69,7 +87,7 @@ public void testGlobalCheckpointUpdated() throws IOException { assert e == null; globalCheckpoints[index] = g; }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -92,7 +110,8 @@ public void testGlobalCheckpointUpdated() throws IOException { } public void testListenersReadyToBeNotified() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new 
GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); @@ -109,7 +128,7 @@ public void testListenersReadyToBeNotified() throws IOException { assert e == null; globalCheckpoints[index] = g; }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener); + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); // the listener should be notified immediately assertThat(globalCheckpoints[index], equalTo(globalCheckpoint)); } @@ -130,7 +149,8 @@ public void testListenersReadyToBeNotified() throws IOException { public void testFailingListenerReadyToBeNotified() { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED + 1, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final int numberOfListeners = randomIntBetween(0, 16); @@ -149,7 +169,7 @@ public void testFailingListenerReadyToBeNotified() { globalCheckpoints[index] = globalCheckpoint; } }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener); + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint - 1), listener, null); // the listener should be notified immediately if (failure) { assertThat(globalCheckpoints[i], equalTo(Long.MIN_VALUE)); @@ 
-172,10 +192,11 @@ public void testFailingListenerReadyToBeNotified() { } public void testClose() throws IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); - final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; + final Exception[] exceptions = new Exception[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; final AtomicBoolean invoked = new AtomicBoolean(); @@ -188,12 +209,13 @@ public void testClose() throws IOException { assert e != null; exceptions[index] = e; }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { assertNotNull(exceptions[i]); - assertThat(exceptions[i].getShardId(), equalTo(shardId)); + assertThat(exceptions[i], instanceOf(IndexShardClosedException.class)); + assertThat(((IndexShardClosedException)exceptions[i]).getShardId(), equalTo(shardId)); } // test the listeners are not invoked twice @@ -207,7 +229,8 @@ public void testClose() throws IOException { } public void testAddAfterClose() throws InterruptedException, IOException { - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, logger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); globalCheckpointListeners.close(); final AtomicBoolean invoked = new AtomicBoolean(); @@ -221,14 +244,15 @@ public 
void testAddAfterClose() throws InterruptedException, IOException { } latch.countDown(); }; - globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener); + globalCheckpointListeners.add(randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE), listener, null); latch.await(); assertTrue(invoked.get()); } public void testFailingListenerOnUpdate() { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; @@ -248,7 +272,7 @@ public void testFailingListenerOnUpdate() { globalCheckpoints[index] = g; } }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); } final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); @@ -282,11 +306,12 @@ public void testFailingListenerOnUpdate() { public void testFailingListenerOnClose() throws IOException { final Logger mockLogger = mock(Logger.class); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, Runnable::run, mockLogger); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final int numberOfListeners = randomIntBetween(0, 16); final boolean[] failures = new boolean[numberOfListeners]; - final IndexShardClosedException[] exceptions = new IndexShardClosedException[numberOfListeners]; + final 
Exception[] exceptions = new Exception[numberOfListeners]; for (int i = 0; i < numberOfListeners; i++) { final int index = i; final boolean failure = randomBoolean(); @@ -301,7 +326,7 @@ public void testFailingListenerOnClose() throws IOException { exceptions[index] = e; } }; - globalCheckpointListeners.add(NO_OPS_PERFORMED, listener); + globalCheckpointListeners.add(NO_OPS_PERFORMED, listener, null); } globalCheckpointListeners.close(); for (int i = 0; i < numberOfListeners; i++) { @@ -309,7 +334,8 @@ public void testFailingListenerOnClose() throws IOException { assertNull(exceptions[i]); } else { assertNotNull(exceptions[i]); - assertThat(exceptions[i].getShardId(), equalTo(shardId)); + assertThat(exceptions[i], instanceOf(IndexShardClosedException.class)); + assertThat(((IndexShardClosedException)exceptions[i]).getShardId(), equalTo(shardId)); } } int failureCount = 0; @@ -334,17 +360,20 @@ public void testNotificationUsesExecutor() { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { - globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> { - notified.incrementAndGet(); - assertThat(g, equalTo(globalCheckpoint)); - assertNull(e); - }); + globalCheckpointListeners.add( + NO_OPS_PERFORMED, + (g, e) -> { + notified.incrementAndGet(); + assertThat(g, equalTo(globalCheckpoint)); + assertNull(e); + }, + null); } globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); assertThat(notified.get(), 
equalTo(numberOfListeners)); @@ -357,17 +386,21 @@ public void testNotificationOnClosedUsesExecutor() throws IOException { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); globalCheckpointListeners.close(); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { - globalCheckpointListeners.add(NO_OPS_PERFORMED, (g, e) -> { - notified.incrementAndGet(); - assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertNotNull(e); - assertThat(e.getShardId(), equalTo(shardId)); - }); + globalCheckpointListeners.add( + NO_OPS_PERFORMED, + (g, e) -> { + notified.incrementAndGet(); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertNotNull(e); + assertThat(e, instanceOf(IndexShardClosedException.class)); + assertThat(((IndexShardClosedException) e).getShardId(), equalTo(shardId)); + }, + null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -379,17 +412,19 @@ public void testListenersReadyToBeNotifiedUsesExecutor() { count.incrementAndGet(); command.run(); }; - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); final long globalCheckpoint = randomNonNegativeLong(); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint); final AtomicInteger notified = new AtomicInteger(); final int numberOfListeners = randomIntBetween(0, 16); for (int i = 0; i < numberOfListeners; i++) { - globalCheckpointListeners.add(randomLongBetween(0, globalCheckpoint), (g, e) -> { - 
notified.incrementAndGet(); - assertThat(g, equalTo(globalCheckpoint)); - assertNull(e); - }); + globalCheckpointListeners.add( + randomLongBetween(0, globalCheckpoint), + (g, e) -> { + notified.incrementAndGet(); + assertThat(g, equalTo(globalCheckpoint)); + assertNull(e); + }, null); } assertThat(notified.get(), equalTo(numberOfListeners)); assertThat(count.get(), equalTo(numberOfListeners)); @@ -397,18 +432,18 @@ public void testListenersReadyToBeNotifiedUsesExecutor() { public void testConcurrency() throws BrokenBarrierException, InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, 8)); - final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, logger); + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.get()); // we are going to synchronize the actions of three threads: the updating thread, the listener thread, and the main test thread final CyclicBarrier barrier = new CyclicBarrier(3); - final int numberOfIterations = randomIntBetween(1, 1024); + final int numberOfIterations = randomIntBetween(1, 4096); final AtomicBoolean closed = new AtomicBoolean(); final Thread updatingThread = new Thread(() -> { // synchronize starting with the listener thread and the main test thread awaitQuietly(barrier); for (int i = 0; i < numberOfIterations; i++) { - if (rarely() && closed.get() == false) { + if (i > numberOfIterations / 2 && rarely() && closed.get() == false) { closed.set(true); try { globalCheckpointListeners.close(); @@ -416,7 +451,7 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio throw new UncheckedIOException(e); } } - if (closed.get() == false) { + if (rarely() && closed.get() == false) { 
globalCheckpointListeners.globalCheckpointUpdated(globalCheckpoint.incrementAndGet()); } } @@ -438,7 +473,8 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio if (invocation.compareAndSet(false, true) == false) { throw new IllegalStateException("listener invoked twice"); } - }); + }, + randomBoolean() ? null : TimeValue.timeValueNanos(randomLongBetween(1, TimeUnit.MICROSECONDS.toNanos(1)))); } // synchronize ending with the updating thread and the main test thread awaitQuietly(barrier); @@ -463,6 +499,107 @@ public void testConcurrency() throws BrokenBarrierException, InterruptedExceptio listenersThread.join(); } + public void testTimeout() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); + final AtomicBoolean notified = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + globalCheckpointListeners.add( + NO_OPS_PERFORMED, + (g, e) -> { + try { + notified.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e, hasToString(containsString(timeout.getStringRep()))); + final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor t = ArgumentCaptor.forClass(TimeoutException.class); + verify(mockLogger).trace(message.capture(), t.capture()); + assertThat(message.getValue(), equalTo("global checkpoint listener timed out")); + assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep()))); + } catch (Exception caught) { + fail(e.getMessage()); + } finally { + latch.countDown(); + } + }, + timeout); + latch.await(); + + assertTrue(notified.get()); + } + + public void testTimeoutNotificationUsesExecutor() throws InterruptedException { + final AtomicInteger 
count = new AtomicInteger(); + final Executor executor = command -> { + count.incrementAndGet(); + command.run(); + }; + final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners(shardId, executor, scheduler, logger); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); + final AtomicBoolean notified = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + globalCheckpointListeners.add( + NO_OPS_PERFORMED, + (g, e) -> { + try { + notified.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(e, instanceOf(TimeoutException.class)); + } finally { + latch.countDown(); + } + }, + timeout); + latch.await(); + // ensure the listener notification occurred on the executor + assertTrue(notified.get()); + assertThat(count.get(), equalTo(1)); + } + + public void testFailingListenerAfterTimeout() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final GlobalCheckpointListeners globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, mockLogger); + final CountDownLatch latch = new CountDownLatch(1); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(1, 50)); + globalCheckpointListeners.add( + NO_OPS_PERFORMED, + (g, e) -> { + try { + throw new RuntimeException("failure"); + } finally { + latch.countDown(); + } + }, + timeout); + latch.await(); + final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor t = ArgumentCaptor.forClass(RuntimeException.class); + verify(mockLogger).warn(message.capture(), t.capture()); + assertThat(message.getValue(), equalTo("error notifying global checkpoint listener of timeout")); + assertNotNull(t.getValue()); + assertThat(t.getValue(), instanceOf(RuntimeException.class)); + assertThat(t.getValue().getMessage(), equalTo("failure")); + } + + public void testTimeoutCancelledAfterListenerNotified() { + final GlobalCheckpointListeners 
globalCheckpointListeners = + new GlobalCheckpointListeners(shardId, Runnable::run, scheduler, logger); + final TimeValue timeout = TimeValue.timeValueNanos(Long.MAX_VALUE); /* Long.MAX_VALUE nanoseconds: the timeout is not expected to elapse while the test runs */ + final GlobalCheckpointListeners.GlobalCheckpointListener globalCheckpointListener = (g, e) -> { + assertThat(g, equalTo(NO_OPS_PERFORMED)); + assertNull(e); + }; + globalCheckpointListeners.add(NO_OPS_PERFORMED, globalCheckpointListener, timeout); + final ScheduledFuture future = globalCheckpointListeners.getTimeoutFuture(globalCheckpointListener); + assertNotNull(future); + globalCheckpointListeners.globalCheckpointUpdated(NO_OPS_PERFORMED); /* this update is expected to notify the listener registered above (it asserts g == NO_OPS_PERFORMED and a null exception) */ + assertTrue(future.isCancelled()); /* the pending timeout task must have been cancelled by the time the update returns */ + } + private void awaitQuietly(final CyclicBarrier barrier) { try { barrier.await(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 87edfcfccb150..8fe1daefe6d48 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -89,6 +89,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -113,6 +114,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; public class IndexShardIT extends ESSingleNodeTestCase { @@ -746,10 +749,11 @@ public void testGlobalCheckpointListeners() throws Exception { shard.addGlobalCheckpointListener( i - 1, (g, e) -> { - assert g >= NO_OPS_PERFORMED; - assert e == null; + assertThat(g,
greaterThanOrEqualTo(NO_OPS_PERFORMED)); + assertNull(e); globalCheckpoint.set(g); - }); + }, + null); /* null is passed as the timeout argument; presumably this means the listener never times out - confirm against addGlobalCheckpointListener */ client().prepareIndex("test", "_doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(globalCheckpoint.get(), equalTo((long) index))); // adding a listener expecting a lower global checkpoint should fire immediately @@ -757,10 +761,11 @@ public void testGlobalCheckpointListeners() throws Exception { shard.addGlobalCheckpointListener( randomLongBetween(NO_OPS_PERFORMED, i - 1), (g, e) -> { - assert g >= NO_OPS_PERFORMED; - assert e == null; + assertThat(g, greaterThanOrEqualTo(NO_OPS_PERFORMED)); + assertNull(e); immediateGlobalCheckpint.set(g); - }); + }, + null); assertBusy(() -> assertThat(immediateGlobalCheckpint.get(), equalTo((long) index))); } final AtomicBoolean invoked = new AtomicBoolean(); @@ -768,12 +773,40 @@ public void testGlobalCheckpointListeners() throws Exception { numberOfUpdates - 1, (g, e) -> { invoked.set(true); - assert g == UNASSIGNED_SEQ_NO; - assert e != null; - assertThat(e.getShardId(), equalTo(shard.shardId())); - }); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertThat(e, instanceOf(IndexShardClosedException.class)); /* the listener now receives a plain Exception, so the concrete type is asserted before the cast on the next line */ + assertThat(((IndexShardClosedException)e).getShardId(), equalTo(shard.shardId())); + }, + null); shard.close("closed", randomBoolean()); assertBusy(() -> assertTrue(invoked.get())); } + /** Creates a one-shard, zero-replica index, registers a global checkpoint listener with a short random timeout on its shard, and expects the listener to be invoked with UNASSIGNED_SEQ_NO and a TimeoutException. */ public void testGlobalCheckpointListenerTimeout() throws InterruptedException { + createIndex("test", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); + ensureGreen(); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + final IndexService test = indicesService.indexService(resolveIndex("test")); + final IndexShard shard = test.getShardOrNull(0); + final AtomicBoolean notified = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + final TimeValue timeout =
TimeValue.timeValueMillis(randomIntBetween(1, 50)); /* random timeout of 1 to 50 milliseconds */ + shard.addGlobalCheckpointListener( + NO_OPS_PERFORMED, + (g, e) -> { + try { + notified.set(true); + assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); + assertNotNull(e); + assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e.getMessage(), equalTo(timeout.getStringRep())); /* the exception message is expected to equal the timeout's string representation exactly */ + } finally { + latch.countDown(); /* always release the waiting test thread, even when an assertion above fails */ + } + }, + timeout); + latch.await(); /* block until the listener has been invoked */ + assertTrue(notified.get()); + } + }