Skip to content

Commit 04930e9

Browse files
authored
Notify refresh listeners on the calling thread (#53259)
Today we notify refresh listeners by forking to the listener thread pool and then serially notifying listeners on a thread there. Refreshes are expensive though, so the expectation is that we are executing refreshes on threads that can afford an expensive operation (e.g., not a network thread) and as such, executing listeners that we expect to be cheap aon the calling thread is okay. This commit removes the forking of notifying refresh listeners to run directly on the calling thread that executed a refresh.
1 parent 69cddcb commit 04930e9

File tree

3 files changed

+15
-18
lines changed

3 files changed

+15
-18
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3101,7 +3101,6 @@ private RefreshListeners buildRefreshListeners() {
31013101
return new RefreshListeners(
31023102
indexSettings::getMaxRefreshListeners,
31033103
() -> refresh("too_many_listeners"),
3104-
threadPool.executor(ThreadPool.Names.LISTENER),
31053104
logger, threadPool.getThreadContext(),
31063105
externalRefreshMetric);
31073106
}

server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.List;
35-
import java.util.concurrent.Executor;
3635
import java.util.function.Consumer;
3736
import java.util.function.IntSupplier;
3837
import java.util.function.Supplier;
@@ -48,7 +47,6 @@
4847
public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable {
4948
private final IntSupplier getMaxRefreshListeners;
5049
private final Runnable forceRefresh;
51-
private final Executor listenerExecutor;
5250
private final Logger logger;
5351
private final ThreadContext threadContext;
5452
private final MeanMetric refreshMetric;
@@ -82,11 +80,15 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
8280
*/
8381
private volatile Translog.Location lastRefreshedLocation;
8482

85-
public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefresh, Executor listenerExecutor, Logger logger,
86-
ThreadContext threadContext, MeanMetric refreshMetric) {
83+
public RefreshListeners(
84+
final IntSupplier getMaxRefreshListeners,
85+
final Runnable forceRefresh,
86+
final Logger logger,
87+
final ThreadContext threadContext,
88+
final MeanMetric refreshMetric
89+
) {
8790
this.getMaxRefreshListeners = getMaxRefreshListeners;
8891
this.forceRefresh = forceRefresh;
89-
this.listenerExecutor = listenerExecutor;
9092
this.logger = logger;
9193
this.threadContext = threadContext;
9294
this.refreshMetric = refreshMetric;
@@ -282,24 +284,22 @@ public void afterRefresh(boolean didRefresh) throws IOException {
282284
}
283285
}
284286
}
285-
// Lastly, fire the listeners that are ready on the listener thread pool
287+
// Lastly, fire the listeners that are ready
286288
fireListeners(listenersToFire);
287289
}
288290

289291
/**
290292
* Fire some listeners. Does nothing if the list of listeners is null.
291293
*/
292-
private void fireListeners(List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
294+
private void fireListeners(final List<Tuple<Translog.Location, Consumer<Boolean>>> listenersToFire) {
293295
if (listenersToFire != null) {
294-
listenerExecutor.execute(() -> {
295-
for (Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
296-
try {
297-
listener.v2().accept(false);
298-
} catch (Exception e) {
299-
logger.warn("Error firing refresh listener", e);
300-
}
296+
for (final Tuple<Translog.Location, Consumer<Boolean>> listener : listenersToFire) {
297+
try {
298+
listener.v2().accept(false);
299+
} catch (final Exception e) {
300+
logger.warn("error firing refresh listener", e);
301301
}
302-
});
302+
}
303303
}
304304
}
305305
}

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ public void setupListeners() throws Exception {
102102
listeners = new RefreshListeners(
103103
() -> maxListeners,
104104
() -> engine.refresh("too-many-listeners"),
105-
// Immediately run listeners rather than adding them to the listener thread pool like IndexShard does to simplify the test.
106-
Runnable::run,
107105
logger,
108106
threadPool.getThreadContext(),
109107
refreshMetric);

0 commit comments

Comments
 (0)