diff --git a/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java b/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java index a3f4ecde8135a..f25b30ccec95f 100644 --- a/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java +++ b/qa/logging-config/src/test/java/org/elasticsearch/common/logging/JsonLoggerTests.java @@ -272,80 +272,77 @@ public void testJsonInStacktraceMessageIsSplitted() throws IOException { public void testDuplicateLogMessages() throws IOException { final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("test")); - // For the same key and X-Opaque-ID deprecation should be once - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - try{ - threadContext.putHeader(Task.X_OPAQUE_ID, "ID1"); - DeprecationLogger.setThreadContext(threadContext); - deprecationLogger.deprecatedAndMaybeLog("key", "message1"); - deprecationLogger.deprecatedAndMaybeLog("key", "message2"); - assertWarnings("message1", "message2"); - - final Path path = PathUtils.get(System.getProperty("es.logs.base_path"), - System.getProperty("es.logs.cluster_name") + "_deprecated.json"); - try (Stream> stream = JsonLogsStream.mapStreamFrom(path)) { - List> jsonLogs = stream - .collect(Collectors.toList()); - - assertThat(jsonLogs, contains( - allOf( - hasEntry("type", "deprecation"), - hasEntry("level", "WARN"), - hasEntry("component", "d.test"), - hasEntry("cluster.name", "elasticsearch"), - hasEntry("node.name", "sample-name"), - hasEntry("message", "message1"), - hasEntry("x-opaque-id", "ID1")) - ) - ); - } - }finally{ - DeprecationLogger.removeThreadContext(threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putHeader(Task.X_OPAQUE_ID, "ID1"); + DeprecationLogger.setThreadContext(threadContext); + deprecationLogger.deprecatedAndMaybeLog("key", "message1"); + deprecationLogger.deprecatedAndMaybeLog("key", "message2"); + assertWarnings("message1", "message2"); + + final Path path = PathUtils.get(System.getProperty("es.logs.base_path"), + System.getProperty("es.logs.cluster_name") + "_deprecated.json"); + try (Stream> stream = JsonLogsStream.mapStreamFrom(path)) { + List> jsonLogs = stream + .collect(Collectors.toList()); + + assertThat(jsonLogs, contains( + allOf( + hasEntry("type", "deprecation"), + hasEntry("level", "WARN"), + hasEntry("component", "d.test"), + hasEntry("cluster.name", "elasticsearch"), + hasEntry("node.name", "sample-name"), + hasEntry("message", "message1"), + hasEntry("x-opaque-id", "ID1")) + ) + ); } + } finally { + DeprecationLogger.removeThreadContext(threadContext); } + // For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id //continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2 - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - try{ - threadContext.putHeader(Task.X_OPAQUE_ID, "ID2"); - DeprecationLogger.setThreadContext(threadContext); - deprecationLogger.deprecatedAndMaybeLog("key", "message1"); - deprecationLogger.deprecatedAndMaybeLog("key", "message2"); - assertWarnings("message1", "message2"); - - final Path path = PathUtils.get(System.getProperty("es.logs.base_path"), - System.getProperty("es.logs.cluster_name") + "_deprecated.json"); - try (Stream> stream = JsonLogsStream.mapStreamFrom(path)) { - List> jsonLogs = stream - .collect(Collectors.toList()); - - assertThat(jsonLogs, contains( - allOf( - hasEntry("type", "deprecation"), - hasEntry("level", "WARN"), - hasEntry("component", "d.test"), - hasEntry("cluster.name", "elasticsearch"), - hasEntry("node.name", "sample-name"), - hasEntry("message", "message1"), - hasEntry("x-opaque-id", "ID1") - ), - allOf( - hasEntry("type", "deprecation"), - hasEntry("level", "WARN"), - hasEntry("component", "d.test"), - hasEntry("cluster.name", "elasticsearch"), - hasEntry("node.name", "sample-name"), - hasEntry("message", "message1"), - hasEntry("x-opaque-id", "ID2") - ) - ) - ); - } - }finally{ - DeprecationLogger.removeThreadContext(threadContext); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putHeader(Task.X_OPAQUE_ID, "ID2"); + DeprecationLogger.setThreadContext(threadContext); + deprecationLogger.deprecatedAndMaybeLog("key", "message1"); + deprecationLogger.deprecatedAndMaybeLog("key", "message2"); + assertWarnings("message1", "message2"); + + final Path path = PathUtils.get(System.getProperty("es.logs.base_path"), + System.getProperty("es.logs.cluster_name") + "_deprecated.json"); + try (Stream> stream = JsonLogsStream.mapStreamFrom(path)) { + List> jsonLogs = stream + .collect(Collectors.toList()); + + assertThat(jsonLogs, contains( + allOf( + hasEntry("type", "deprecation"), + hasEntry("level", "WARN"), + hasEntry("component", "d.test"), + hasEntry("cluster.name", "elasticsearch"), + hasEntry("node.name", "sample-name"), + hasEntry("message", "message1"), + hasEntry("x-opaque-id", "ID1") + ), + allOf( + hasEntry("type", "deprecation"), + hasEntry("level", "WARN"), + hasEntry("component", "d.test"), + hasEntry("cluster.name", "elasticsearch"), + hasEntry("node.name", "sample-name"), + hasEntry("message", "message1"), + hasEntry("x-opaque-id", "ID2") + ) + ) + ); } + } finally { + DeprecationLogger.removeThreadContext(threadContext); } } diff --git a/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java b/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java index 31d454e70e3d6..4a7c4944ed47c 100644 --- a/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java +++ b/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java @@ -262,7 +262,6 @@ public Void run() { public String getXOpaqueId(Set threadContexts) { return threadContexts.stream() - .filter(t -> t.isClosed() == false) .filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null) .findFirst() .map(t -> t.getHeader(Task.X_OPAQUE_ID)) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index dafbc8f835117..4a95c34975ae8 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -106,17 +106,8 @@ protected void afterExecute(Runnable r, Throwable t) { } private boolean assertDefaultContext(Runnable r) { - try { - assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" + - Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]"; - } catch (IllegalStateException ex) { - // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks - // this must not trigger an exception here since we only assert if the default is restored and - // we don't really care if we are closed - if (contextHolder.isClosed() == false) { - throw ex; - } - } + assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" + + Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]"; return true; } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 647ef9ff579f5..5912cf792a985 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -20,10 +20,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -33,7 +33,6 @@ import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.tasks.Task; -import java.io.Closeable; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -44,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; @@ -84,7 +82,7 @@ * * */ -public final class ThreadContext implements Closeable, Writeable { +public final class ThreadContext implements Writeable { public static final String PREFIX = "request.headers"; public static final Setting DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope); @@ -97,7 +95,7 @@ public final class ThreadContext implements Closeable, Writeable { private static final Logger logger = LogManager.getLogger(ThreadContext.class); private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); private final Map defaultHeader; - private final ContextThreadLocal threadLocal; + private final ThreadLocal threadLocal; private final int maxWarningHeaderCount; private final long maxWarningHeaderSize; @@ -106,26 +104,12 @@ public final class ThreadContext implements Closeable, Writeable { * @param settings the settings to read the default request headers from */ public ThreadContext(Settings settings) { - Settings headers = DEFAULT_HEADERS_SETTING.get(settings); - if (headers == null) { - this.defaultHeader = Collections.emptyMap(); - } else { - Map defaultHeader = new HashMap<>(); - for (String key : headers.names()) { - defaultHeader.put(key, headers.get(key)); - } - this.defaultHeader = Collections.unmodifiableMap(defaultHeader); - } - threadLocal = new ContextThreadLocal(); + this.defaultHeader = buildDefaultHeaders(settings); + this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); } - @Override - public void close() { - threadLocal.close(); - } - /** * Removes the current context and resets a default context. The removed context can be * restored by closing the returned {@link StoredContext}. @@ -144,19 +128,13 @@ public StoredContext stashContext() { .immutableMap()); threadLocal.set(threadContextStruct); } else { - threadLocal.set(null); + threadLocal.set(DEFAULT_CONTEXT); } return () -> { // If the node and thus the threadLocal get closed while this task // is still executing, we don't want this runnable to fail with an // uncaught exception - try { - threadLocal.set(context); - } catch (IllegalStateException e) { - if (isClosed() == false) { - throw e; - } - } + threadLocal.set(context); }; } @@ -259,7 +237,20 @@ public void writeTo(StreamOutput out) throws IOException { * Reads the headers from the stream into the current context */ public void readHeaders(StreamInput in) throws IOException { - final Map requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString); + final Tuple, Map>> streamTuple = readHeadersFromStream(in); + final Map requestHeaders = streamTuple.v1(); + final Map> responseHeaders = streamTuple.v2(); + final ThreadContextStruct struct; + if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) { + struct = ThreadContextStruct.EMPTY; + } else { + struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false); + } + threadLocal.set(struct); + } + + public static Tuple, Map>> readHeadersFromStream(StreamInput in) throws IOException { + final Map requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString); final Map> responseHeaders = in.readMap(StreamInput::readString, input -> { final int size = input.readVInt(); if (size == 0) { @@ -277,13 +268,7 @@ public void readHeaders(StreamInput in) throws IOException { return values; } }); - final ThreadContextStruct struct; - if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) { - struct = ThreadContextStruct.EMPTY; - } else { - struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false); - } - threadLocal.set(struct); + return new Tuple<>(requestHeaders, responseHeaders); } /** @@ -377,17 +362,7 @@ public void addResponseHeader(final String key, final String value) { * @param uniqueValue the function that produces de-duplication values */ public void addResponseHeader(final String key, final String value, final Function uniqueValue) { - /* - * Updating the thread local is expensive due to a shared reference that we synchronize on, so we should only do it if the thread - * context struct changed. It will not change if we de-duplicate this value to an existing one, or if we don't add a new one because - * we have reached capacity. - */ - final ThreadContextStruct current = threadLocal.get(); - final ThreadContextStruct maybeNext = - current.putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize); - if (current != maybeNext) { - threadLocal.set(maybeNext); - } + threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize)); } /** @@ -439,13 +414,6 @@ public boolean isSystemContext() { return threadLocal.get().isSystemContext; } - /** - * Returns true if the context is closed, otherwise true - */ - public boolean isClosed() { - return threadLocal.closed.get(); - } - @FunctionalInterface public interface StoredContext extends AutoCloseable { @Override @@ -456,6 +424,19 @@ default void restore() { } } + public static Map buildDefaultHeaders(Settings settings) { + Settings headers = DEFAULT_HEADERS_SETTING.get(settings); + if (headers == null) { + return Collections.emptyMap(); + } else { + Map defaultHeader = new HashMap<>(); + for (String key : headers.names()) { + defaultHeader.put(key, headers.get(key)); + } + return Collections.unmodifiableMap(defaultHeader); + } + } + private static final class ThreadContextStruct { private static final ThreadContextStruct EMPTY = @@ -633,55 +614,6 @@ private void writeTo(StreamOutput out, Map defaultHeaders) throw } } - private static class ContextThreadLocal extends CloseableThreadLocal { - private final AtomicBoolean closed = new AtomicBoolean(false); - - @Override - public void set(ThreadContextStruct object) { - try { - if (object == DEFAULT_CONTEXT) { - super.set(null); - } else { - super.set(object); - } - } catch (NullPointerException ex) { - /* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed. - to get a real exception we call ensureOpen() to tell the user we are already closed.*/ - ensureOpen(); - throw ex; - } - } - - @Override - public ThreadContextStruct get() { - try { - ThreadContextStruct threadContextStruct = super.get(); - if (threadContextStruct != null) { - return threadContextStruct; - } - return DEFAULT_CONTEXT; - } catch (NullPointerException ex) { - /* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed. - to get a real exception we call ensureOpen() to tell the user we are already closed.*/ - ensureOpen(); - throw ex; - } - } - - private void ensureOpen() { - if (closed.get()) { - throw new IllegalStateException("threadcontext is already closed"); - } - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - super.close(); - } - } - } - /** * Wraps a Runnable to preserve the thread context. */ @@ -696,19 +628,9 @@ private ContextPreservingRunnable(Runnable in) { @Override public void run() { - boolean whileRunning = false; try (ThreadContext.StoredContext ignore = stashContext()){ ctx.restore(); - whileRunning = true; in.run(); - whileRunning = false; - } catch (IllegalStateException ex) { - if (whileRunning || threadLocal.closed.get() == false) { - throw ex; - } - // if we hit an ISE here we have been shutting down - // this comes from the threadcontext and barfs if - // our threadpool has been shutting down } } @@ -765,21 +687,9 @@ public void onRejection(Exception e) { @Override protected void doRun() throws Exception { - boolean whileRunning = false; threadsOriginalContext = stashContext(); - try { - creatorsContext.restore(); - whileRunning = true; - in.doRun(); - whileRunning = false; - } catch (IllegalStateException ex) { - if (whileRunning || threadLocal.closed.get() == false) { - throw ex; - } - // if we hit an ISE here we have been shutting down - // this comes from the threadcontext and barfs if - // our threadpool has been shutting down - } + creatorsContext.restore(); + in.doRun(); } @Override diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index df3e1da5dce6c..323bbdea6f2c1 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.Node; -import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -60,7 +59,7 @@ import static java.util.Collections.unmodifiableMap; -public class ThreadPool implements Scheduler, Closeable { +public class ThreadPool implements Scheduler { private static final Logger logger = LogManager.getLogger(ThreadPool.class); @@ -754,15 +753,13 @@ private static boolean awaitTermination( public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) { if (pool != null) { // Leverage try-with-resources to close the threadpool - try (ThreadPool c = pool) { - pool.shutdown(); - if (awaitTermination(pool, timeout, timeUnit)) { - return true; - } - // last resort - pool.shutdownNow(); - return awaitTermination(pool, timeout, timeUnit); + pool.shutdown(); + if (awaitTermination(pool, timeout, timeUnit)) { + return true; } + // last resort + pool.shutdownNow(); + return awaitTermination(pool, timeout, timeUnit); } return false; } @@ -781,11 +778,6 @@ private static boolean awaitTermination( return false; } - @Override - public void close() { - threadContext.close(); - } - public ThreadContext getThreadContext() { return threadContext; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java index 32def67544d7b..e795c20e79c9d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportLogger.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportLogger.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.NotCompressedException; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; @@ -93,10 +92,9 @@ private static String format(TcpChannel channel, BytesReference message, String } streamInput = compressor.streamInput(streamInput); } - - try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { - context.readHeaders(streamInput); - } + + // read and discard headers + ThreadContext.readHeadersFromStream(streamInput); // now we decode the features if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) { streamInput.readStringArray(); diff --git a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java index dd627041f2262..f0277c65c16d6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ContextPreservingActionListenerTests.java @@ -28,129 +28,126 @@ public class ContextPreservingActionListenerTests extends ESTestCase { public void testOriginalContextIsPreservedAfterOnResponse() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final boolean nonEmptyContext = randomBoolean(); - if (nonEmptyContext) { - threadContext.putHeader("not empty", "value"); - } - final ContextPreservingActionListener actionListener; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - final ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Void aVoid) { - assertEquals("bar", threadContext.getHeader("foo")); - assertNull(threadContext.getHeader("not empty")); - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException("onFailure shouldn't be called", e); - } - }; - if (randomBoolean()) { - actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); - } else { - actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final boolean nonEmptyContext = randomBoolean(); + if (nonEmptyContext) { + threadContext.putHeader("not empty", "value"); + } + final ContextPreservingActionListener actionListener; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + final ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + assertEquals("bar", threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("not empty")); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException("onFailure shouldn't be called", e); } + }; + if (randomBoolean()) { + actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); + } else { + actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); } + } - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - actionListener.onResponse(null); + actionListener.onResponse(null); - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - } + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); } public void testOriginalContextIsPreservedAfterOnFailure() throws Exception { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final boolean nonEmptyContext = randomBoolean(); - if (nonEmptyContext) { - threadContext.putHeader("not empty", "value"); - } - final ContextPreservingActionListener actionListener; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - final ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Void aVoid) { - throw new RuntimeException("onResponse shouldn't be called"); - } - - @Override - public void onFailure(Exception e) { - assertEquals("bar", threadContext.getHeader("foo")); - assertNull(threadContext.getHeader("not empty")); - } - }; - - if (randomBoolean()) { - actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); - } else { - actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final boolean nonEmptyContext = randomBoolean(); + if (nonEmptyContext) { + threadContext.putHeader("not empty", "value"); + } + final ContextPreservingActionListener actionListener; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + final ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + throw new RuntimeException("onResponse shouldn't be called"); } + @Override + public void onFailure(Exception e) { + assertEquals("bar", threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("not empty")); + } + }; + + if (randomBoolean()) { + actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); + } else { + actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); } - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); + } + + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - actionListener.onFailure(null); + actionListener.onFailure(null); - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - } - } + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); +} public void testOriginalContextIsWhenListenerThrows() throws Exception { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final boolean nonEmptyContext = randomBoolean(); - if (nonEmptyContext) { - threadContext.putHeader("not empty", "value"); - } - final ContextPreservingActionListener actionListener; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - final ActionListener delegate = new ActionListener() { - @Override - public void onResponse(Void aVoid) { - assertEquals("bar", threadContext.getHeader("foo")); - assertNull(threadContext.getHeader("not empty")); - throw new RuntimeException("onResponse called"); - } - - @Override - public void onFailure(Exception e) { - assertEquals("bar", threadContext.getHeader("foo")); - assertNull(threadContext.getHeader("not empty")); - throw new RuntimeException("onFailure called"); - } - }; - - if (randomBoolean()) { - actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); - } else { - actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final boolean nonEmptyContext = randomBoolean(); + if (nonEmptyContext) { + threadContext.putHeader("not empty", "value"); + } + final ContextPreservingActionListener actionListener; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + final ActionListener delegate = new ActionListener() { + @Override + public void onResponse(Void aVoid) { + assertEquals("bar", threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("not empty")); + throw new RuntimeException("onResponse called"); } + + @Override + public void onFailure(Exception e) { + assertEquals("bar", threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("not empty")); + throw new RuntimeException("onFailure called"); + } + }; + + if (randomBoolean()) { + actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate); + } else { + actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext); } + } - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null)); - assertEquals("onResponse called", e.getMessage()); + RuntimeException e = expectThrows(RuntimeException.class, () -> actionListener.onResponse(null)); + assertEquals("onResponse called", e.getMessage()); - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null)); - assertEquals("onFailure called", e.getMessage()); + e = expectThrows(RuntimeException.class, () -> actionListener.onFailure(null)); + assertEquals("onFailure called", e.getMessage()); - assertNull(threadContext.getHeader("foo")); - assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); - } + assertNull(threadContext.getHeader("foo")); + assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty")); } } diff --git a/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java b/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java index ad1412dfa44e0..886dbafab4fd9 100644 --- a/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/common/logging/DeprecationLoggerTests.java @@ -42,7 +42,6 @@ import java.security.PrivilegedAction; import java.security.ProtectionDomain; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -78,136 +77,118 @@ protected boolean enableWarningsCheck() { } public void testAddsHeaderWithThreadContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final Set threadContexts = Collections.singleton(threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Set threadContexts = Collections.singleton(threadContext); - final String param = randomAlphaOfLengthBetween(1, 5); - logger.deprecated(threadContexts, "A simple message [{}]", param); + final String param = randomAlphaOfLengthBetween(1, 5); + logger.deprecated(threadContexts, "A simple message [{}]", param); - final Map> responseHeaders = threadContext.getResponseHeaders(); + final Map> responseHeaders = threadContext.getResponseHeaders(); - assertThat(responseHeaders.size(), equalTo(1)); - final List responses = responseHeaders.get("Warning"); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\"")); - } + assertThat(responseHeaders.size(), equalTo(1)); + final List responses = responseHeaders.get("Warning"); + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\"")); } public void testContainingNewline() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final Set threadContexts = Collections.singleton(threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Set threadContexts = Collections.singleton(threadContext); - logger.deprecated(threadContexts, "this message contains a newline\n"); + logger.deprecated(threadContexts, "this message contains a newline\n"); - final Map> responseHeaders = threadContext.getResponseHeaders(); + final Map> responseHeaders = threadContext.getResponseHeaders(); - assertThat(responseHeaders.size(), equalTo(1)); - final List responses = responseHeaders.get("Warning"); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString("\"this message contains a newline%0A\"")); - } + assertThat(responseHeaders.size(), equalTo(1)); + final List responses = responseHeaders.get("Warning"); + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString("\"this message contains a newline%0A\"")); } public void testSurrogatePair() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final Set threadContexts = Collections.singleton(threadContext); - - logger.deprecated(threadContexts, "this message contains a surrogate pair 😱"); - - final Map> responseHeaders = threadContext.getResponseHeaders(); - - assertThat(responseHeaders.size(), equalTo(1)); - final List responses = responseHeaders.get("Warning"); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), warningValueMatcher); - - // convert UTF-16 to UTF-8 by hand to show the hard-coded constant below is correct - assertThat("😱", equalTo("\uD83D\uDE31")); - final int code = 0x10000 + ((0xD83D & 0x3FF) << 10) + (0xDE31 & 0x3FF); - @SuppressWarnings("PointlessBitwiseExpression") - final int[] points = new int[] { - (code >> 18) & 0x07 | 0xF0, - (code >> 12) & 0x3F | 0x80, - (code >> 6) & 0x3F | 0x80, - (code >> 0) & 0x3F | 0x80}; - final StringBuilder sb = new StringBuilder(); - // noinspection ForLoopReplaceableByForEach - for (int i = 0; i < points.length; i++) { - sb.append("%").append(Integer.toString(points[i], 16).toUpperCase(Locale.ROOT)); - } - assertThat(sb.toString(), equalTo("%F0%9F%98%B1")); - assertThat(responses.get(0), containsString("\"this message contains a surrogate pair %F0%9F%98%B1\"")); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Set threadContexts = Collections.singleton(threadContext); + + logger.deprecated(threadContexts, "this message contains a surrogate pair 😱"); + + final Map> responseHeaders = threadContext.getResponseHeaders(); + + assertThat(responseHeaders.size(), equalTo(1)); + final List responses = responseHeaders.get("Warning"); + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), warningValueMatcher); + + // convert UTF-16 to UTF-8 by hand to show the hard-coded constant below is correct + assertThat("😱", equalTo("\uD83D\uDE31")); + final int code = 0x10000 + ((0xD83D & 0x3FF) << 10) + (0xDE31 & 0x3FF); + @SuppressWarnings("PointlessBitwiseExpression") + final int[] points = new int[] { + (code >> 18) & 0x07 | 0xF0, + (code >> 12) & 0x3F | 0x80, + (code >> 6) & 0x3F | 0x80, + (code >> 0) & 0x3F | 0x80}; + final StringBuilder sb = new StringBuilder(); + // noinspection ForLoopReplaceableByForEach + for (int i = 0; i < points.length; i++) { + sb.append("%").append(Integer.toString(points[i], 16).toUpperCase(Locale.ROOT)); } + assertThat(sb.toString(), equalTo("%F0%9F%98%B1")); + assertThat(responses.get(0), containsString("\"this message contains a surrogate pair %F0%9F%98%B1\"")); } public void testAddsCombinedHeaderWithThreadContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - final Set threadContexts = Collections.singleton(threadContext); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final Set threadContexts = Collections.singleton(threadContext); - final String param = randomAlphaOfLengthBetween(1, 5); - logger.deprecated(threadContexts, "A simple message [{}]", param); - final String second = randomAlphaOfLengthBetween(1, 10); - logger.deprecated(threadContexts, second); + final String param = randomAlphaOfLengthBetween(1, 5); + logger.deprecated(threadContexts, "A simple message [{}]", param); + final String second = randomAlphaOfLengthBetween(1, 10); + logger.deprecated(threadContexts, second); - final Map> responseHeaders = threadContext.getResponseHeaders(); + final Map> responseHeaders = threadContext.getResponseHeaders(); - assertEquals(1, responseHeaders.size()); + assertEquals(1, responseHeaders.size()); - final List responses = responseHeaders.get("Warning"); + final List responses = responseHeaders.get("Warning"); - assertEquals(2, responses.size()); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\"")); - assertThat(responses.get(1), warningValueMatcher); - assertThat(responses.get(1), containsString("\"" + second + "\"")); - } + assertEquals(2, responses.size()); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\"")); + assertThat(responses.get(1), warningValueMatcher); + assertThat(responses.get(1), containsString("\"" + second + "\"")); } public void testCanRemoveThreadContext() throws IOException { final String expected = "testCanRemoveThreadContext"; final String unexpected = "testCannotRemoveThreadContext"; - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - DeprecationLogger.setThreadContext(threadContext); - logger.deprecated(expected); - - { - final Map> responseHeaders = threadContext.getResponseHeaders(); - final List responses = responseHeaders.get("Warning"); - - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString(expected)); - } - - DeprecationLogger.removeThreadContext(threadContext); - logger.deprecated(unexpected); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + DeprecationLogger.setThreadContext(threadContext); + logger.deprecated(expected); - { - final Map> responseHeaders = threadContext.getResponseHeaders(); - final List responses = responseHeaders.get("Warning"); + { + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString(expected)); - assertThat(responses.get(0), not(containsString(unexpected))); - } + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString(expected)); } - } - - public void testIgnoresClosedThreadContext() throws IOException { - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - Set threadContexts = new HashSet<>(1); - - threadContexts.add(threadContext); - threadContext.close(); + DeprecationLogger.removeThreadContext(threadContext); + logger.deprecated(unexpected); - logger.deprecated(threadContexts, "Ignored logger message"); + { + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); - assertTrue(threadContexts.contains(threadContext)); + assertThat(responses, hasSize(1)); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString(expected)); + assertThat(responses.get(0), not(containsString(unexpected))); + } } public void testSafeWithoutThreadContext() { @@ -219,22 +200,20 @@ public void testFailsWithoutThreadContextSet() { } public void testFailsWhenDoubleSettingSameThreadContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - DeprecationLogger.setThreadContext(threadContext); - - try { - expectThrows(IllegalStateException.class, () -> DeprecationLogger.setThreadContext(threadContext)); - } finally { - // cleanup after ourselves - DeprecationLogger.removeThreadContext(threadContext); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + DeprecationLogger.setThreadContext(threadContext); + + try { + expectThrows(IllegalStateException.class, () -> DeprecationLogger.setThreadContext(threadContext)); + } finally { + // cleanup after ourselves + DeprecationLogger.removeThreadContext(threadContext); } } public void testFailsWhenRemovingUnknownThreadContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext)); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext)); } public void testWarningValueFromWarningHeader() throws InterruptedException { @@ -274,21 +253,20 @@ public void testWarningHeaderCountSetting() throws IOException{ Settings settings = Settings.builder() .put("http.max_warning_header_count", maxWarningHeaderCount) .build(); - try (ThreadContext threadContext = new ThreadContext(settings)) { - final Set threadContexts = Collections.singleton(threadContext); - // try to log three warning messages - logger.deprecated(threadContexts, "A simple message 1"); - logger.deprecated(threadContexts, "A simple message 2"); - logger.deprecated(threadContexts, "A simple message 3"); - final Map> responseHeaders = threadContext.getResponseHeaders(); - final List responses = responseHeaders.get("Warning"); - - assertEquals(maxWarningHeaderCount, responses.size()); - assertThat(responses.get(0), warningValueMatcher); - assertThat(responses.get(0), containsString("\"A simple message 1")); - assertThat(responses.get(1), warningValueMatcher); - assertThat(responses.get(1), containsString("\"A simple message 2")); - } + ThreadContext threadContext = new ThreadContext(settings); + final Set threadContexts = Collections.singleton(threadContext); + // try to log three warning messages + logger.deprecated(threadContexts, "A simple message 1"); + logger.deprecated(threadContexts, "A simple message 2"); + logger.deprecated(threadContexts, "A simple message 3"); + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); + + assertEquals(maxWarningHeaderCount, responses.size()); + assertThat(responses.get(0), warningValueMatcher); + assertThat(responses.get(0), containsString("\"A simple message 1")); + assertThat(responses.get(1), warningValueMatcher); + assertThat(responses.get(1), containsString("\"A simple message 2")); } public void testWarningHeaderSizeSetting() throws IOException{ @@ -302,23 +280,22 @@ public void testWarningHeaderSizeSetting() throws IOException{ String message2 = new String(arr, StandardCharsets.UTF_8) + "2"; String message3 = new String(arr, StandardCharsets.UTF_8) + "3"; - try (ThreadContext threadContext = new ThreadContext(settings)) { - final Set threadContexts = Collections.singleton(threadContext); - // try to log three warning messages - logger.deprecated(threadContexts, message1); - logger.deprecated(threadContexts, message2); - logger.deprecated(threadContexts, message3); - final Map> responseHeaders = threadContext.getResponseHeaders(); - final List responses = responseHeaders.get("Warning"); - - long warningHeadersSize = 0L; - for (String response : responses){ - warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length + - response.getBytes(StandardCharsets.UTF_8).length; - } - // assert that the size of all warning headers is less or equal to 1Kb - assertTrue(warningHeadersSize <= 1024); + ThreadContext threadContext = new ThreadContext(settings); + final Set threadContexts = Collections.singleton(threadContext); + // try to log three warning messages + logger.deprecated(threadContexts, message1); + logger.deprecated(threadContexts, message2); + logger.deprecated(threadContexts, message3); + final Map> responseHeaders = threadContext.getResponseHeaders(); + final List responses = responseHeaders.get("Warning"); + + long warningHeadersSize = 0L; + for (String response : responses){ + warningHeadersSize += "Warning".getBytes(StandardCharsets.UTF_8).length + + response.getBytes(StandardCharsets.UTF_8).length; } + // assert that the size of all warning headers is less or equal to 1Kb + assertTrue(warningHeadersSize <= 1024); } @SuppressLoggerChecks(reason = "Safe as this is using mockito") public void testLogPermissions() { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java index c2c66d12a6e35..a768ac9ae59a8 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; -import org.junit.After; import org.junit.Before; import java.io.IOException; @@ -47,11 +46,6 @@ public void setUpThreadContext() { threadContext = new ThreadContext(Settings.EMPTY); } - @After - public void tearDownThreadContext() { - threadContext.close(); - } - public void testPut() throws InterruptedException { boolean blockInternal = randomBoolean(); AtomicInteger received = new AtomicInteger(0); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java index 7ffe6a4090a29..e08a6ca443315 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java @@ -64,7 +64,6 @@ public void testExactWindowSizeAdjustment() throws Exception { }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } public void testAutoQueueSizingUp() throws Exception { @@ -93,7 +92,6 @@ public void testAutoQueueSizingUp() throws Exception { }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } public void testAutoQueueSizingDown() throws Exception { @@ -121,7 +119,6 @@ public void testAutoQueueSizingDown() throws Exception { }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } public void testAutoQueueSizingWithMin() throws Exception { @@ -151,7 +148,6 @@ public void testAutoQueueSizingWithMin() throws Exception { }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } public void testAutoQueueSizingWithMax() throws Exception { @@ -181,7 +177,6 @@ public void testAutoQueueSizingWithMax() throws Exception { }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } public void testExecutionEWMACalculation() throws Exception { @@ -222,7 +217,6 @@ public void testExecutionEWMACalculation() throws Exception { executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } /** Use a runnable wrapper that simulates a task with unknown failures. */ @@ -244,7 +238,6 @@ public void testExceptionThrowingTask() throws Exception { executeTask(executor, 1); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); - context.close(); } private Function fastWrapper() { diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java index 9729aca294184..46c0d6a589925 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ThreadContextTests.java @@ -244,35 +244,6 @@ public void testCopyHeaders() { assertEquals("bar", threadContext.getHeader("foo")); } - public void testAccessClosed() throws IOException { - Settings build = Settings.builder().put("request.headers.default", "1").build(); - ThreadContext threadContext = new ThreadContext(build); - threadContext.putHeader("foo", "bar"); - threadContext.putTransient("ctx.foo", 1); - - threadContext.close(); - try { - threadContext.getHeader("foo"); - fail(); - } catch (IllegalStateException ise) { - assertEquals("threadcontext is already closed", ise.getMessage()); - } - - try { - threadContext.putTransient("foo", new Object()); - fail(); - } catch (IllegalStateException ise) { - assertEquals("threadcontext is already closed", ise.getMessage()); - } - - try { - threadContext.putHeader("boom", "boom"); - fail(); - } catch (IllegalStateException ise) { - assertEquals("threadcontext is already closed", ise.getMessage()); - } - } - public void testSerialize() throws IOException { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); @@ -397,244 +368,238 @@ public void testStashAndMergeWithModifiedDefaults() { } public void testPreserveContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - Runnable withContext; + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Runnable withContext; - // Create a runnable that should run with some header - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> { - assertEquals("bar", threadContext.getHeader("foo")); - })); - } + // Create a runnable that should run with some header + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> { + assertEquals("bar", threadContext.getHeader("foo")); + })); + } - // We don't see the header outside of the runnable - assertNull(threadContext.getHeader("foo")); + // We don't see the header outside of the runnable + assertNull(threadContext.getHeader("foo")); - // But we do inside of it - withContext.run(); + // But we do inside of it + withContext.run(); - // but not after - assertNull(threadContext.getHeader("foo")); - } + // but not after + assertNull(threadContext.getHeader("foo")); } public void testPreserveContextKeepsOriginalContextWhenCalledTwice() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - Runnable originalWithContext; - Runnable withContext; - - // Create a runnable that should run with some header - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> { - assertEquals("bar", threadContext.getHeader("foo")); - })); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Runnable originalWithContext; + Runnable withContext; - // Now attempt to rewrap it - originalWithContext = withContext; - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "zot"); - withContext = threadContext.preserveContext(withContext); - } - - // We get the original context inside the runnable - withContext.run(); + // Create a runnable that should run with some header + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + withContext = threadContext.preserveContext(sometimesAbstractRunnable(() -> { + assertEquals("bar", threadContext.getHeader("foo")); + })); + } - // In fact the second wrapping didn't even change it - assertThat(withContext, sameInstance(originalWithContext)); + // Now attempt to rewrap it + originalWithContext = withContext; + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "zot"); + withContext = threadContext.preserveContext(withContext); } + + // We get the original context inside the runnable + withContext.run(); + + // In fact the second wrapping didn't even change it + assertThat(withContext, sameInstance(originalWithContext)); } public void testPreservesThreadsOriginalContextOnRunException() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - Runnable withContext; - - // create a abstract runnable, add headers and transient objects and verify in the methods - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - boolean systemContext = randomBoolean(); - if (systemContext) { - threadContext.markAsSystemContext(); - } - threadContext.putTransient("foo", "bar_transient"); - withContext = threadContext.preserveContext(new AbstractRunnable() { - - @Override - public void onAfter() { - assertEquals(systemContext, threadContext.isSystemContext()); - assertEquals("bar", threadContext.getHeader("foo")); - assertEquals("bar_transient", threadContext.getTransient("foo")); - assertNotNull(threadContext.getTransient("failure")); - assertEquals("exception from doRun", ((RuntimeException) threadContext.getTransient("failure")).getMessage()); - assertFalse(threadContext.isDefaultContext()); - threadContext.putTransient("after", "after"); - } - - @Override - public void onFailure(Exception e) { - assertEquals(systemContext, threadContext.isSystemContext()); - assertEquals("exception from doRun", e.getMessage()); - assertEquals("bar", threadContext.getHeader("foo")); - assertEquals("bar_transient", threadContext.getTransient("foo")); - assertFalse(threadContext.isDefaultContext()); - threadContext.putTransient("failure", e); - } - - @Override - protected void doRun() throws Exception { - assertEquals(systemContext, threadContext.isSystemContext()); - assertEquals("bar", threadContext.getHeader("foo")); - assertEquals("bar_transient", threadContext.getTransient("foo")); - assertFalse(threadContext.isDefaultContext()); - throw new RuntimeException("exception from doRun"); - } - }); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Runnable withContext; - // We don't see the header outside of the runnable - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertNull(threadContext.getTransient("failure")); - assertNull(threadContext.getTransient("after")); - assertTrue(threadContext.isDefaultContext()); + // create a abstract runnable, add headers and transient objects and verify in the methods + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + boolean systemContext = randomBoolean(); + if (systemContext) { + threadContext.markAsSystemContext(); + } + threadContext.putTransient("foo", "bar_transient"); + withContext = threadContext.preserveContext(new AbstractRunnable() { - // But we do inside of it - withContext.run(); + @Override + public void onAfter() { + assertEquals(systemContext, threadContext.isSystemContext()); + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("bar_transient", threadContext.getTransient("foo")); + assertNotNull(threadContext.getTransient("failure")); + assertEquals("exception from doRun", ((RuntimeException) threadContext.getTransient("failure")).getMessage()); + assertFalse(threadContext.isDefaultContext()); + threadContext.putTransient("after", "after"); + } - // verify not seen after - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertNull(threadContext.getTransient("failure")); - assertNull(threadContext.getTransient("after")); - assertTrue(threadContext.isDefaultContext()); - - // repeat with regular runnable - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - threadContext.putTransient("foo", "bar_transient"); - withContext = threadContext.preserveContext(() -> { + @Override + public void onFailure(Exception e) { + assertEquals(systemContext, threadContext.isSystemContext()); + assertEquals("exception from doRun", e.getMessage()); assertEquals("bar", threadContext.getHeader("foo")); assertEquals("bar_transient", threadContext.getTransient("foo")); assertFalse(threadContext.isDefaultContext()); - threadContext.putTransient("run", true); - throw new RuntimeException("exception from run"); - }); - } + threadContext.putTransient("failure", e); + } - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertNull(threadContext.getTransient("run")); - assertTrue(threadContext.isDefaultContext()); + @Override + protected void doRun() throws Exception { + assertEquals(systemContext, threadContext.isSystemContext()); + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("bar_transient", threadContext.getTransient("foo")); + assertFalse(threadContext.isDefaultContext()); + throw new RuntimeException("exception from doRun"); + } + }); + } - final Runnable runnable = withContext; - RuntimeException e = expectThrows(RuntimeException.class, runnable::run); - assertEquals("exception from run", e.getMessage()); - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertNull(threadContext.getTransient("run")); - assertTrue(threadContext.isDefaultContext()); + // We don't see the header outside of the runnable + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertNull(threadContext.getTransient("failure")); + assertNull(threadContext.getTransient("after")); + assertTrue(threadContext.isDefaultContext()); + + // But we do inside of it + withContext.run(); + + // verify not seen after + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertNull(threadContext.getTransient("failure")); + assertNull(threadContext.getTransient("after")); + assertTrue(threadContext.isDefaultContext()); + + // repeat with regular runnable + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("foo", "bar_transient"); + withContext = threadContext.preserveContext(() -> { + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("bar_transient", threadContext.getTransient("foo")); + assertFalse(threadContext.isDefaultContext()); + threadContext.putTransient("run", true); + throw new RuntimeException("exception from run"); + }); } + + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertNull(threadContext.getTransient("run")); + assertTrue(threadContext.isDefaultContext()); + + final Runnable runnable = withContext; + RuntimeException e = expectThrows(RuntimeException.class, runnable::run); + assertEquals("exception from run", e.getMessage()); + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertNull(threadContext.getTransient("run")); + assertTrue(threadContext.isDefaultContext()); } public void testPreservesThreadsOriginalContextOnFailureException() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - Runnable withContext; - - // a runnable that throws from onFailure - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - threadContext.putTransient("foo", "bar_transient"); - withContext = threadContext.preserveContext(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - throw new RuntimeException("from onFailure", e); - } - - @Override - protected void doRun() throws Exception { - assertEquals("bar", threadContext.getHeader("foo")); - assertEquals("bar_transient", threadContext.getTransient("foo")); - assertFalse(threadContext.isDefaultContext()); - throw new RuntimeException("from doRun"); - } - }); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Runnable withContext; - // We don't see the header outside of the runnable - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertTrue(threadContext.isDefaultContext()); - - // But we do inside of it - RuntimeException e = expectThrows(RuntimeException.class, withContext::run); - assertEquals("from onFailure", e.getMessage()); - assertEquals("from doRun", e.getCause().getMessage()); + // a runnable that throws from onFailure + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("foo", "bar_transient"); + withContext = threadContext.preserveContext(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new RuntimeException("from onFailure", e); + } - // but not after - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertTrue(threadContext.isDefaultContext()); + @Override + protected void doRun() throws Exception { + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("bar_transient", threadContext.getTransient("foo")); + assertFalse(threadContext.isDefaultContext()); + throw new RuntimeException("from doRun"); + } + }); } + + // We don't see the header outside of the runnable + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertTrue(threadContext.isDefaultContext()); + + // But we do inside of it + RuntimeException e = expectThrows(RuntimeException.class, withContext::run); + assertEquals("from onFailure", e.getMessage()); + assertEquals("from doRun", e.getCause().getMessage()); + + // but not after + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertTrue(threadContext.isDefaultContext()); } public void testPreservesThreadsOriginalContextOnAfterException() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - Runnable withContext; - - // a runnable that throws from onAfter - try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { - threadContext.putHeader("foo", "bar"); - threadContext.putTransient("foo", "bar_transient"); - withContext = threadContext.preserveContext(new AbstractRunnable() { - - @Override - public void onAfter() { - throw new RuntimeException("from onAfter"); - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException("from onFailure", e); - } - - @Override - protected void doRun() throws Exception { - assertEquals("bar", threadContext.getHeader("foo")); - assertEquals("bar_transient", threadContext.getTransient("foo")); - assertFalse(threadContext.isDefaultContext()); - } - }); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + Runnable withContext; - // We don't see the header outside of the runnable - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertTrue(threadContext.isDefaultContext()); + // a runnable that throws from onAfter + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("foo", "bar_transient"); + withContext = threadContext.preserveContext(new AbstractRunnable() { - // But we do inside of it - RuntimeException e = expectThrows(RuntimeException.class, withContext::run); - assertEquals("from onAfter", e.getMessage()); - assertNull(e.getCause()); + @Override + public void onAfter() { + throw new RuntimeException("from onAfter"); + } - // but not after - assertNull(threadContext.getHeader("foo")); - assertNull(threadContext.getTransient("foo")); - assertTrue(threadContext.isDefaultContext()); + @Override + public void onFailure(Exception e) { + throw new RuntimeException("from onFailure", e); + } + + @Override + protected void doRun() throws Exception { + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("bar_transient", threadContext.getTransient("foo")); + assertFalse(threadContext.isDefaultContext()); + } + }); } + + // We don't see the header outside of the runnable + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertTrue(threadContext.isDefaultContext()); + + // But we do inside of it + RuntimeException e = expectThrows(RuntimeException.class, withContext::run); + assertEquals("from onAfter", e.getMessage()); + assertNull(e.getCause()); + + // but not after + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("foo")); + assertTrue(threadContext.isDefaultContext()); } public void testMarkAsSystemContext() throws IOException { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - assertFalse(threadContext.isSystemContext()); - try (ThreadContext.StoredContext context = threadContext.stashContext()) { - assertFalse(threadContext.isSystemContext()); - threadContext.markAsSystemContext(); - assertTrue(threadContext.isSystemContext()); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + assertFalse(threadContext.isSystemContext()); + try (ThreadContext.StoredContext context = threadContext.stashContext()) { assertFalse(threadContext.isSystemContext()); + threadContext.markAsSystemContext(); + assertTrue(threadContext.isSystemContext()); } + assertFalse(threadContext.isSystemContext()); } public void testPutHeaders() { diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index 5e06e557975ea..2cd16d3db4efb 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.Matchers; import java.io.IOException; import java.nio.file.Path; @@ -44,12 +43,14 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -166,7 +167,6 @@ public void testCloseOnOutstandingTask() throws Exception { assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") public void testCloseRaceWithTaskExecution() throws Exception { Node node = new MockNode(baseSettings().build(), basePlugins()); node.start(); @@ -180,9 +180,13 @@ public void testCloseRaceWithTaskExecution() throws Exception { } catch (InterruptedException e) { throw new AssertionError("interrupted while waiting", e); } - threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { - while (shouldRun.get()); - }); + try { + threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + while (shouldRun.get()); + }); + } catch (RejectedExecutionException e) { + assertThat(e.getMessage(), containsString("[Terminated,")); + } }); Thread closeThread = new Thread(() -> { running.countDown(); @@ -268,7 +272,7 @@ public void testCloseOnLeakedIndexReaderReference() throws Exception { IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS)); searcher.close(); - assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references")); + assertThat(e.getMessage(), containsString("Something is leaking index readers or store references")); } public void testCloseOnLeakedStoreReference() throws Exception { @@ -284,6 +288,6 @@ public void testCloseOnLeakedStoreReference() throws Exception { IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS)); shard.store().decRef(); - assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references")); + assertThat(e.getMessage(), containsString("Something is leaking index readers or store references")); } } diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index 0be9858269af0..68aff95419ee3 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -62,8 +62,7 @@ public void testAbsoluteTime() throws Exception { // the delta can be large, we just care it is the same order of magnitude assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, delta < 10000); } finally { - threadPool.shutdown(); - threadPool.close(); + terminate(threadPool); } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java index 39bf18a0daff2..7a515726ff458 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java @@ -90,9 +90,8 @@ public void testLoggingHandler() throws IOException { private BytesReference buildRequest() throws IOException { try (BytesStreamOutput messageOutput = new BytesStreamOutput()) { messageOutput.setVersion(Version.CURRENT); - try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { - context.writeTo(messageOutput); - } + ThreadContext context = new ThreadContext(Settings.EMPTY); + context.writeTo(messageOutput); messageOutput.writeStringArray(new String[0]); messageOutput.writeString(ClusterStatsAction.NAME); new ClusterStatsRequest().writeTo(messageOutput); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index c280865b2998c..43d115fddf9d8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -374,7 +374,8 @@ public final void after() throws Exception { // initialized if (threadContext != null) { ensureNoWarnings(); - assert threadContext == null; + DeprecationLogger.removeThreadContext(threadContext); + threadContext = null; } ensureAllSearchContextsReleased(); ensureCheckIndexPassed(); @@ -403,7 +404,7 @@ private void ensureNoWarnings() { assertNull("unexpected warning headers", warnings); } } finally { - resetDeprecationLogger(false); + resetDeprecationLogger(); } } @@ -441,7 +442,7 @@ protected final void assertWarnings(String... expectedWarnings) { assertWarnings(actualWarnings, expectedWarnings); } } finally { - resetDeprecationLogger(true); + resetDeprecationLogger(); } } @@ -464,21 +465,11 @@ private void assertWarnings(List actualWarnings, String[] expectedWarnin } /** - * Reset the deprecation logger by removing the current thread context, and setting a new thread context if {@code setNewThreadContext} - * is set to {@code true} and otherwise clearing the current thread context. - * - * @param setNewThreadContext whether or not to attach a new thread context to the deprecation logger + * Reset the deprecation logger by clearing the current thread context. */ - private void resetDeprecationLogger(final boolean setNewThreadContext) { - // "clear" current warning headers by setting a new ThreadContext - DeprecationLogger.removeThreadContext(this.threadContext); - this.threadContext.close(); - if (setNewThreadContext) { - this.threadContext = new ThreadContext(Settings.EMPTY); - DeprecationLogger.setThreadContext(this.threadContext); - } else { - this.threadContext = null; - } + private void resetDeprecationLogger() { + // "clear" context by stashing current values and dropping the returned StoredContext + threadContext.stashContext(); } private static final List statusData = new ArrayList<>(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index add3a8b1f6896..c713254283569 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -863,14 +863,13 @@ protected static void configureClient(RestClientBuilder builder, Settings settin throw new RuntimeException("Error setting up ssl", e); } } - try (ThreadContext threadContext = new ThreadContext(settings)) { - Header[] defaultHeaders = new Header[threadContext.getHeaders().size()]; - int i = 0; - for (Map.Entry entry : threadContext.getHeaders().entrySet()) { - defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); - } - builder.setDefaultHeaders(defaultHeaders); + Map headers = ThreadContext.buildDefaultHeaders(settings); + Header[] defaultHeaders = new Header[headers.size()]; + int i = 0; + for (Map.Entry entry : headers.entrySet()) { + defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); } + builder.setDefaultHeaders(defaultHeaders); final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString == null ? "60s" : socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index 10bbb64cb9462..dbc34d0e0f886 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -83,7 +84,7 @@ public void testRepositoryExistenceForMissingRepo() { assertThat(e.getMessage(), containsString("no such repository [repo]")); } - public void testNothingScheduledWhenNotRunning() { + public void testNothingScheduledWhenNotRunning() throws InterruptedException { ClockMock clock = new ClockMock(); SnapshotLifecyclePolicyMetadata initialPolicy = SnapshotLifecyclePolicyMetadata.builder() .setPolicy(createPolicy("initial", "*/1 * * * * ?")) @@ -94,13 +95,13 @@ public void testNothingScheduledWhenNotRunning() { ClusterState initialState = createState(new SnapshotLifecycleMetadata( Collections.singletonMap(initialPolicy.getPolicy().getId(), initialPolicy), OperationMode.RUNNING, new SnapshotLifecycleStats())); - try (ThreadPool threadPool = new TestThreadPool("test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool); + ThreadPool threadPool = new TestThreadPool("test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool); SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY, () -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) { - + sls.offMaster(); - + SnapshotLifecyclePolicyMetadata newPolicy = SnapshotLifecyclePolicyMetadata.builder() .setPolicy(createPolicy("foo", "*/1 * * * * ?")) .setHeaders(Collections.emptyMap()) @@ -113,35 +114,36 @@ public void testNothingScheduledWhenNotRunning() { createState(new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats())); ClusterState state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats())); - + sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState)); - + // Since the service does not think it is master, it should not be triggered or scheduled assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + sls.onMaster(); assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("initial-1"))); - + state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats())); sls.clusterChanged(new ClusterChangedEvent("2", state, emptyState)); - + // Since the service is stopping, jobs should have been cancelled assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats())); sls.clusterChanged(new ClusterChangedEvent("3", state, emptyState)); - + // Since the service is stopped, jobs should have been cancelled assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + // No jobs should be scheduled when service is closed state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats())); sls.close(); sls.onMaster(); sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState)); assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + } finally { threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); } } @@ -154,8 +156,8 @@ public void testPolicyCRUD() throws Exception { ClockMock clock = new ClockMock(); final AtomicInteger triggerCount = new AtomicInteger(0); final AtomicReference> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet()); - try (ThreadPool threadPool = new TestThreadPool("test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY, () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) { @@ -250,8 +252,9 @@ public void testPolicyCRUD() throws Exception { // Signify becoming non-master, the jobs should all be cancelled sls.offMaster(); assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + } finally { threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); } } @@ -262,8 +265,8 @@ public void testPolicyNamesEndingInNumbers() throws Exception { ClockMock clock = new ClockMock(); final AtomicInteger triggerCount = new AtomicInteger(0); final AtomicReference> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet()); - try (ThreadPool threadPool = new TestThreadPool("test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY, () -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) { sls.onMaster(); @@ -304,8 +307,9 @@ public void testPolicyNamesEndingInNumbers() throws Exception { sls.offMaster(); assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); - + } finally { threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 1915a76fe1dce..9c08c338348d9 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -43,13 +44,13 @@ public class SnapshotRetentionServiceTests extends ESTestCase { clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings); } - public void testJobsAreScheduled() { + public void testJobsAreScheduled() throws InterruptedException { final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT); ClockMock clock = new ClockMock(); - try (ThreadPool threadPool = new TestThreadPool("test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); + ThreadPool threadPool = new TestThreadPool("test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY, FakeRetentionTask::new, clusterService, clock)) { assertThat(service.getScheduler().jobCount(), equalTo(0)); @@ -76,33 +77,34 @@ public void testJobsAreScheduled() { } } - public void testManualTriggering() { + public void testManualTriggering() throws InterruptedException { final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT); ClockMock clock = new ClockMock(); AtomicInteger invoked = new AtomicInteger(0); - - try (ThreadPool threadPool = new TestThreadPool("test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); + + ThreadPool threadPool = new TestThreadPool("test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY, () -> new FakeRetentionTask(event -> { assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID)); invoked.incrementAndGet(); }), clusterService, clock)) { - + service.onMaster(); service.triggerRetention(); assertThat(invoked.get(), equalTo(1)); - + service.offMaster(); service.triggerRetention(); assertThat(invoked.get(), equalTo(1)); - + service.onMaster(); service.triggerRetention(); assertThat(invoked.get(), equalTo(2)); - + } finally { threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 8dea5e2b6adbd..ede01f35b04b0 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -154,8 +154,8 @@ public void testRetentionTaskFailure() throws Exception { } private void retentionTaskTest(final boolean deletionSuccess) throws Exception { - try (ThreadPool threadPool = new TestThreadPool("slm-test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("slm-test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { final String policyId = "policy"; @@ -222,7 +222,7 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception { boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true)); assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName())); - + } finally { threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } @@ -237,8 +237,8 @@ public void testFailureTimeBoundedDeletion() throws Exception { } private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception { - try (ThreadPool threadPool = new TestThreadPool("slm-test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("slm-test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { final String policyId = "policy"; @@ -321,7 +321,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS); assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true)); assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName())); - + } finally { threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } @@ -374,8 +374,8 @@ public void testSkipWhileStopped() throws Exception { } private void doTestSkipDuringMode(OperationMode mode) throws Exception { - try (ThreadPool threadPool = new TestThreadPool("slm-test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("slm-test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { final String policyId = "policy"; final String repoId = "repo"; @@ -398,7 +398,7 @@ private void doTestSkipDuringMode(OperationMode mode) throws Exception { long time = System.currentTimeMillis(); task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); - + } finally { threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } @@ -413,33 +413,35 @@ public void testRunManuallyWhileStopped() throws Exception { } private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception { - try (ThreadPool threadPool = new TestThreadPool("slm-test"); - ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + ThreadPool threadPool = new TestThreadPool("slm-test"); + try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); Client noOpClient = new NoOpClient("slm-test")) { final String policyId = "policy"; final String repoId = "repo"; SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?", repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null)); - + ClusterState state = createState(mode, policy); ClusterServiceUtils.setState(clusterService, state); - + AtomicBoolean retentionWasRun = new AtomicBoolean(false); MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService, - new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { }), + new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { + }), threadPool, () -> { retentionWasRun.set(true); return Collections.emptyMap(); }, - (deletionPolicyId, repo, snapId, slmStats, listener) -> { }, + (deletionPolicyId, repo, snapId, slmStats, listener) -> { + }, System::nanoTime); - + long time = System.currentTimeMillis(); task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time)); - + assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get()); - + } finally { threadPool.shutdownNow(); threadPool.awaitTermination(10, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index c5b019b967074..bc14d4e846e1b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -49,9 +49,8 @@ import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail; import org.elasticsearch.xpack.security.authc.Realms; import org.hamcrest.Matchers; -import org.junit.Before; +import org.junit.After; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -143,13 +142,9 @@ private static T findComponent(Class type, Collection components) return null; } - @Before - public void cleanup() throws IOException { - if (threadContext != null) { - threadContext.stashContext(); - threadContext.close(); - threadContext = null; - } + @After + public void cleanup() { + threadContext = null; } public void testCustomRealmExtension() throws Exception { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index 89f33f809befb..11d9e6c1cd4e2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -399,51 +399,50 @@ public String executor() { } public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exception { - try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) { - threadContext.putTransient("foo", "bar"); - threadContext.putHeader("key", "value"); - TransportResponseHandler handler; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - threadContext.putTransient("foo", "different_bar"); - threadContext.putHeader("key", "value2"); - handler = new TransportService.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true), - new TransportResponseHandler() { - - @Override - public Empty read(StreamInput in) { - return Empty.INSTANCE; - } - - @Override - public void handleResponse(Empty response) { - assertEquals("different_bar", threadContext.getTransient("foo")); - assertEquals("value2", threadContext.getHeader("key")); - } - - @Override - public void handleException(TransportException exp) { - assertEquals("different_bar", threadContext.getTransient("foo")); - assertEquals("value2", threadContext.getHeader("key")); - } - - @Override - public String executor() { - return null; - } - }); - } + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + threadContext.putTransient("foo", "bar"); + threadContext.putHeader("key", "value"); + TransportResponseHandler handler; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.putTransient("foo", "different_bar"); + threadContext.putHeader("key", "value2"); + handler = new TransportService.ContextRestoreResponseHandler<>(threadContext.newRestorableContext(true), + new TransportResponseHandler() { + + @Override + public Empty read(StreamInput in) { + return Empty.INSTANCE; + } + + @Override + public void handleResponse(Empty response) { + assertEquals("different_bar", threadContext.getTransient("foo")); + assertEquals("value2", threadContext.getHeader("key")); + } + + @Override + public void handleException(TransportException exp) { + assertEquals("different_bar", threadContext.getTransient("foo")); + assertEquals("value2", threadContext.getHeader("key")); + } + + @Override + public String executor() { + return null; + } + }); + } - assertEquals("bar", threadContext.getTransient("foo")); - assertEquals("value", threadContext.getHeader("key")); - handler.handleResponse(null); + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("value", threadContext.getHeader("key")); + handler.handleResponse(null); - assertEquals("bar", threadContext.getTransient("foo")); - assertEquals("value", threadContext.getHeader("key")); - handler.handleException(null); + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("value", threadContext.getHeader("key")); + handler.handleException(null); - assertEquals("bar", threadContext.getTransient("foo")); - assertEquals("value", threadContext.getHeader("key")); - } + assertEquals("bar", threadContext.getTransient("foo")); + assertEquals("value", threadContext.getHeader("key")); } private String[] randomRoles() {