Skip to content

Commit 28df3d9

Browse files
Force Refresh Listeners when Acquiring all Operation Permits (#37025)
* Force Refresh Listeners when Acquiring all Operation Permits (#36835)
* Fixes the issue reproduced in the added tests:
  * When having open index requests on a shard that are waiting for a refresh, relocating that shard becomes blocked until that refresh happens (which could be never, as in the test scenario).
1 parent 8ab9532 commit 28df3d9

File tree

5 files changed

+193
-6
lines changed

5 files changed

+193
-6
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
635635
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
636636
throws IllegalIndexShardStateException, InterruptedException {
637637
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
638+
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
638639
try {
639640
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
641+
forceRefreshes.close();
640642
// no shard operation permits are being held here, move state from started to relocated
641643
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
642644
"in-flight operations in progress while moving shard state to relocated";
@@ -667,6 +669,8 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
667669
// Fail primary relocation source and target shards.
668670
failShard("timed out waiting for relocation hand-off to complete", null);
669671
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
672+
} finally {
673+
forceRefreshes.close();
670674
}
671675
}
672676

@@ -2360,7 +2364,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
23602364
verifyNotClosed();
23612365
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
23622366

2363-
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2367+
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2368+
}
2369+
2370+
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
2371+
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
2372+
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
2373+
forceRefreshes.close();
2374+
onPermitAcquired.onResponse(r);
2375+
}, e -> {
2376+
forceRefreshes.close();
2377+
onPermitAcquired.onFailure(e);
2378+
});
2379+
try {
2380+
indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit);
2381+
} catch (Exception e) {
2382+
forceRefreshes.close();
2383+
throw e;
2384+
}
23642385
}
23652386

23662387
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
@@ -2370,7 +2391,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
23702391
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
23712392
assert operationPrimaryTerm <= pendingPrimaryTerm;
23722393
final CountDownLatch termUpdated = new CountDownLatch(1);
2373-
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
2394+
asyncBlockOperations(new ActionListener<Releasable>() {
23742395
@Override
23752396
public void onFailure(final Exception e) {
23762397
try {
@@ -2463,8 +2484,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
24632484
final long maxSeqNoOfUpdatesOrDeletes,
24642485
final ActionListener<Releasable> onPermitAcquired,
24652486
final TimeValue timeout) {
2466-
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
2467-
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
2487+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
2488+
onPermitAcquired, true,
2489+
listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())
2490+
);
24682491
}
24692492

24702493
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) thr
299299
/**
300300
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
301301
*
302-
* @return the active operation count, or zero when all permits ar eheld
302+
* @return the active operation count, or zero when all permits are held
303303
*/
304304
int getActiveOperationsCount() {
305305
int availablePermits = semaphore.availablePermits();

server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.search.ReferenceManager;
2424
import org.elasticsearch.common.collect.Tuple;
25+
import org.elasticsearch.common.lease.Releasable;
26+
import org.elasticsearch.common.util.concurrent.RunOnce;
2527
import org.elasticsearch.common.util.concurrent.ThreadContext;
2628
import org.elasticsearch.index.translog.Translog;
2729

@@ -53,6 +55,13 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
5355
* Is this closed? If true then we won't add more listeners and have flushed all pending listeners.
5456
*/
5557
private volatile boolean closed = false;
58+
59+
/**
60+
* Force-refreshes new refresh listeners that are added while this counter is {@code > 0}. Used to prevent becoming blocked on operations waiting for
61+
* refresh during relocation.
62+
*/
63+
private int refreshForcers;
64+
5665
/**
5766
* List of refresh listeners. Defaults to null and built on demand because most refresh cycles won't need it. Entries are never removed
5867
* from it, rather, it is nulled and rebuilt when needed again. The (hopefully) rare entries that didn't make the current refresh cycle
@@ -75,6 +84,32 @@ public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefres
7584
this.threadContext = threadContext;
7685
}
7786

87+
/**
88+
* Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}.
89+
*/
90+
public Releasable forceRefreshes() {
91+
synchronized (this) {
92+
assert refreshForcers >= 0;
93+
refreshForcers += 1;
94+
}
95+
final RunOnce runOnce = new RunOnce(() -> {
96+
synchronized (RefreshListeners.this) {
97+
assert refreshForcers > 0;
98+
refreshForcers -= 1;
99+
}
100+
});
101+
if (refreshNeeded()) {
102+
try {
103+
forceRefresh.run();
104+
} catch (Exception e) {
105+
runOnce.run();
106+
throw e;
107+
}
108+
}
109+
assert refreshListeners == null;
110+
return () -> runOnce.run();
111+
}
112+
78113
/**
79114
* Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it
80115
* forces a refresh and calls the listener immediately as well.
@@ -102,7 +137,7 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
102137
listeners = new ArrayList<>();
103138
refreshListeners = listeners;
104139
}
105-
if (listeners.size() < getMaxRefreshListeners.getAsInt()) {
140+
if (refreshForcers == 0 && listeners.size() < getMaxRefreshListeners.getAsInt()) {
106141
ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true);
107142
Consumer<Boolean> contextPreservingListener = forced -> {
108143
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.UUIDs;
3131
import org.elasticsearch.common.bytes.BytesArray;
3232
import org.elasticsearch.common.bytes.BytesReference;
33+
import org.elasticsearch.common.lease.Releasable;
3334
import org.elasticsearch.common.lucene.uid.Versions;
3435
import org.elasticsearch.common.settings.Settings;
3536
import org.elasticsearch.common.unit.TimeValue;
@@ -342,6 +343,40 @@ public void testLotsOfThreads() throws Exception {
342343
refresher.cancel();
343344
}
344345

346+
public void testDisallowAddListeners() throws Exception {
347+
assertEquals(0, listeners.pendingCount());
348+
DummyRefreshListener listener = new DummyRefreshListener();
349+
assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
350+
engine.refresh("I said so");
351+
assertFalse(listener.forcedRefresh.get());
352+
listener.assertNoError();
353+
354+
try (Releasable releaseable1 = listeners.forceRefreshes()) {
355+
listener = new DummyRefreshListener();
356+
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
357+
assertTrue(listener.forcedRefresh.get());
358+
listener.assertNoError();
359+
assertEquals(0, listeners.pendingCount());
360+
361+
try (Releasable releaseable2 = listeners.forceRefreshes()) {
362+
listener = new DummyRefreshListener();
363+
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
364+
assertTrue(listener.forcedRefresh.get());
365+
listener.assertNoError();
366+
assertEquals(0, listeners.pendingCount());
367+
}
368+
369+
listener = new DummyRefreshListener();
370+
assertTrue(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
371+
assertTrue(listener.forcedRefresh.get());
372+
listener.assertNoError();
373+
assertEquals(0, listeners.pendingCount());
374+
}
375+
376+
assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), new DummyRefreshListener()));
377+
assertEquals(1, listeners.pendingCount());
378+
}
379+
345380
private Engine.IndexResult index(String id) throws IOException {
346381
return index(id, "test");
347382
}

server/src/test/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
import com.carrotsearch.hppc.procedures.IntProcedure;
2424
import org.apache.lucene.index.IndexFileNames;
2525
import org.apache.lucene.util.English;
26+
import org.elasticsearch.action.ActionFuture;
2627
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
28+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
2729
import org.elasticsearch.action.index.IndexRequestBuilder;
2830
import org.elasticsearch.action.search.SearchResponse;
31+
import org.elasticsearch.action.support.WriteRequest;
2932
import org.elasticsearch.client.Client;
3033
import org.elasticsearch.cluster.ClusterState;
3134
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -487,6 +490,97 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr
487490

488491
}
489492

493+
public void testRelocateWhileWaitingForRefresh() {
494+
logger.info("--> starting [node1] ...");
495+
final String node1 = internalCluster().startNode();
496+
497+
logger.info("--> creating test index ...");
498+
prepareCreate("test", Settings.builder()
499+
.put("index.number_of_shards", 1)
500+
.put("index.number_of_replicas", 0)
501+
.put("index.refresh_interval", -1) // we want to control refreshes
502+
).get();
503+
504+
logger.info("--> index 10 docs");
505+
for (int i = 0; i < 10; i++) {
506+
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
507+
}
508+
logger.info("--> flush so we have an actual index");
509+
client().admin().indices().prepareFlush().execute().actionGet();
510+
logger.info("--> index more docs so we have something in the translog");
511+
for (int i = 10; i < 20; i++) {
512+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
513+
.setSource("field", "value" + i).execute();
514+
}
515+
516+
logger.info("--> start another node");
517+
final String node2 = internalCluster().startNode();
518+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
519+
.setWaitForNodes("2").execute().actionGet();
520+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
521+
522+
logger.info("--> relocate the shard from node1 to node2");
523+
client().admin().cluster().prepareReroute()
524+
.add(new MoveAllocationCommand("test", 0, node1, node2))
525+
.execute().actionGet();
526+
527+
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
528+
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
529+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
530+
531+
logger.info("--> verifying count");
532+
client().admin().indices().prepareRefresh().execute().actionGet();
533+
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits(), equalTo(20L));
534+
}
535+
536+
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
537+
logger.info("--> starting [node1] ...");
538+
final String node1 = internalCluster().startNode();
539+
540+
logger.info("--> creating test index ...");
541+
prepareCreate("test", Settings.builder()
542+
.put("index.number_of_shards", 1)
543+
.put("index.number_of_replicas", 0)
544+
.put("index.refresh_interval", -1) // we want to control refreshes
545+
).get();
546+
547+
logger.info("--> index 10 docs");
548+
for (int i = 0; i < 10; i++) {
549+
client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
550+
}
551+
logger.info("--> flush so we have an actual index");
552+
client().admin().indices().prepareFlush().execute().actionGet();
553+
logger.info("--> index more docs so we have something in the translog");
554+
for (int i = 10; i < 20; i++) {
555+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
556+
.setSource("field", "value" + i).execute();
557+
}
558+
559+
logger.info("--> start another node");
560+
final String node2 = internalCluster().startNode();
561+
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
562+
.setWaitForNodes("2").execute().actionGet();
563+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
564+
565+
logger.info("--> relocate the shard from node1 to node2");
566+
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin().cluster().prepareReroute()
567+
.add(new MoveAllocationCommand("test", 0, node1, node2))
568+
.execute();
569+
logger.info("--> index 100 docs while relocating");
570+
for (int i = 20; i < 120; i++) {
571+
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
572+
.setSource("field", "value" + i).execute();
573+
}
574+
relocationListener.actionGet();
575+
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
576+
.setWaitForNoRelocatingShards(true).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet();
577+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
578+
579+
logger.info("--> verifying count");
580+
client().admin().indices().prepareRefresh().execute().actionGet();
581+
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits(), equalTo(120L));
582+
}
583+
490584
class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
491585

492586
private final CountDownLatch corruptionCount;

0 commit comments

Comments
 (0)