Skip to content

Commit ddbc468

Browse files
authored
Introduce clean transition on primary promotion
This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed. Relates #24925
1 parent 15fc712 commit ddbc468

File tree

4 files changed

+600
-63
lines changed

4 files changed

+600
-63
lines changed

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@
131131
import java.util.Objects;
132132
import java.util.Set;
133133
import java.util.concurrent.CopyOnWriteArrayList;
134+
import java.util.concurrent.CountDownLatch;
134135
import java.util.concurrent.TimeUnit;
135136
import java.util.concurrent.TimeoutException;
136137
import java.util.concurrent.atomic.AtomicBoolean;
@@ -332,12 +333,14 @@ public long getPrimaryTerm() {
332333
}
333334

334335
/**
335-
* notifies the shard of an increase in the primary term
336+
* Notifies the shard of an increase in the primary term.
337+
*
338+
* @param newPrimaryTerm the new primary term
336339
*/
337-
public void updatePrimaryTerm(final long newTerm) {
340+
public void updatePrimaryTerm(final long newPrimaryTerm) {
338341
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
339342
synchronized (mutex) {
340-
if (newTerm != primaryTerm) {
343+
if (newPrimaryTerm != primaryTerm) {
341344
// Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
342345
// in one state causing it's term to be incremented. Note that if both current shard state and new
343346
// shard state are initializing, we could replace the current shard and reinitialize it. It is however
@@ -354,10 +357,22 @@ public void updatePrimaryTerm(final long newTerm) {
354357
"a started primary shard should never update its term; "
355358
+ "shard " + shardRouting + ", "
356359
+ "current term [" + primaryTerm + "], "
357-
+ "new term [" + newTerm + "]";
358-
assert newTerm > primaryTerm :
359-
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
360-
primaryTerm = newTerm;
360+
+ "new term [" + newPrimaryTerm + "]";
361+
assert newPrimaryTerm > primaryTerm :
362+
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
363+
/*
364+
* Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
365+
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
366+
* incremented.
367+
*/
368+
final CountDownLatch latch = new CountDownLatch(1);
369+
indexShardOperationPermits.asyncBlockOperations(
370+
30,
371+
TimeUnit.MINUTES,
372+
latch::await,
373+
e -> failShard("exception during primary term transition", e));
374+
primaryTerm = newPrimaryTerm;
375+
latch.countDown();
361376
}
362377
}
363378
}

core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 152 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
package org.elasticsearch.index.shard;
2121

2222
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.Assertions;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.support.ContextPreservingActionListener;
2526
import org.elasticsearch.action.support.ThreadedActionListener;
2627
import org.elasticsearch.common.CheckedRunnable;
2728
import org.elasticsearch.common.Nullable;
2829
import org.elasticsearch.common.lease.Releasable;
30+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2931
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
3032
import org.elasticsearch.threadpool.ThreadPool;
3133

@@ -36,20 +38,35 @@
3638
import java.util.concurrent.TimeUnit;
3739
import java.util.concurrent.TimeoutException;
3840
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.function.Consumer;
3942
import java.util.function.Supplier;
4043

44+
/**
45+
* Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition
46+
* between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain
47+
* and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has
48+
* completed.
49+
*/
4150
final class IndexShardOperationPermits implements Closeable {
51+
4252
private final ShardId shardId;
4353
private final Logger logger;
4454
private final ThreadPool threadPool;
4555

4656
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
47-
// fair semaphore to ensure that blockOperations() does not starve under thread contention
48-
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
49-
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
57+
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
58+
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
5059
private volatile boolean closed;
60+
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
5161

52-
IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
62+
/**
63+
* Construct operation permits for the specified shards.
64+
*
65+
* @param shardId the shard
66+
* @param logger the logger for the shard
67+
* @param threadPool the thread pool (used to execute delayed operations)
68+
*/
69+
IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) {
5370
this.shardId = shardId;
5471
this.logger = logger;
5572
this.threadPool = threadPool;
@@ -61,99 +78,170 @@ public void close() {
6178
}
6279

6380
/**
64-
* Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues
65-
* operations that are occurring in the meanwhile and runs them once onBlocked has executed.
81+
* Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
82+
* operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed.
6683
*
67-
* @param timeout the maximum time to wait for the in-flight operations block
68-
* @param timeUnit the time unit of the {@code timeout} argument
84+
* @param timeout the maximum time to wait for the in-flight operations block
85+
* @param timeUnit the time unit of the {@code timeout} argument
6986
* @param onBlocked the action to run once the block has been acquired
70-
* @throws InterruptedException if calling thread is interrupted
71-
* @throws TimeoutException if timed out waiting for in-flight operations to finish
87+
* @param <E> the type of checked exception thrown by {@code onBlocked}
88+
* @throws InterruptedException if calling thread is interrupted
89+
* @throws TimeoutException if timed out waiting for in-flight operations to finish
7290
* @throws IndexShardClosedException if operation permit has been closed
7391
*/
74-
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
75-
InterruptedException, TimeoutException, E {
92+
<E extends Exception> void blockOperations(
93+
final long timeout,
94+
final TimeUnit timeUnit,
95+
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
7696
if (closed) {
7797
throw new IndexShardClosedException(shardId);
7898
}
99+
delayOperations();
79100
try {
80-
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
81-
assert semaphore.availablePermits() == 0;
82-
try {
83-
onBlocked.run();
84-
} finally {
85-
semaphore.release(TOTAL_PERMITS);
86-
}
101+
doBlockOperations(timeout, timeUnit, onBlocked);
102+
} finally {
103+
releaseDelayedOperations();
104+
}
105+
}
106+
107+
/**
108+
* Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked}
109+
* under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After
110+
* operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking
111+
* operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked.
112+
*
113+
* @param timeout the maximum time to wait for the in-flight operations block
114+
* @param timeUnit the time unit of the {@code timeout} argument
115+
* @param onBlocked the action to run once the block has been acquired
116+
* @param onFailure the action to run if a failure occurs while blocking operations
117+
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread)
118+
*/
119+
<E extends Exception> void asyncBlockOperations(
120+
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
121+
delayOperations();
122+
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
123+
@Override
124+
public void onFailure(final Exception e) {
125+
onFailure.accept(e);
126+
}
127+
128+
@Override
129+
protected void doRun() throws Exception {
130+
doBlockOperations(timeout, timeUnit, onBlocked);
131+
}
132+
133+
@Override
134+
public void onAfter() {
135+
releaseDelayedOperations();
136+
}
137+
});
138+
}
139+
140+
private void delayOperations() {
141+
synchronized (this) {
142+
if (delayed) {
143+
throw new IllegalStateException("operations are already delayed");
87144
} else {
88-
throw new TimeoutException("timed out during blockOperations");
145+
assert delayedOperations.isEmpty();
146+
delayed = true;
89147
}
90-
} finally {
91-
final List<ActionListener<Releasable>> queuedActions;
148+
}
149+
}
150+
151+
private <E extends Exception> void doBlockOperations(
152+
final long timeout,
153+
final TimeUnit timeUnit,
154+
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
155+
if (Assertions.ENABLED) {
156+
// since delayed is not volatile, we have to synchronize even here for visibility
92157
synchronized (this) {
93-
queuedActions = delayedOperations;
94-
delayedOperations = null;
158+
assert delayed;
95159
}
96-
if (queuedActions != null) {
97-
// Try acquiring permits on fresh thread (for two reasons):
98-
// - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
99-
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
100-
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
101-
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
102-
// handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
103-
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
104-
for (ActionListener<Releasable> queuedAction : queuedActions) {
105-
acquire(queuedAction, null, false);
106-
}
107-
});
160+
}
161+
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
162+
assert semaphore.availablePermits() == 0;
163+
try {
164+
onBlocked.run();
165+
} finally {
166+
semaphore.release(TOTAL_PERMITS);
108167
}
168+
} else {
169+
throw new TimeoutException("timeout while blocking operations");
170+
}
171+
}
172+
173+
private void releaseDelayedOperations() {
174+
final List<ActionListener<Releasable>> queuedActions;
175+
synchronized (this) {
176+
assert delayed;
177+
queuedActions = new ArrayList<>(delayedOperations);
178+
delayedOperations.clear();
179+
delayed = false;
180+
}
181+
if (!queuedActions.isEmpty()) {
182+
/*
183+
* Try acquiring permits on fresh thread (for two reasons):
184+
* - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled;
185+
* interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by
186+
* the threaded action listener if the queue of the thread pool on which it submits is full
187+
* - if a permit is acquired and the queue of the thread pool which the the threaded action listener uses is full, the
188+
* onFailure handler is executed on the calling thread; this should not be the recovery thread as it would delay the
189+
* recovery
190+
*/
191+
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
192+
for (ActionListener<Releasable> queuedAction : queuedActions) {
193+
acquire(queuedAction, null, false);
194+
}
195+
});
109196
}
110197
}
111198

112199
/**
113200
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
114201
* {@link ActionListener} will be called on the calling thread. During calls of
115-
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
116-
* then be called using the provided executor once operations are no longer blocked.
202+
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
203+
* will then be called using the provided executor once operations are no longer blocked.
117204
*
118205
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
119206
* @param executorOnDelay executor to use for delayed call
120207
* @param forceExecution whether the runnable should force its execution in case it gets rejected
121208
*/
122-
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
209+
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
123210
if (closed) {
124211
onAcquired.onFailure(new IndexShardClosedException(shardId));
125212
return;
126213
}
127-
Releasable releasable;
214+
final Releasable releasable;
128215
try {
129216
synchronized (this) {
130-
releasable = tryAcquire();
131-
if (releasable == null) {
132-
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
133-
if (delayedOperations == null) {
134-
delayedOperations = new ArrayList<>();
135-
}
217+
if (delayed) {
136218
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
137219
if (executorOnDelay != null) {
138220
delayedOperations.add(
139-
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
140-
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
221+
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
222+
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
141223
} else {
142224
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
143225
}
144226
return;
227+
} else {
228+
releasable = tryAcquire();
229+
assert releasable != null;
145230
}
146231
}
147-
} catch (InterruptedException e) {
232+
} catch (final InterruptedException e) {
148233
onAcquired.onFailure(e);
149234
return;
150235
}
236+
// execute this outside the synchronized block!
151237
onAcquired.onResponse(releasable);
152238
}
153239

154-
@Nullable private Releasable tryAcquire() throws InterruptedException {
155-
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
156-
AtomicBoolean closed = new AtomicBoolean();
240+
@Nullable
241+
private Releasable tryAcquire() throws InterruptedException {
242+
assert Thread.holdsLock(this);
243+
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
244+
final AtomicBoolean closed = new AtomicBoolean();
157245
return () -> {
158246
if (closed.compareAndSet(false, true)) {
159247
semaphore.release(1);
@@ -163,13 +251,23 @@ public void acquire(ActionListener<Releasable> onAcquired, String executorOnDela
163251
return null;
164252
}
165253

166-
public int getActiveOperationsCount() {
254+
/**
255+
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
256+
*
257+
* @return the active operation count, or zero when all permits ar eheld
258+
*/
259+
int getActiveOperationsCount() {
167260
int availablePermits = semaphore.availablePermits();
168261
if (availablePermits == 0) {
169-
// when blockOperations is holding all permits
262+
/*
263+
* This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
264+
* remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
265+
* the active operations count is zero.
266+
*/
170267
return 0;
171268
} else {
172269
return TOTAL_PERMITS - availablePermits;
173270
}
174271
}
272+
175273
}

0 commit comments

Comments
 (0)