Skip to content

Commit 8d87c48

Browse files
committed
Send client headers from TransportClient (elastic#30803)
This change adds a simple header to the transport client that is present on the servers thread context that ensures we can detect if a transport client talks to the server in a specific request. This change also adds a header for xpack to detect if the client has xpack installed.
1 parent 0380d93 commit 8d87c48

File tree

5 files changed

+25
-2
lines changed

5 files changed

+25
-2
lines changed

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.client.transport;
2121

22+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2223
import org.elasticsearch.core.internal.io.IOUtils;
2324
import org.elasticsearch.action.Action;
2425
import org.elasticsearch.action.ActionListener;
@@ -128,7 +129,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
128129
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
129130
}
130131
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
131-
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
132+
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
133+
+ "." + "transport_client", true).build();
132134
final List<Closeable> resourcesToClose = new ArrayList<>();
133135
final ThreadPool threadPool = new ThreadPool(settings);
134136
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel,
13671367
streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
13681368
streamIn.setVersion(version);
13691369
threadPool.getThreadContext().readHeaders(streamIn);
1370+
threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
13701371
if (TransportStatus.isRequest(status)) {
13711372
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
13721373
} else {

server/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ public void testOverrideHeader() throws Exception {
139139

140140
protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
141141
assertNotNull(headers);
142+
headers = new HashMap<>(headers);
143+
headers.remove("transport_client"); // default header on TPC
142144
assertEquals(expected.size(), headers.size());
143145
for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
144146
assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue());
145147
}
146148
}
147149

148150
protected static void assertHeaders(ThreadPool pool) {
149-
Map<String, String> headers = new HashMap<>();
150151
Settings asSettings = HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX);
151152
assertHeaders(pool.getThreadContext().getHeaders(),
152153
asSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> asSettings.get(k))));

server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.io.stream.StreamOutput;
2828
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2930
import org.elasticsearch.env.Environment;
3031
import org.elasticsearch.plugins.Plugin;
3132
import org.elasticsearch.test.ESTestCase;
@@ -63,13 +64,29 @@ public void testPluginNamedWriteablesRegistered() {
6364
}
6465
}
6566

67+
public void testDefaultHeaderContainsPlugins() {
68+
Settings baseSettings = Settings.builder()
69+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
70+
.build();
71+
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
72+
ThreadContext threadContext = client.threadPool().getThreadContext();
73+
assertEquals("true", threadContext.getHeader("transport_client"));
74+
assertEquals("true", threadContext.getHeader("test"));
75+
}
76+
}
77+
6678
public static class MockPlugin extends Plugin {
6779

6880
@Override
6981
public List<Entry> getNamedWriteables() {
7082
return Arrays.asList(new Entry[]{ new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new)});
7183
}
7284

85+
@Override
86+
public Settings additionalSettings() {
87+
return Settings.builder().put(ThreadContext.PREFIX + "." + "test", true).build();
88+
}
89+
7390
public class MockNamedWriteable implements NamedWriteable {
7491

7592
static final String NAME = "mockNamedWritable";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.settings.Settings;
1717
import org.elasticsearch.common.util.BigArrays;
1818
import org.elasticsearch.common.util.PageCacheRecycler;
19+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1920
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2021
import org.elasticsearch.indices.breaker.CircuitBreakerService;
2122
import org.elasticsearch.license.DeleteLicenseAction;
@@ -193,6 +194,7 @@ static Settings additionalSettings(final Settings settings, final boolean enable
193194
final Settings.Builder builder = Settings.builder();
194195
builder.put(SecuritySettings.addTransportSettings(settings));
195196
builder.put(SecuritySettings.addUserSettings(settings));
197+
builder.put(ThreadContext.PREFIX + "." + "has_xpack", true);
196198
return builder.build();
197199
} else {
198200
return Settings.EMPTY;

0 commit comments

Comments
 (0)