Skip to content

Commit b146740

Browse files
authored
Fix queuing in AsyncLucenePersistedState (#50958)
The logic in AsyncLucenePersistedState was flawed, unexpectedly queuing up two update tasks in parallel.
1 parent 5736dfb commit b146740

File tree

1 file changed

+42
-39
lines changed

1 file changed

+42
-39
lines changed

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

+42-39
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.elasticsearch.common.settings.Settings;
4545
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4646
import org.elasticsearch.common.util.concurrent.EsExecutors;
47-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4847
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
4948
import org.elasticsearch.core.internal.io.IOUtils;
5049
import org.elasticsearch.env.NodeMetaData;
@@ -291,7 +290,9 @@ public void setCurrentTerm(long currentTerm) {
291290
} else {
292291
logger.trace("queuing term update (setting term to {})", currentTerm);
293292
newCurrentTermQueued = true;
294-
scheduleUpdate();
293+
if (newStateQueued == false) {
294+
scheduleUpdate();
295+
}
295296
}
296297
}
297298
}
@@ -305,55 +306,57 @@ public void setLastAcceptedState(ClusterState clusterState) {
305306
} else {
306307
logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version());
307308
newStateQueued = true;
308-
scheduleUpdate();
309+
if (newCurrentTermQueued == false) {
310+
scheduleUpdate();
311+
}
309312
}
310313
}
311314
}
312315

313316
private void scheduleUpdate() {
314317
assert Thread.holdsLock(mutex);
315-
try {
316-
threadPoolExecutor.execute(new AbstractRunnable() {
318+
assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty";
319+
threadPoolExecutor.execute(new AbstractRunnable() {
317320

318-
@Override
319-
public void onFailure(Exception e) {
320-
logger.error("Exception occurred when storing new meta data", e);
321-
}
321+
@Override
322+
public void onFailure(Exception e) {
323+
logger.error("Exception occurred when storing new meta data", e);
324+
}
322325

323-
@Override
324-
protected void doRun() {
325-
final Long term;
326-
final ClusterState clusterState;
327-
synchronized (mutex) {
328-
if (newCurrentTermQueued) {
329-
term = getCurrentTerm();
330-
newCurrentTermQueued = false;
331-
} else {
332-
term = null;
333-
}
334-
if (newStateQueued) {
335-
clusterState = getLastAcceptedState();
336-
newStateQueued = false;
337-
} else {
338-
clusterState = null;
339-
}
340-
}
341-
// write current term before last accepted state so that it is never below term in last accepted state
342-
if (term != null) {
343-
persistedState.setCurrentTerm(term);
326+
@Override
327+
public void onRejection(Exception e) {
328+
assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down";
329+
}
330+
331+
@Override
332+
protected void doRun() {
333+
final Long term;
334+
final ClusterState clusterState;
335+
synchronized (mutex) {
336+
if (newCurrentTermQueued) {
337+
term = getCurrentTerm();
338+
logger.trace("resetting newCurrentTermQueued");
339+
newCurrentTermQueued = false;
340+
} else {
341+
term = null;
344342
}
345-
if (clusterState != null) {
346-
persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
343+
if (newStateQueued) {
344+
clusterState = getLastAcceptedState();
345+
logger.trace("resetting newStateQueued");
346+
newStateQueued = false;
347+
} else {
348+
clusterState = null;
347349
}
348350
}
349-
});
350-
} catch (EsRejectedExecutionException e) {
351-
// ignore cases where we are shutting down..., there is really nothing interesting to be done here...
352-
if (threadPoolExecutor.isShutdown() == false) {
353-
assert false : "only expect rejections when shutting down";
354-
throw e;
351+
// write current term before last accepted state so that it is never below term in last accepted state
352+
if (term != null) {
353+
persistedState.setCurrentTerm(term);
354+
}
355+
if (clusterState != null) {
356+
persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
357+
}
355358
}
356-
}
359+
});
357360
}
358361

359362
static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =

0 commit comments

Comments
 (0)