Skip to content

Commit f1c5031

Browse files
committed
Fix queuing in AsyncLucenePersistedState (#50958)
The logic in AsyncLucenePersistedState was flawed, unexpectedly queuing up two update tasks in parallel.
1 parent 91d7b44 commit f1c5031

File tree

1 file changed

+42
-39
lines changed

1 file changed

+42
-39
lines changed

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

+42-39
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.common.settings.Settings;
4747
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4848
import org.elasticsearch.common.util.concurrent.EsExecutors;
49-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
5049
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
5150
import org.elasticsearch.core.internal.io.IOUtils;
5251
import org.elasticsearch.discovery.DiscoveryModule;
@@ -356,7 +355,9 @@ public void setCurrentTerm(long currentTerm) {
356355
} else {
357356
logger.trace("queuing term update (setting term to {})", currentTerm);
358357
newCurrentTermQueued = true;
359-
scheduleUpdate();
358+
if (newStateQueued == false) {
359+
scheduleUpdate();
360+
}
360361
}
361362
}
362363
}
@@ -370,55 +371,57 @@ public void setLastAcceptedState(ClusterState clusterState) {
370371
} else {
371372
logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version());
372373
newStateQueued = true;
373-
scheduleUpdate();
374+
if (newCurrentTermQueued == false) {
375+
scheduleUpdate();
376+
}
374377
}
375378
}
376379
}
377380

378381
private void scheduleUpdate() {
379382
assert Thread.holdsLock(mutex);
380-
try {
381-
threadPoolExecutor.execute(new AbstractRunnable() {
383+
assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty";
384+
threadPoolExecutor.execute(new AbstractRunnable() {
382385

383-
@Override
384-
public void onFailure(Exception e) {
385-
logger.error("Exception occurred when storing new meta data", e);
386-
}
386+
@Override
387+
public void onFailure(Exception e) {
388+
logger.error("Exception occurred when storing new meta data", e);
389+
}
387390

388-
@Override
389-
protected void doRun() {
390-
final Long term;
391-
final ClusterState clusterState;
392-
synchronized (mutex) {
393-
if (newCurrentTermQueued) {
394-
term = getCurrentTerm();
395-
newCurrentTermQueued = false;
396-
} else {
397-
term = null;
398-
}
399-
if (newStateQueued) {
400-
clusterState = getLastAcceptedState();
401-
newStateQueued = false;
402-
} else {
403-
clusterState = null;
404-
}
405-
}
406-
// write current term before last accepted state so that it is never below term in last accepted state
407-
if (term != null) {
408-
persistedState.setCurrentTerm(term);
391+
@Override
392+
public void onRejection(Exception e) {
393+
assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down";
394+
}
395+
396+
@Override
397+
protected void doRun() {
398+
final Long term;
399+
final ClusterState clusterState;
400+
synchronized (mutex) {
401+
if (newCurrentTermQueued) {
402+
term = getCurrentTerm();
403+
logger.trace("resetting newCurrentTermQueued");
404+
newCurrentTermQueued = false;
405+
} else {
406+
term = null;
409407
}
410-
if (clusterState != null) {
411-
persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
408+
if (newStateQueued) {
409+
clusterState = getLastAcceptedState();
410+
logger.trace("resetting newStateQueued");
411+
newStateQueued = false;
412+
} else {
413+
clusterState = null;
412414
}
413415
}
414-
});
415-
} catch (EsRejectedExecutionException e) {
416-
// ignore cases where we are shutting down..., there is really nothing interesting to be done here...
417-
if (threadPoolExecutor.isShutdown() == false) {
418-
assert false : "only expect rejections when shutting down";
419-
throw e;
416+
// write current term before last accepted state so that it is never below term in last accepted state
417+
if (term != null) {
418+
persistedState.setCurrentTerm(term);
419+
}
420+
if (clusterState != null) {
421+
persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
422+
}
420423
}
421-
}
424+
});
422425
}
423426

424427
static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =

0 commit comments

Comments
 (0)