Skip to content

Commit 310b9e4

Browse files
committed
ThreadPool and ThreadContext are not closeable
This commit changes the ThreadContext to just use a regular ThreadLocal over the lucene CloseableThreadLocal. The CloseableThreadLocal solves issues with ThreadLocals that are no longer needed during runtime but in the case of the ThreadContext, we need it for the runtime of the node and it is typically not closed until the node closes, so we miss out on the benefits that this class provides. Additionally by removing the close logic, we simplify code in other places that deal with exceptions and tracking to see if it happens when the node is closing. Closes elastic#42577
1 parent 7870ae2 commit 310b9e4

File tree

15 files changed

+476
-662
lines changed

15 files changed

+476
-662
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,8 @@ protected void afterExecute(Runnable r, Throwable t) {
106106
}
107107

108108
private boolean assertDefaultContext(Runnable r) {
109-
try {
110-
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
111-
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
112-
} catch (IllegalStateException ex) {
113-
// sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
114-
// this must not trigger an exception here since we only assert if the default is restored and
115-
// we don't really care if we are closed
116-
if (contextHolder.isClosed() == false) {
117-
throw ex;
118-
}
119-
}
109+
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
110+
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
120111
return true;
121112
}
122113

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

+7-99
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.logging.log4j.LogManager;
2222
import org.apache.logging.log4j.Logger;
23-
import org.apache.lucene.util.CloseableThreadLocal;
2423
import org.elasticsearch.action.support.ContextPreservingActionListener;
2524
import org.elasticsearch.client.OriginSettingClient;
2625
import org.elasticsearch.common.io.stream.StreamInput;
@@ -31,7 +30,6 @@
3130
import org.elasticsearch.common.settings.Settings;
3231
import org.elasticsearch.http.HttpTransportSettings;
3332

34-
import java.io.Closeable;
3533
import java.io.IOException;
3634
import java.nio.charset.StandardCharsets;
3735
import java.util.Collections;
@@ -41,7 +39,6 @@
4139
import java.util.List;
4240
import java.util.Map;
4341
import java.util.Set;
44-
import java.util.concurrent.atomic.AtomicBoolean;
4542
import java.util.function.BiConsumer;
4643
import java.util.function.BinaryOperator;
4744
import java.util.function.Function;
@@ -81,7 +78,7 @@
8178
* </pre>
8279
*
8380
*/
84-
public final class ThreadContext implements Closeable, Writeable {
81+
public final class ThreadContext implements Writeable {
8582

8683
public static final String PREFIX = "request.headers";
8784
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
@@ -94,7 +91,7 @@ public final class ThreadContext implements Closeable, Writeable {
9491
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
9592
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
9693
private final Map<String, String> defaultHeader;
97-
private final ContextThreadLocal threadLocal;
94+
private final ThreadLocal<ThreadContextStruct> threadLocal;
9895
private final int maxWarningHeaderCount;
9996
private final long maxWarningHeaderSize;
10097

@@ -113,34 +110,23 @@ public ThreadContext(Settings settings) {
113110
}
114111
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
115112
}
116-
threadLocal = new ContextThreadLocal();
113+
threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
117114
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
118115
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
119116
}
120117

121-
@Override
122-
public void close() {
123-
threadLocal.close();
124-
}
125-
126118
/**
127119
* Removes the current context and resets a default context. The removed context can be
128120
* restored by closing the returned {@link StoredContext}.
129121
*/
130122
public StoredContext stashContext() {
131123
final ThreadContextStruct context = threadLocal.get();
132-
threadLocal.set(null);
124+
threadLocal.set(DEFAULT_CONTEXT);
133125
return () -> {
134126
// If the node and thus the threadLocal get closed while this task
135127
// is still executing, we don't want this runnable to fail with an
136128
// uncaught exception
137-
try {
138-
threadLocal.set(context);
139-
} catch (IllegalStateException e) {
140-
if (isClosed() == false) {
141-
throw e;
142-
}
143-
}
129+
threadLocal.set(context);
144130
};
145131
}
146132

@@ -399,13 +385,6 @@ public boolean isSystemContext() {
399385
return threadLocal.get().isSystemContext;
400386
}
401387

402-
/**
403-
* Returns <code>true</code> if the context is closed, otherwise <code>true</code>
404-
*/
405-
boolean isClosed() {
406-
return threadLocal.closed.get();
407-
}
408-
409388
@FunctionalInterface
410389
public interface StoredContext extends AutoCloseable {
411390
@Override
@@ -617,55 +596,6 @@ private void writeTo(StreamOutput out, Map<String, String> defaultHeaders) throw
617596
}
618597
}
619598

620-
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
621-
private final AtomicBoolean closed = new AtomicBoolean(false);
622-
623-
@Override
624-
public void set(ThreadContextStruct object) {
625-
try {
626-
if (object == DEFAULT_CONTEXT) {
627-
super.set(null);
628-
} else {
629-
super.set(object);
630-
}
631-
} catch (NullPointerException ex) {
632-
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
633-
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
634-
ensureOpen();
635-
throw ex;
636-
}
637-
}
638-
639-
@Override
640-
public ThreadContextStruct get() {
641-
try {
642-
ThreadContextStruct threadContextStruct = super.get();
643-
if (threadContextStruct != null) {
644-
return threadContextStruct;
645-
}
646-
return DEFAULT_CONTEXT;
647-
} catch (NullPointerException ex) {
648-
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
649-
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
650-
ensureOpen();
651-
throw ex;
652-
}
653-
}
654-
655-
private void ensureOpen() {
656-
if (closed.get()) {
657-
throw new IllegalStateException("threadcontext is already closed");
658-
}
659-
}
660-
661-
@Override
662-
public void close() {
663-
if (closed.compareAndSet(false, true)) {
664-
super.close();
665-
}
666-
}
667-
}
668-
669599
/**
670600
* Wraps a Runnable to preserve the thread context.
671601
*/
@@ -680,19 +610,9 @@ private ContextPreservingRunnable(Runnable in) {
680610

681611
@Override
682612
public void run() {
683-
boolean whileRunning = false;
684613
try (ThreadContext.StoredContext ignore = stashContext()){
685614
ctx.restore();
686-
whileRunning = true;
687615
in.run();
688-
whileRunning = false;
689-
} catch (IllegalStateException ex) {
690-
if (whileRunning || threadLocal.closed.get() == false) {
691-
throw ex;
692-
}
693-
// if we hit an ISE here we have been shutting down
694-
// this comes from the threadcontext and barfs if
695-
// our threadpool has been shutting down
696616
}
697617
}
698618

@@ -749,21 +669,9 @@ public void onRejection(Exception e) {
749669

750670
@Override
751671
protected void doRun() throws Exception {
752-
boolean whileRunning = false;
753672
threadsOriginalContext = stashContext();
754-
try {
755-
creatorsContext.restore();
756-
whileRunning = true;
757-
in.doRun();
758-
whileRunning = false;
759-
} catch (IllegalStateException ex) {
760-
if (whileRunning || threadLocal.closed.get() == false) {
761-
throw ex;
762-
}
763-
// if we hit an ISE here we have been shutting down
764-
// this comes from the threadcontext and barfs if
765-
// our threadpool has been shutting down
766-
}
673+
creatorsContext.restore();
674+
in.doRun();
767675
}
768676

769677
@Override

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

+7-15
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.common.xcontent.XContentBuilder;
4040
import org.elasticsearch.node.Node;
4141

42-
import java.io.Closeable;
4342
import java.io.IOException;
4443
import java.util.ArrayList;
4544
import java.util.Arrays;
@@ -62,7 +61,7 @@
6261
import static java.util.Collections.unmodifiableMap;
6362
import static java.util.Map.entry;
6463

65-
public class ThreadPool implements Scheduler, Closeable {
64+
public class ThreadPool implements Scheduler {
6665

6766
private static final Logger logger = LogManager.getLogger(ThreadPool.class);
6867

@@ -704,15 +703,13 @@ private static boolean awaitTermination(
704703
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
705704
if (pool != null) {
706705
// Leverage try-with-resources to close the threadpool
707-
try (ThreadPool c = pool) {
708-
pool.shutdown();
709-
if (awaitTermination(pool, timeout, timeUnit)) {
710-
return true;
711-
}
712-
// last resort
713-
pool.shutdownNow();
714-
return awaitTermination(pool, timeout, timeUnit);
706+
pool.shutdown();
707+
if (awaitTermination(pool, timeout, timeUnit)) {
708+
return true;
715709
}
710+
// last resort
711+
pool.shutdownNow();
712+
return awaitTermination(pool, timeout, timeUnit);
716713
}
717714
return false;
718715
}
@@ -731,11 +728,6 @@ private static boolean awaitTermination(
731728
return false;
732729
}
733730

734-
@Override
735-
public void close() {
736-
threadContext.close();
737-
}
738-
739731
public ThreadContext getThreadContext() {
740732
return threadContext;
741733
}

server/src/main/java/org/elasticsearch/transport/TransportLogger.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,8 @@ private String format(TcpChannel channel, BytesReference message, String event)
9797
streamInput = compressor.streamInput(streamInput);
9898
}
9999

100-
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
101-
context.readHeaders(streamInput);
102-
}
100+
ThreadContext context = new ThreadContext(Settings.EMPTY);
101+
context.readHeaders(streamInput);
103102
// now we decode the features
104103
streamInput.readStringArray();
105104
sb.append(", action: ").append(streamInput.readString());

0 commit comments

Comments
 (0)