Skip to content

Commit f802515

Browse files
committed
addRefreshListener
1 parent bf72bfd commit f802515

File tree

1 file changed

+21
-18
lines changed

1 file changed

+21
-18
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
213213

214214
protected volatile ShardRouting shardRouting;
215215
protected volatile IndexShardState state;
216+
// ensure happens-before relation between addRefreshListener() and postRecovery()
217+
private final Object postRecoveryMutex = new Object();
216218
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
217-
private final Object engineMutex = new Object();
219+
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
218220
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
219221
final EngineFactory engineFactory;
220222

@@ -1338,23 +1340,24 @@ public void close(String reason, boolean flushEngine) throws IOException {
13381340
}
13391341
}
13401342

1341-
public IndexShard postRecovery(String reason)
1342-
throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
1343-
synchronized (mutex) {
1344-
if (state == IndexShardState.CLOSED) {
1345-
throw new IndexShardClosedException(shardId);
1346-
}
1347-
if (state == IndexShardState.STARTED) {
1348-
throw new IndexShardStartedException(shardId);
1343+
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
1344+
synchronized (postRecoveryMutex) {
1345+
// we need to refresh again to expose all operations that were index until now. Otherwise
1346+
// we may not expose operations that were indexed with a refresh listener that was immediately
1347+
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
1348+
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
1349+
getEngine().refresh("post_recovery");
1350+
synchronized (mutex) {
1351+
if (state == IndexShardState.CLOSED) {
1352+
throw new IndexShardClosedException(shardId);
1353+
}
1354+
if (state == IndexShardState.STARTED) {
1355+
throw new IndexShardStartedException(shardId);
1356+
}
1357+
recoveryState.setStage(RecoveryState.Stage.DONE);
1358+
changeState(IndexShardState.POST_RECOVERY, reason);
13491359
}
1350-
recoveryState.setStage(RecoveryState.Stage.DONE);
1351-
changeState(IndexShardState.POST_RECOVERY, reason);
13521360
}
1353-
// we need to refresh again to expose all operations that were index until now. Otherwise
1354-
// we may not expose operations that were indexed with a refresh listener that was immediately
1355-
// responded to in addRefreshListener.
1356-
getEngine().refresh("post_recovery");
1357-
return this;
13581361
}
13591362

13601363
/**
@@ -3257,10 +3260,10 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
32573260
if (isReadAllowed()) {
32583261
readAllowed = true;
32593262
} else {
3260-
// check again under mutex. this is important to create a happens before relationship
3263+
// check again under postRecoveryMutex. this is important to create a happens before relationship
32613264
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
32623265
// to a listener before a refresh actually happened that contained that operation.
3263-
synchronized (mutex) {
3266+
synchronized (postRecoveryMutex) {
32643267
readAllowed = isReadAllowed();
32653268
}
32663269
}

0 commit comments

Comments
 (0)