Skip to content

Commit 4522b57

Browse files
jasontedorywelsch
andauthored
Introduce client feature tracking (#31020)
This commit introduces the ability for a client to communicate to the server features that it can support and for these features to be used in influencing the decisions that the server makes when communicating with the client. To this end we carry the features from the client to the underlying stream as we carry the version of the client today. This enables us to enhance the logic where we make protocol decisions on the basis of the version on the stream to also make protocol decisions on the basis of the features on the stream. With such functionality, the client can communicate to the server if it is a transport client, or if it has, for example, X-Pack installed. This enables us to support rolling upgrades from the OSS distribution to the default distribution without breaking client connectivity as we can now elect to serialize customs in the cluster state depending on whether or not the client reports to us using the feature capabilities that it can under these customs. This means that we would avoid sending a client pieces of the cluster state that it can not understand. However, we want to take care and always send the full cluster state during node-to-node communication as otherwise we would end up with different understanding of what is in the cluster state across nodes depending on which features they reported to have. This is why when deciding whether or not to write out a custom we always send the custom if the client is not a transport client and otherwise do not send the custom if the client is transport client that does not report to have the feature required by the custom. Co-authored-by: Yannick Welsch <[email protected]>
1 parent b8fda58 commit 4522b57

File tree

24 files changed

+887
-67
lines changed

24 files changed

+887
-67
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) {
104104
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
105105
context.readHeaders(in);
106106
}
107+
// now we decode the features
108+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
109+
in.readStringArray();
110+
}
107111
// now we can decode the action name
108112
sb.append(", action: ").append(in.readString());
109113
}

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
9898
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
9999
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
100100

101+
public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";
102+
101103
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
102104
final Settings.Builder settingsBuilder = Settings.builder()
103105
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@@ -130,8 +132,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
130132
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
131133
}
132134
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
133-
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
134-
+ "." + "transport_client", true).build();
135+
final Settings settings =
136+
Settings.builder()
137+
.put(defaultSettings)
138+
.put(pluginsService.updatedSettings())
139+
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
140+
.build();
135141
final List<Closeable> resourcesToClose = new ArrayList<>();
136142
final ThreadPool threadPool = new ThreadPool(settings);
137143
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2525

26+
import org.elasticsearch.client.transport.TransportClient;
2627
import org.elasticsearch.cluster.block.ClusterBlock;
2728
import org.elasticsearch.cluster.block.ClusterBlocks;
2829
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -61,6 +62,7 @@
6162
import java.util.HashMap;
6263
import java.util.Locale;
6364
import java.util.Map;
65+
import java.util.Optional;
6466
import java.util.Set;
6567

6668
/**
@@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
9092

9193
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
9294

93-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
95+
/**
96+
* An interface that implementors use when a class requires a client to maybe have a feature.
97+
*/
98+
public interface FeatureAware {
99+
100+
/**
101+
* An optional feature that is required for the client to have.
102+
*
103+
* @return an empty optional if no feature is required otherwise a string representing the required feature
104+
*/
105+
default Optional<String> getRequiredFeature() {
106+
return Optional.empty();
107+
}
108+
109+
/**
110+
* Tests whether or not the custom should be serialized. The criteria are:
111+
* <ul>
112+
* <li>the output stream must be at least the minimum supported version of the custom</li>
113+
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
114+
* </ul>
115+
* <p>
116+
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
117+
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
118+
* for connected nodes we always require that the node has the required feature.
119+
*
120+
* @param out the output stream
121+
* @param custom the custom to serialize
122+
* @param <T> the type of the custom
123+
* @return true if the custom should be serialized and false otherwise
124+
*/
125+
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
126+
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
127+
return false;
128+
}
129+
if (custom.getRequiredFeature().isPresent()) {
130+
final String requiredFeature = custom.getRequiredFeature().get();
131+
// if it is a transport client we are lenient yet for a connected node it must have the required feature
132+
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
133+
}
134+
return true;
135+
}
136+
137+
}
138+
139+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
94140

95141
/**
96142
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
@@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
99145
default boolean isPrivate() {
100146
return false;
101147
}
148+
102149
}
103150

104151
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
@@ -244,6 +291,15 @@ public String toString() {
244291
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
245292
}
246293
}
294+
if (metaData.customs().isEmpty() == false) {
295+
sb.append("metadata customs:\n");
296+
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
297+
final String type = cursor.key;
298+
final MetaData.Custom custom = cursor.value;
299+
sb.append(TAB).append(type).append(": ").append(custom);
300+
}
301+
sb.append("\n");
302+
}
247303
sb.append(blocks());
248304
sb.append(nodes());
249305
sb.append(routingTable());
@@ -691,14 +747,14 @@ public void writeTo(StreamOutput out) throws IOException {
691747
blocks.writeTo(out);
692748
// filter out custom states not supported by the other node
693749
int numberOfCustoms = 0;
694-
for (ObjectCursor<Custom> cursor : customs.values()) {
695-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
750+
for (final ObjectCursor<Custom> cursor : customs.values()) {
751+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
696752
numberOfCustoms++;
697753
}
698754
}
699755
out.writeVInt(numberOfCustoms);
700-
for (ObjectCursor<Custom> cursor : customs.values()) {
701-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
756+
for (final ObjectCursor<Custom> cursor : customs.values()) {
757+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
702758
out.writeNamedWriteable(cursor.value);
703759
}
704760
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2525
import org.apache.logging.log4j.Logger;
2626
import org.apache.lucene.util.CollectionUtil;
27+
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.ClusterState.FeatureAware;
2729
import org.elasticsearch.cluster.Diff;
2830
import org.elasticsearch.cluster.Diffable;
2931
import org.elasticsearch.cluster.DiffableUtils;
@@ -117,9 +119,10 @@ public enum XContentContext {
117119
*/
118120
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
119121

120-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
122+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
121123

122124
EnumSet<XContentContext> context();
125+
123126
}
124127

125128
public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
@@ -782,14 +785,14 @@ public void writeTo(StreamOutput out) throws IOException {
782785
}
783786
// filter out custom states not supported by the other node
784787
int numberOfCustoms = 0;
785-
for (ObjectCursor<Custom> cursor : customs.values()) {
786-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
788+
for (final ObjectCursor<Custom> cursor : customs.values()) {
789+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
787790
numberOfCustoms++;
788791
}
789792
}
790793
out.writeVInt(numberOfCustoms);
791-
for (ObjectCursor<Custom> cursor : customs.values()) {
792-
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
794+
for (final ObjectCursor<Custom> cursor : customs.values()) {
795+
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
793796
out.writeNamedWriteable(cursor.value);
794797
}
795798
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.lucene.util.BytesRefBuilder;
3131
import org.elasticsearch.ElasticsearchException;
3232
import org.elasticsearch.Version;
33+
import org.elasticsearch.cluster.ClusterState;
34+
import org.elasticsearch.cluster.metadata.MetaData;
3335
import org.elasticsearch.common.Nullable;
3436
import org.elasticsearch.common.bytes.BytesReference;
3537
import org.elasticsearch.common.geo.GeoPoint;
@@ -58,10 +60,12 @@
5860
import java.util.EnumMap;
5961
import java.util.EnumSet;
6062
import java.util.HashMap;
63+
import java.util.HashSet;
6164
import java.util.Iterator;
6265
import java.util.LinkedHashMap;
6366
import java.util.List;
6467
import java.util.Map;
68+
import java.util.Set;
6569
import java.util.concurrent.TimeUnit;
6670
import java.util.function.IntFunction;
6771

@@ -98,6 +102,7 @@ public abstract class StreamOutput extends OutputStream {
98102
}
99103

100104
private Version version = Version.CURRENT;
105+
private Set<String> features = Collections.emptySet();
101106

102107
/**
103108
* The version of the node on the other side of this stream.
@@ -113,6 +118,27 @@ public void setVersion(Version version) {
113118
this.version = version;
114119
}
115120

121+
/**
122+
* Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
123+
* {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
124+
*
125+
* @param feature the feature to test
126+
* @return true if the stream has the specified feature
127+
*/
128+
public boolean hasFeature(final String feature) {
129+
return this.features.contains(feature);
130+
}
131+
132+
/**
133+
* Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
134+
*
135+
* @param features the features on the stream
136+
*/
137+
public void setFeatures(final Set<String> features) {
138+
assert this.features.isEmpty() : this.features;
139+
this.features = Collections.unmodifiableSet(new HashSet<>(features));
140+
}
141+
116142
public long position() throws IOException {
117143
throw new UnsupportedOperationException();
118144
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ public void apply(Settings value, Settings current, Settings previous) {
379379
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
380380
EsExecutors.PROCESSORS_SETTING,
381381
ThreadContext.DEFAULT_HEADERS_SETTING,
382+
TcpTransport.DEFAULT_FEATURES_SETTING,
382383
Loggers.LOG_DEFAULT_LEVEL_SETTING,
383384
Loggers.LOG_LEVEL_SETTING,
384385
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Objects;
52+
import java.util.Optional;
5253
import java.util.Set;
5354
import java.util.function.Predicate;
5455
import java.util.stream.Collectors;

server/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.bootstrap.BootstrapCheck;
2424
import org.elasticsearch.client.Client;
2525
import org.elasticsearch.cluster.ClusterModule;
26+
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.metadata.IndexMetaData;
2728
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
2829
import org.elasticsearch.cluster.metadata.MetaData;
@@ -56,6 +57,7 @@
5657
import java.util.Collections;
5758
import java.util.List;
5859
import java.util.Map;
60+
import java.util.Optional;
5961
import java.util.function.UnaryOperator;
6062

6163
/**
@@ -79,6 +81,17 @@
7981
*/
8082
public abstract class Plugin implements Closeable {
8183

84+
/**
85+
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
86+
* also {@link ClusterState.FeatureAware}.
87+
*
88+
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
89+
* customs
90+
*/
91+
protected Optional<String> getFeature() {
92+
return Optional.empty();
93+
}
94+
8295
/**
8396
* Node level guice modules.
8497
*/

server/src/main/java/org/elasticsearch/plugins/PluginsService.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
import org.elasticsearch.common.settings.Setting;
4242
import org.elasticsearch.common.settings.Setting.Property;
4343
import org.elasticsearch.common.settings.Settings;
44+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4445
import org.elasticsearch.index.IndexModule;
4546
import org.elasticsearch.threadpool.ExecutorBuilder;
47+
import org.elasticsearch.transport.TcpTransport;
4648

4749
import java.io.IOException;
4850
import java.lang.reflect.Constructor;
@@ -57,16 +59,17 @@
5759
import java.util.HashMap;
5860
import java.util.HashSet;
5961
import java.util.Iterator;
60-
import java.util.LinkedHashMap;
6162
import java.util.LinkedHashSet;
6263
import java.util.List;
6364
import java.util.Locale;
6465
import java.util.Map;
6566
import java.util.Objects;
67+
import java.util.Optional;
6668
import java.util.Set;
69+
import java.util.TreeMap;
70+
import java.util.TreeSet;
6771
import java.util.function.Function;
6872
import java.util.stream.Collectors;
69-
import java.util.stream.Stream;
7073

7174
import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
7275

@@ -196,6 +199,7 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri
196199

197200
public Settings updatedSettings() {
198201
Map<String, String> foundSettings = new HashMap<>();
202+
final Map<String, String> features = new TreeMap<>();
199203
final Settings.Builder builder = Settings.builder();
200204
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
201205
Settings settings = plugin.v2().additionalSettings();
@@ -207,6 +211,23 @@ public Settings updatedSettings() {
207211
}
208212
}
209213
builder.put(settings);
214+
final Optional<String> maybeFeature = plugin.v2().getFeature();
215+
if (maybeFeature.isPresent()) {
216+
final String feature = maybeFeature.get();
217+
if (features.containsKey(feature)) {
218+
final String message = String.format(
219+
Locale.ROOT,
220+
"duplicate feature [%s] in plugin [%s], already added in [%s]",
221+
feature,
222+
plugin.v1().getName(),
223+
features.get(feature));
224+
throw new IllegalArgumentException(message);
225+
}
226+
features.put(feature, plugin.v1().getName());
227+
}
228+
}
229+
for (final String feature : features.keySet()) {
230+
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
210231
}
211232
return builder.put(this.settings).build();
212233
}

0 commit comments

Comments
 (0)