setting = Setting.prefixKeySetting(prefix, (key) -> new Setting<>(key, "", Function.identity(),
- Setting.Property.NodeScope));
- allSettings.add(setting);
- }
-
- return allSettings;
- }
-
-}
diff --git a/modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java b/modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java
deleted file mode 100644
index d32df242e487c..0000000000000
--- a/modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.tribe;
-
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateTaskConfig;
-import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.MergableCustomMetaData;
-import org.elasticsearch.cluster.block.ClusterBlock;
-import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.block.ClusterBlocks;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
-import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.component.Lifecycle;
-import org.elasticsearch.common.hash.MurmurHash3;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.logging.DeprecationLogger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.network.NetworkService;
-import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.transport.TcpTransport;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.unmodifiableMap;
-
-/**
- * The tribe service holds a list of node clients connected to a list of tribe members, and uses their
- * cluster state events to update this local node cluster state with the merged view of it.
- *
- * The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
- * write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
- * will be thrown), and state level metadata operations with automatically use the local flag.
- *
- * The state merged from different clusters include the list of nodes, metadata, and routing table. Each node merged
- * will have in its tribe which tribe member it came from. Each index merged will have in its settings which tribe
- * member it came from. In case an index has already been merged from one cluster, and the same name index is discovered
- * in another cluster, the conflict one will be discarded. This happens because we need to have the correct index name
- * to propagate to the relevant cluster.
- */
-public class TribeService extends AbstractLifecycleComponent {
-
- public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false,
- false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
- public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false,
- false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
-
- // internal settings only
- public static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", Property.NodeScope);
- private final ClusterService clusterService;
- private final String[] blockIndicesWrite;
- private final String[] blockIndicesRead;
- private final String[] blockIndicesMetadata;
- private static final String ON_CONFLICT_ANY = "any", ON_CONFLICT_DROP = "drop", ON_CONFLICT_PREFER = "prefer_";
-
- public static final Setting ON_CONFLICT_SETTING = new Setting<>("tribe.on_conflict", ON_CONFLICT_ANY, (s) -> {
- switch (s) {
- case ON_CONFLICT_ANY:
- case ON_CONFLICT_DROP:
- return s;
- default:
- if (s.startsWith(ON_CONFLICT_PREFER) && s.length() > ON_CONFLICT_PREFER.length()) {
- return s;
- }
- throw new IllegalArgumentException(
- "Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: [" + s + "]");
- }
- }, Property.NodeScope);
-
- public static final Setting BLOCKS_METADATA_SETTING =
- Setting.boolSetting("tribe.blocks.metadata", false, Property.NodeScope);
- public static final Setting BLOCKS_WRITE_SETTING =
- Setting.boolSetting("tribe.blocks.write", false, Property.NodeScope);
- public static final Setting> BLOCKS_WRITE_INDICES_SETTING =
- Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), Property.NodeScope);
- public static final Setting> BLOCKS_READ_INDICES_SETTING =
- Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), Property.NodeScope);
- public static final Setting> BLOCKS_METADATA_INDICES_SETTING =
- Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), Property.NodeScope);
-
- public static final Set TRIBE_SETTING_KEYS = Sets.newHashSet(TRIBE_NAME_SETTING.getKey(), ON_CONFLICT_SETTING.getKey(),
- BLOCKS_METADATA_INDICES_SETTING.getKey(), BLOCKS_METADATA_SETTING.getKey(), BLOCKS_READ_INDICES_SETTING.getKey(),
- BLOCKS_WRITE_INDICES_SETTING.getKey(), BLOCKS_WRITE_SETTING.getKey());
-
- // these settings should be passed through to each tribe client, if they are not set explicitly
- private static final List> PASS_THROUGH_SETTINGS = Arrays.asList(
- NetworkService.GLOBAL_NETWORK_HOST_SETTING,
- NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
- NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
- TcpTransport.HOST,
- TcpTransport.BIND_HOST,
- TcpTransport.PUBLISH_HOST
- );
- private final String onConflict;
- private final Set droppedIndices = ConcurrentCollections.newConcurrentSet();
-
- private final List nodes = new CopyOnWriteArrayList<>();
-
- private final NamedWriteableRegistry namedWriteableRegistry;
-
- public TribeService(Settings settings, NodeEnvironment nodeEnvironment, ClusterService clusterService,
- NamedWriteableRegistry namedWriteableRegistry, Function clientNodeBuilder) {
- super(settings);
- this.clusterService = clusterService;
- this.namedWriteableRegistry = namedWriteableRegistry;
- Map nodesSettings = new HashMap<>(settings.getGroups("tribe", true));
- nodesSettings.remove("blocks"); // remove prefix settings that don't indicate a client
- nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client
- for (Map.Entry entry : nodesSettings.entrySet()) {
- Settings clientSettings = buildClientSettings(entry.getKey(), nodeEnvironment.nodeId(), settings, entry.getValue());
- try {
- nodes.add(clientNodeBuilder.apply(clientSettings));
- } catch (Exception e) {
- // calling close is safe for non started nodes, we can just iterate over all
- for (Node otherNode : nodes) {
- try {
- otherNode.close();
- } catch (Exception inner) {
- inner.addSuppressed(e);
- logger.warn((Supplier>) () -> new ParameterizedMessage("failed to close node {} on failed start", otherNode), inner);
- }
- }
- throw ExceptionsHelper.convertToRuntime(e);
- }
- }
-
- this.blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
- this.blockIndicesRead = BLOCKS_READ_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
- this.blockIndicesWrite = BLOCKS_WRITE_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
- if (!nodes.isEmpty()) {
- new DeprecationLogger(Loggers.getLogger(TribeService.class))
- .deprecated("tribe nodes are deprecated in favor of cross-cluster search and will be removed in Elasticsearch 7.0.0");
- }
- this.onConflict = ON_CONFLICT_SETTING.get(settings);
- }
-
- // pkg private for testing
- /**
- * Builds node settings for a tribe client node from the tribe node's global settings,
- * combined with tribe specific settings.
- */
- static Settings buildClientSettings(String tribeName, String parentNodeId, Settings globalSettings, Settings tribeSettings) {
- for (String tribeKey : tribeSettings.keySet()) {
- if (tribeKey.startsWith("path.")) {
- throw new IllegalArgumentException("Setting [" + tribeKey + "] not allowed in tribe client [" + tribeName + "]");
- }
- }
- Settings.Builder sb = Settings.builder().put(tribeSettings);
- sb.put(Node.NODE_NAME_SETTING.getKey(), Node.NODE_NAME_SETTING.get(globalSettings) + "/" + tribeName);
- sb.put(Environment.PATH_HOME_SETTING.getKey(), Environment.PATH_HOME_SETTING.get(globalSettings)); // pass through ES home dir
- if (Environment.PATH_LOGS_SETTING.exists(globalSettings)) {
- sb.put(Environment.PATH_LOGS_SETTING.getKey(), Environment.PATH_LOGS_SETTING.get(globalSettings));
- }
- for (Setting> passthrough : PASS_THROUGH_SETTINGS) {
- if (passthrough.exists(tribeSettings) == false && passthrough.exists(globalSettings)) {
- sb.put(passthrough.getKey(), globalSettings.get(passthrough.getKey()));
- }
- }
- sb.put(TRIBE_NAME_SETTING.getKey(), tribeName);
- if (sb.get(NetworkModule.HTTP_ENABLED.getKey()) == null) {
- sb.put(NetworkModule.HTTP_ENABLED.getKey(), false);
- }
- sb.put(Node.NODE_DATA_SETTING.getKey(), false);
- sb.put(Node.NODE_MASTER_SETTING.getKey(), false);
- sb.put(Node.NODE_INGEST_SETTING.getKey(), false);
-
- // node id of a tribe client node is determined by node id of parent node and tribe name
- final BytesRef seedAsString = new BytesRef(parentNodeId + "/" + tribeName);
- long nodeIdSeed = MurmurHash3.hash128(seedAsString.bytes, seedAsString.offset, seedAsString.length, 0, new MurmurHash3.Hash128()).h1;
- sb.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), nodeIdSeed);
- sb.put(Node.NODE_LOCAL_STORAGE_SETTING.getKey(), false);
- return sb.build();
- }
-
- @Override
- protected void doStart() {
-
- }
-
- public void startNodes() {
- for (Node node : nodes) {
- try {
- getClusterService(node).addListener(new TribeClusterStateListener(node));
- node.start();
- } catch (Exception e) {
- // calling close is safe for non started nodes, we can just iterate over all
- for (Node otherNode : nodes) {
- try {
- otherNode.close();
- } catch (Exception inner) {
- inner.addSuppressed(e);
- logger.warn((Supplier>) () -> new ParameterizedMessage("failed to close node {} on failed start", otherNode), inner);
- }
- }
- throw ExceptionsHelper.convertToRuntime(e);
- }
- }
- }
-
- @Override
- protected void doStop() {
- doClose();
- }
-
- @Override
- protected void doClose() {
- for (Node node : nodes) {
- try {
- node.close();
- } catch (Exception e) {
- logger.warn((Supplier>) () -> new ParameterizedMessage("failed to close node {}", node), e);
- }
- }
- }
-
-
- class TribeClusterStateListener implements ClusterStateListener {
- private final String tribeName;
- private final TribeNodeClusterStateTaskExecutor executor;
-
- TribeClusterStateListener(Node tribeNode) {
- String tribeName = TRIBE_NAME_SETTING.get(tribeNode.settings());
- this.tribeName = tribeName;
- executor = new TribeNodeClusterStateTaskExecutor(tribeName);
- }
-
- @Override
- public void clusterChanged(final ClusterChangedEvent event) {
- logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
- clusterService.submitStateUpdateTask(
- "cluster event from " + tribeName,
- event,
- ClusterStateTaskConfig.build(Priority.NORMAL),
- executor,
- (source, e) -> logger.warn((Supplier>) () -> new ParameterizedMessage("failed to process [{}]", source), e));
- }
- }
-
- class TribeNodeClusterStateTaskExecutor implements ClusterStateTaskExecutor {
- private final String tribeName;
-
- TribeNodeClusterStateTaskExecutor(String tribeName) {
- this.tribeName = tribeName;
- }
-
- @Override
- public String describeTasks(List tasks) {
- return tasks.stream().map(ClusterChangedEvent::source).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
- }
-
- @Override
- public boolean runOnlyOnMaster() {
- return false;
- }
-
- @Override
- public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception {
- ClusterTasksResult.Builder builder = ClusterTasksResult.builder();
- ClusterState.Builder newState = ClusterState.builder(currentState);
- boolean clusterStateChanged = updateNodes(currentState, tasks, newState);
- clusterStateChanged |= updateIndicesAndMetaData(currentState, tasks, newState);
- builder.successes(tasks);
- return builder.build(clusterStateChanged ? newState.build() : currentState);
- }
-
- private boolean updateNodes(ClusterState currentState, List tasks, ClusterState.Builder newState) {
- boolean clusterStateChanged = false;
- // we only need to apply the latest cluster state update
- ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1);
- ClusterState tribeState = latestTask.state();
- DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
- // -- merge nodes
- // go over existing nodes, and see if they need to be removed
- for (DiscoveryNode discoNode : currentState.nodes()) {
- String markedTribeName = discoNode.getAttributes().get(TRIBE_NAME_SETTING.getKey());
- if (markedTribeName != null && markedTribeName.equals(tribeName)) {
- if (tribeState.nodes().get(discoNode.getId()) == null) {
- clusterStateChanged = true;
- logger.info("[{}] removing node [{}]", tribeName, discoNode);
- nodes.remove(discoNode.getId());
- }
- }
- }
- // go over tribe nodes, and see if they need to be added
- for (DiscoveryNode tribe : tribeState.nodes()) {
- if (currentState.nodes().nodeExists(tribe) == false) {
- // a new node, add it, but also add the tribe name to the attributes
- Map tribeAttr = new HashMap<>(tribe.getAttributes());
- tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
- DiscoveryNode discoNode = new DiscoveryNode(tribe.getName(), tribe.getId(), tribe.getEphemeralId(),
- tribe.getHostName(), tribe.getHostAddress(), tribe.getAddress(), unmodifiableMap(tribeAttr), tribe.getRoles(),
- tribe.getVersion());
- clusterStateChanged = true;
- logger.info("[{}] adding node [{}]", tribeName, discoNode);
- nodes.remove(tribe.getId()); // remove any existing node with the same id but different ephemeral id
- nodes.add(discoNode);
- }
- }
- if (clusterStateChanged) {
- newState.nodes(nodes);
- }
- return clusterStateChanged;
- }
-
- private boolean updateIndicesAndMetaData(ClusterState currentState, List tasks, ClusterState.Builder newState) {
- // we only need to apply the latest cluster state update
- ClusterChangedEvent latestTask = tasks.get(tasks.size() - 1);
- ClusterState tribeState = latestTask.state();
- boolean clusterStateChanged = false;
- ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
- MetaData.Builder metaData = MetaData.builder(currentState.metaData());
- RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
- // go over existing indices, and see if they need to be removed
- for (IndexMetaData index : currentState.metaData()) {
- String markedTribeName = TRIBE_NAME_SETTING.get(index.getSettings());
- if (markedTribeName != null && markedTribeName.equals(tribeName)) {
- IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex());
- clusterStateChanged = true;
- if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) {
- logger.info("[{}] removing index {}", tribeName, index.getIndex());
- removeIndex(blocks, metaData, routingTable, index);
- } else {
- // always make sure to update the metadata and routing table, in case
- // there are changes in them (new mapping, shards moving from initializing to started)
- routingTable.add(tribeState.routingTable().index(index.getIndex()));
- Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings())
- .put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
- metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
- }
- }
- }
- // go over tribe one, and see if they need to be added
- for (IndexMetaData tribeIndex : tribeState.metaData()) {
- // if there is no routing table yet, do nothing with it...
- IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex());
- if (table == null) {
- continue;
- }
- //NOTE: we have to use the index name here since UUID are different even if the name is the same
- final String indexName = tribeIndex.getIndex().getName();
- final IndexMetaData indexMetaData = currentState.metaData().index(indexName);
- if (indexMetaData == null) {
- if (!droppedIndices.contains(indexName)) {
- // a new index, add it, and add the tribe name as a setting
- clusterStateChanged = true;
- logger.info("[{}] adding index {}", tribeName, tribeIndex.getIndex());
- addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
- }
- } else {
- String existingFromTribe = TRIBE_NAME_SETTING.get(indexMetaData.getSettings());
- if (!tribeName.equals(existingFromTribe)) {
- // we have a potential conflict on index names, decide what to do...
- if (ON_CONFLICT_ANY.equals(onConflict)) {
- // we chose any tribe, carry on
- } else if (ON_CONFLICT_DROP.equals(onConflict)) {
- // drop the indices, there is a conflict
- clusterStateChanged = true;
- logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(),
- existingFromTribe);
- removeIndex(blocks, metaData, routingTable, tribeIndex);
- droppedIndices.add(indexName);
- } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
- // on conflict, prefer a tribe...
- String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
- if (tribeName.equals(preferredTribeName)) {
- // the new one is hte preferred one, replace...
- clusterStateChanged = true;
- logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(),
- existingFromTribe);
- removeIndex(blocks, metaData, routingTable, tribeIndex);
- addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
- } // else: either the existing one is the preferred one, or we haven't seen one, carry on
- }
- }
- }
- }
- clusterStateChanged |= updateCustoms(currentState, tasks, metaData);
- if (clusterStateChanged) {
- newState.blocks(blocks);
- newState.metaData(metaData);
- newState.routingTable(routingTable.build());
- }
- return clusterStateChanged;
- }
-
- private boolean updateCustoms(ClusterState currentState, List tasks, MetaData.Builder metaData) {
- boolean clusterStateChanged = false;
- Set changedCustomMetaDataTypeSet = tasks.stream()
- .map(ClusterChangedEvent::changedCustomMetaDataSet)
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- final List tribeClientNodes = TribeService.this.nodes;
- Map mergedCustomMetaDataMap = mergeChangedCustomMetaData(changedCustomMetaDataTypeSet,
- customMetaDataType -> tribeClientNodes.stream()
- .map(TribeService::getClusterService)
- // cluster service might not have initial state yet (as tribeClientNodes are started after main node)
- .filter(cs -> cs.lifecycleState() == Lifecycle.State.STARTED)
- .map(ClusterService::state)
- .map(ClusterState::metaData)
- .map(clusterMetaData -> ((MetaData.Custom) clusterMetaData.custom(customMetaDataType)))
- .filter(custom1 -> custom1 != null && custom1 instanceof MergableCustomMetaData)
- .map(custom2 -> (MergableCustomMetaData) marshal(custom2))
- .collect(Collectors.toList())
- );
- for (String changedCustomMetaDataType : changedCustomMetaDataTypeSet) {
- MetaData.Custom mergedCustomMetaData = mergedCustomMetaDataMap.get(changedCustomMetaDataType);
- if (mergedCustomMetaData == null) {
- // we ignore merging custom md which doesn't implement MergableCustomMetaData interface
- if (currentState.metaData().custom(changedCustomMetaDataType) instanceof MergableCustomMetaData) {
- // custom md has been removed
- clusterStateChanged = true;
- logger.info("[{}] removing custom meta data type [{}]", tribeName, changedCustomMetaDataType);
- metaData.removeCustom(changedCustomMetaDataType);
- }
- } else {
- // custom md has been changed
- clusterStateChanged = true;
- logger.info("[{}] updating custom meta data type [{}] data [{}]", tribeName, changedCustomMetaDataType, mergedCustomMetaData);
- metaData.putCustom(changedCustomMetaDataType, mergedCustomMetaData);
- }
- }
- return clusterStateChanged;
- }
-
- private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable,
- IndexMetaData index) {
- metaData.remove(index.getIndex().getName());
- routingTable.remove(index.getIndex().getName());
- blocks.removeIndexBlocks(index.getIndex().getName());
- }
-
- private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData,
- RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
- Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
- metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
- routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
- if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex().getName())) {
- blocks.addIndexBlock(tribeIndex.getIndex().getName(), IndexMetaData.INDEX_METADATA_BLOCK);
- }
- if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex().getName())) {
- blocks.addIndexBlock(tribeIndex.getIndex().getName(), IndexMetaData.INDEX_READ_BLOCK);
- }
- if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex().getName())) {
- blocks.addIndexBlock(tribeIndex.getIndex().getName(), IndexMetaData.INDEX_WRITE_BLOCK);
- }
- }
- }
-
- private static ClusterService getClusterService(Node node) {
- return node.injector().getInstance(ClusterService.class);
- }
-
- // pkg-private for testing
- static Map mergeChangedCustomMetaData(Set changedCustomMetaDataTypeSet,
- Function> customMetaDataByTribeNode) {
-
- Map changedCustomMetaDataMap = new HashMap<>(changedCustomMetaDataTypeSet.size());
- for (String customMetaDataType : changedCustomMetaDataTypeSet) {
- customMetaDataByTribeNode.apply(customMetaDataType).stream()
- .reduce((mergableCustomMD, mergableCustomMD2) ->
- ((MergableCustomMetaData) mergableCustomMD.merge((MetaData.Custom) mergableCustomMD2)))
- .ifPresent(mergedCustomMetaData ->
- changedCustomMetaDataMap.put(customMetaDataType, ((MetaData.Custom) mergedCustomMetaData)));
- }
- return changedCustomMetaDataMap;
- }
-
- /**
- * Since custom metadata can be loaded by a plugin class loader that resides in a sub-node, we need to
- * marshal this object into something the tribe node can work with
- */
- private MetaData.Custom marshal(MetaData.Custom custom) {
- try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()){
- bytesStreamOutput.writeNamedWriteable(custom);
- try(StreamInput input = bytesStreamOutput.bytes().streamInput()) {
- StreamInput namedInput = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry);
- MetaData.Custom marshaled = namedInput.readNamedWriteable(MetaData.Custom.class);
- return marshaled;
- }
- } catch (IOException ex) {
- throw new IllegalStateException("cannot marshal object with type " + custom.getWriteableName() + " to tribe node");
- }
- }
-}
diff --git a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java
deleted file mode 100644
index 9957ad6bae9ee..0000000000000
--- a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.tribe;
-
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.support.DestructiveOperations;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
-import org.elasticsearch.cluster.NamedDiff;
-import org.elasticsearch.cluster.block.ClusterBlockException;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.UUIDs;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.discovery.DiscoverySettings;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.NodeConfigurationSource;
-import org.elasticsearch.test.TestCustomMetaData;
-import org.elasticsearch.test.discovery.TestZenDiscovery;
-import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1;
-import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData2;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import static java.util.stream.Collectors.toSet;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.core.Is.is;
-
-/**
- * Note, when talking to tribe client, no need to set the local flag on master read operations, it
- * does it by default.
- */
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
-public class TribeIntegrationTests extends ESIntegTestCase {
-
- private static final String TRIBE_NODE = "tribe_node";
-
- private static InternalTestCluster cluster1;
- private static InternalTestCluster cluster2;
-
- /**
- * A predicate that is used to select none of the remote clusters
- **/
- private static final Predicate NONE = c -> false;
-
- /**
- * A predicate that is used to select the remote cluster 1 only
- **/
- private static final Predicate CLUSTER1_ONLY = c -> c.getClusterName().equals(cluster1.getClusterName());
-
- /**
- * A predicate that is used to select the remote cluster 2 only
- **/
- private static final Predicate CLUSTER2_ONLY = c -> c.getClusterName().equals(cluster2.getClusterName());
-
- /**
- * A predicate that is used to select the two remote clusters
- **/
- private static final Predicate ALL = c -> true;
-
- @Override
- protected Settings nodeSettings(int nodeOrdinal) {
- return Settings.builder()
- .put(super.nodeSettings(nodeOrdinal))
- // Required to delete _all indices on remote clusters
- .put(DestructiveOperations.REQUIRES_NAME_SETTING.getKey(), false)
- .build();
- }
-
- public static class TestCustomMetaDataPlugin extends Plugin {
-
- private final List namedWritables = new ArrayList<>();
-
- public TestCustomMetaDataPlugin() {
- registerBuiltinWritables();
- }
-
- private void registerMetaDataCustom(String name, Writeable.Reader extends T> reader,
- Writeable.Reader diffReader) {
- namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, name, reader));
- namedWritables.add(new NamedWriteableRegistry.Entry(NamedDiff.class, name, diffReader));
- }
-
- private void registerBuiltinWritables() {
- registerMetaDataCustom(MergableCustomMetaData1.TYPE, MergableCustomMetaData1::readFrom, MergableCustomMetaData1::readDiffFrom);
- registerMetaDataCustom(MergableCustomMetaData2.TYPE, MergableCustomMetaData2::readFrom, MergableCustomMetaData2::readDiffFrom);
- }
-
- @Override
- public List getNamedWriteables() {
- return namedWritables;
- }
- }
-
- public static class MockTribePlugin extends TribePlugin {
-
- public MockTribePlugin(Settings settings) {
- super(settings);
- }
-
- protected Function nodeBuilder(Path configPath) {
- return settings -> new MockNode(new Environment(settings, configPath), internalCluster().getPlugins());
- }
-
- }
-
- @Override
- protected Collection> nodePlugins() {
- ArrayList> plugins = new ArrayList<>();
- plugins.addAll(getMockPlugins());
- plugins.add(MockTribePlugin.class);
- plugins.add(TribeAwareTestZenDiscoveryPlugin.class);
- plugins.add(TestCustomMetaDataPlugin.class);
- return plugins;
- }
-
- @Override
- protected boolean addTestZenDiscovery() {
- return false;
- }
-
- public static class TribeAwareTestZenDiscoveryPlugin extends TestZenDiscovery.TestPlugin {
-
- public TribeAwareTestZenDiscoveryPlugin(Settings settings) {
- super(settings);
- }
-
- @Override
- public Settings additionalSettings() {
- if (settings.getGroups("tribe", true).isEmpty()) {
- return super.additionalSettings();
- } else {
- return Settings.EMPTY;
- }
- }
- }
-
- @Before
- public void startRemoteClusters() {
- final int minNumDataNodes = 2;
- final int maxNumDataNodes = 4;
- final NodeConfigurationSource nodeConfigurationSource = getNodeConfigSource();
- final Collection> plugins = nodePlugins();
-
- if (cluster1 == null) {
- cluster1 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
- UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_1",
- plugins, Function.identity());
- }
-
- if (cluster2 == null) {
- cluster2 = new InternalTestCluster(randomLong(), createTempDir(), true, true, minNumDataNodes, maxNumDataNodes,
- UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "cluster_2",
- plugins, Function.identity());
- }
-
- doWithAllClusters(c -> {
- try {
- c.beforeTest(random(), 0.1);
- c.ensureAtLeastNumDataNodes(minNumDataNodes);
- } catch (Exception e) {
- throw new RuntimeException("Failed to set up remote cluster [" + c.getClusterName() + "]", e);
- }
- });
- }
-
- @After
- public void wipeRemoteClusters() {
- doWithAllClusters(c -> {
- final String clusterName = c.getClusterName();
- try {
- c.client().admin().indices().prepareDelete(MetaData.ALL).get();
- c.afterTest();
- } catch (IOException e) {
- throw new RuntimeException("Failed to clean up remote cluster [" + clusterName + "]", e);
- }
- });
- }
-
- @AfterClass
- public static void stopRemoteClusters() {
- try {
- doWithAllClusters(InternalTestCluster::close);
- } finally {
- cluster1 = null;
- cluster2 = null;
- }
- }
-
- private Releasable startTribeNode() throws Exception {
- return startTribeNode(ALL, Settings.EMPTY);
- }
-
- private Releasable startTribeNode(Predicate filter, Settings settings) throws Exception {
- final String node = internalCluster().startNode(createTribeSettings(filter).put(settings).build());
-
- // wait for node to be connected to all tribe clusters
- final Set expectedNodes = Sets.newHashSet(internalCluster().getNodeNames());
- doWithAllClusters(filter, c -> {
- // Adds the tribe client node dedicated to this remote cluster
- for (String tribeNode : internalCluster().getNodeNames()) {
- expectedNodes.add(tribeNode + "/" + c.getClusterName());
- }
- // Adds the remote clusters nodes names
- Collections.addAll(expectedNodes, c.getNodeNames());
- });
- assertBusy(() -> {
- ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState();
- Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet());
- assertThat(nodes, containsInAnyOrder(expectedNodes.toArray()));
- });
- // wait for join to be fully applied on all nodes in the tribe clusters, see https://github.com/elastic/elasticsearch/issues/23695
- doWithAllClusters(filter, c -> {
- assertFalse(c.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut());
- });
-
- return () -> {
- try {
- while(internalCluster().getNodeNames().length > 0) {
- internalCluster().stopRandomNode(s -> true);
- }
- } catch (Exception e) {
- throw new RuntimeException("Failed to close tribe node [" + node + "]", e);
- }
- };
- }
-
- private Settings.Builder createTribeSettings(Predicate filter) {
- assertNotNull(filter);
-
- final Settings.Builder settings = Settings.builder();
- settings.put(Node.NODE_NAME_SETTING.getKey(), TRIBE_NODE);
- settings.put(Node.NODE_DATA_SETTING.getKey(), false);
- settings.put(Node.NODE_MASTER_SETTING.getKey(), false);
- settings.put(Node.NODE_INGEST_SETTING.getKey(), false);
- settings.put(NetworkModule.HTTP_ENABLED.getKey(), false);
- settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), getTestTransportType());
- // add dummy tribe setting so that node is always identifiable as tribe in this test even if the set of connecting cluster is empty
- settings.put(TribeService.BLOCKS_WRITE_SETTING.getKey(), TribeService.BLOCKS_WRITE_SETTING.getDefault(Settings.EMPTY));
-
- doWithAllClusters(filter, c -> {
- String tribeSetting = "tribe." + c.getClusterName() + ".";
- settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), c.getClusterName());
- settings.put(tribeSetting + DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "100ms");
- settings.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), getTestTransportType());
- });
-
- return settings;
- }
-
- public void testTribeNodeWithBadSettings() throws Exception {
- Settings brokenSettings = Settings.builder()
- .put("tribe.some.setting.that.does.not.exist", true)
- .build();
-
- IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> startTribeNode(ALL, brokenSettings));
- assertThat(e.getMessage(), containsString("unknown setting [setting.that.does.not.exist]"));
- }
-
- public void testGlobalReadWriteBlocks() throws Exception {
- Settings additionalSettings = Settings.builder()
- .put("tribe.blocks.write", true)
- .put("tribe.blocks.metadata", true)
- .build();
-
- try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) {
- // Creates 2 indices, test1 on cluster1 and test2 on cluster2
- assertAcked(cluster1.client().admin().indices().prepareCreate("test1"));
- ensureGreen(cluster1.client());
-
- assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
- ensureGreen(cluster2.client());
-
- // Wait for the tribe node to retrieve the indices into its cluster state
- assertIndicesExist(client(), "test1", "test2");
-
- // Writes not allowed through the tribe node
- ClusterBlockException e = expectThrows(ClusterBlockException.class, () -> {
- client().prepareIndex("test1", "type1").setSource("field", "value").get();
- });
- assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/11/tribe node, write not allowed]"));
-
- e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("test2", "type2").setSource("field", "value").get());
- assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/11/tribe node, write not allowed]"));
-
- e = expectThrows(ClusterBlockException.class, () -> client().admin().indices().prepareForceMerge("test1").get());
- assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/10/tribe node, metadata not allowed]"));
-
- e = expectThrows(ClusterBlockException.class, () -> client().admin().indices().prepareForceMerge("test2").get());
- assertThat(e.getMessage(), containsString("blocked by: [BAD_REQUEST/10/tribe node, metadata not allowed]"));
- }
- }
-
- public void testIndexWriteBlocks() throws Exception {
- Settings additionalSettings = Settings.builder()
- .put("tribe.blocks.write.indices", "block_*")
- .build();
-
- try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) {
- // Creates 2 indices on each remote cluster, test1 and block_test1 on cluster1 and test2 and block_test2 on cluster2
- assertAcked(cluster1.client().admin().indices().prepareCreate("test1"));
- assertAcked(cluster1.client().admin().indices().prepareCreate("block_test1"));
- ensureGreen(cluster1.client());
-
- assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
- assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2"));
- ensureGreen(cluster2.client());
-
- // Wait for the tribe node to retrieve the indices into its cluster state
- assertIndicesExist(client(), "test1", "test2", "block_test1", "block_test2");
-
- // Writes allowed through the tribe node for test1/test2 indices
- client().prepareIndex("test1", "type1").setSource("field", "value").get();
- client().prepareIndex("test2", "type2").setSource("field", "value").get();
-
- ClusterBlockException e;
- e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("block_test1", "type1").setSource("foo", 0).get());
- assertThat(e.getMessage(), containsString("blocked by: [FORBIDDEN/8/index write (api)]"));
-
- e = expectThrows(ClusterBlockException.class, () -> client().prepareIndex("block_test2", "type2").setSource("foo", 0).get());
- assertThat(e.getMessage(), containsString("blocked by: [FORBIDDEN/8/index write (api)]"));
- }
- }
-
- public void testOnConflictDrop() throws Exception {
- Settings additionalSettings = Settings.builder()
- .put("tribe.on_conflict", "drop")
- .build();
-
- try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) {
- // Creates 2 indices on each remote cluster, test1 and conflict on cluster1 and test2 and also conflict on cluster2
- assertAcked(cluster1.client().admin().indices().prepareCreate("test1"));
- assertAcked(cluster1.client().admin().indices().prepareCreate("conflict"));
- ensureGreen(cluster1.client());
-
- assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
- assertAcked(cluster2.client().admin().indices().prepareCreate("conflict"));
- ensureGreen(cluster2.client());
-
- // Wait for the tribe node to retrieve the indices into its cluster state
- assertIndicesExist(client(), "test1", "test2");
-
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- assertThat(clusterState.getMetaData().hasIndex("test1"), is(true));
- assertThat(clusterState.getMetaData().index("test1").getSettings().get("tribe.name"), equalTo(cluster1.getClusterName()));
- assertThat(clusterState.getMetaData().hasIndex("test2"), is(true));
- assertThat(clusterState.getMetaData().index("test2").getSettings().get("tribe.name"), equalTo(cluster2.getClusterName()));
- assertThat(clusterState.getMetaData().hasIndex("conflict"), is(false));
- }
- }
-
- public void testOnConflictPrefer() throws Exception {
- final String preference = randomFrom(cluster1, cluster2).getClusterName();
- Settings additionalSettings = Settings.builder()
- .put("tribe.on_conflict", "prefer_" + preference)
- .build();
-
- try (Releasable tribeNode = startTribeNode(ALL, additionalSettings)) {
- assertAcked(cluster1.client().admin().indices().prepareCreate("test1"));
- assertAcked(cluster1.client().admin().indices().prepareCreate("shared"));
- ensureGreen(cluster1.client());
-
- assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
- assertAcked(cluster2.client().admin().indices().prepareCreate("shared"));
- ensureGreen(cluster2.client());
-
- // Wait for the tribe node to retrieve the indices into its cluster state
- assertIndicesExist(client(), "test1", "test2", "shared");
-
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- assertThat(clusterState.getMetaData().hasIndex("test1"), is(true));
- assertThat(clusterState.getMetaData().index("test1").getSettings().get("tribe.name"), equalTo(cluster1.getClusterName()));
- assertThat(clusterState.getMetaData().hasIndex("test2"), is(true));
- assertThat(clusterState.getMetaData().index("test2").getSettings().get("tribe.name"), equalTo(cluster2.getClusterName()));
- assertThat(clusterState.getMetaData().hasIndex("shared"), is(true));
- assertThat(clusterState.getMetaData().index("shared").getSettings().get("tribe.name"), equalTo(preference));
- }
- }
-
- public void testTribeOnOneCluster() throws Exception {
- try (Releasable tribeNode = startTribeNode()) {
- // Creates 2 indices, test1 on cluster1 and test2 on cluster2
- assertAcked(cluster1.client().admin().indices().prepareCreate("test1"));
- ensureGreen(cluster1.client());
-
- assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
- ensureGreen(cluster2.client());
-
- // Wait for the tribe node to retrieve the indices into its cluster state
- assertIndicesExist(client(), "test1", "test2");
-
- // Creates two docs using the tribe node
- indexRandom(true,
- client().prepareIndex("test1", "type1", "1").setSource("field1", "value1"),
- client().prepareIndex("test2", "type1", "1").setSource("field1", "value1")
- );
-
- // Verify that documents are searchable using the tribe node
- assertHitCount(client().prepareSearch().get(), 2L);
-
- // Using assertBusy to check that the mappings are in the tribe node cluster state
- assertBusy(() -> {
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- assertThat(clusterState.getMetaData().index("test1").mapping("type1"), notNullValue());
- assertThat(clusterState.getMetaData().index("test2").mapping("type1"), notNullValue());
- });
-
- // Make sure master level write operations fail... (we don't really have a master)
- expectThrows(MasterNotDiscoveredException.class, () -> {
- client().admin().indices().prepareCreate("tribe_index").setMasterNodeTimeout("10ms").get();
- });
-
- // Now delete an index and makes sure it's reflected in cluster state
- cluster2.client().admin().indices().prepareDelete("test2").get();
- assertBusy(() -> {
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- assertFalse(clusterState.getMetaData().hasIndex("test2"));
- assertFalse(clusterState.getRoutingTable().hasIndex("test2"));
- });
- }
- }
-
- public void testCloseAndOpenIndex() throws Exception {
- // Creates an index on remote cluster 1
- assertTrue(cluster1.client().admin().indices().prepareCreate("first").get().isAcknowledged());
- ensureGreen(cluster1.client());
-
- // Closes the index
- assertTrue(cluster1.client().admin().indices().prepareClose("first").get().isAcknowledged());
-
- try (Releasable tribeNode = startTribeNode()) {
- // The closed index is not part of the tribe node cluster state
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- assertFalse(clusterState.getMetaData().hasIndex("first"));
-
- // Open the index, it becomes part of the tribe node cluster state
- assertTrue(cluster1.client().admin().indices().prepareOpen("first").get().isAcknowledged());
- assertIndicesExist(client(), "first");
-
- // Create a second index, wait till it is seen from within the tribe node
- assertTrue(cluster2.client().admin().indices().prepareCreate("second").get().isAcknowledged());
- assertIndicesExist(client(), "first", "second");
- ensureGreen(cluster2.client());
-
- // Close the second index, wait till it gets removed from the tribe node cluster state
- assertTrue(cluster2.client().admin().indices().prepareClose("second").get().isAcknowledged());
- assertIndicesExist(client(), "first");
-
- // Open the second index, wait till it gets added back to the tribe node cluster state
- assertTrue(cluster2.client().admin().indices().prepareOpen("second").get().isAcknowledged());
- assertIndicesExist(client(), "first", "second");
- ensureGreen(cluster2.client());
- }
- }
-
- /**
- * Test that the tribe node's cluster state correctly reflect the number of nodes
- * of the remote clusters the tribe node is connected to.
- */
- public void testClusterStateNodes() throws Exception {
- List> predicates = Arrays.asList(NONE, CLUSTER1_ONLY, CLUSTER2_ONLY, ALL);
- Collections.shuffle(predicates, random());
-
- for (Predicate predicate : predicates) {
- try (Releasable tribeNode = startTribeNode(predicate, Settings.EMPTY)) {
- }
- }
- }
-
- public void testMergingRemovedCustomMetaData() throws Exception {
- removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
- removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
- MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a");
- MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b");
- try (Releasable tribeNode = startTribeNode()) {
- putCustomMetaData(cluster1, customMetaData1);
- putCustomMetaData(cluster2, customMetaData2);
- assertCustomMetaDataUpdated(internalCluster(), customMetaData2);
- removeCustomMetaData(cluster2, customMetaData2.getWriteableName());
- assertCustomMetaDataUpdated(internalCluster(), customMetaData1);
- }
- }
-
- public void testMergingCustomMetaData() throws Exception {
- removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
- removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
- MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1(randomAlphaOfLength(10));
- MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1(randomAlphaOfLength(10));
- List customMetaDatas = Arrays.asList(customMetaData1, customMetaData2);
- Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
- final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0);
- try (Releasable tribeNode = startTribeNode()) {
- ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
- putCustomMetaData(cluster1, customMetaData1);
- assertCustomMetaDataUpdated(internalCluster(), customMetaData1);
- // check that cluster state version is properly incremented
- assertThat(client().admin().cluster().prepareState().get().getState().getVersion(), equalTo(clusterState.getVersion() + 1));
- putCustomMetaData(cluster2, customMetaData2);
- assertCustomMetaDataUpdated(internalCluster(), tribeNodeCustomMetaData);
- }
- }
-
- public void testMergingMultipleCustomMetaData() throws Exception {
- removeCustomMetaData(cluster1, MergableCustomMetaData1.TYPE);
- removeCustomMetaData(cluster2, MergableCustomMetaData1.TYPE);
- MergableCustomMetaData1 firstCustomMetaDataType1 = new MergableCustomMetaData1(randomAlphaOfLength(10));
- MergableCustomMetaData1 secondCustomMetaDataType1 = new MergableCustomMetaData1(randomAlphaOfLength(10));
- MergableCustomMetaData2 firstCustomMetaDataType2 = new MergableCustomMetaData2(randomAlphaOfLength(10));
- MergableCustomMetaData2 secondCustomMetaDataType2 = new MergableCustomMetaData2(randomAlphaOfLength(10));
- List mergedCustomMetaDataType1 = Arrays.asList(firstCustomMetaDataType1, secondCustomMetaDataType1);
- List mergedCustomMetaDataType2 = Arrays.asList(firstCustomMetaDataType2, secondCustomMetaDataType2);
- Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
- Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
- try (Releasable tribeNode = startTribeNode()) {
- // test putting multiple custom md types propagates to tribe
- putCustomMetaData(cluster1, firstCustomMetaDataType1);
- putCustomMetaData(cluster1, firstCustomMetaDataType2);
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
-
- // test multiple same type custom md is merged and propagates to tribe
- putCustomMetaData(cluster2, secondCustomMetaDataType1);
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
- assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0));
-
- // test multiple same type custom md is merged and propagates to tribe
- putCustomMetaData(cluster2, secondCustomMetaDataType2);
- assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType1.get(0));
- assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
-
- // test removing custom md is propagates to tribe
- removeCustomMetaData(cluster2, secondCustomMetaDataType1.getWriteableName());
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
- assertCustomMetaDataUpdated(internalCluster(), mergedCustomMetaDataType2.get(0));
- removeCustomMetaData(cluster2, secondCustomMetaDataType2.getWriteableName());
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType1);
- assertCustomMetaDataUpdated(internalCluster(), firstCustomMetaDataType2);
- }
- }
-
- private static void assertCustomMetaDataUpdated(InternalTestCluster cluster,
- TestCustomMetaData expectedCustomMetaData) throws Exception {
- assertBusy(() -> {
- ClusterState tribeState = cluster.getInstance(ClusterService.class, cluster.getNodeNames()[0]).state();
- MetaData.Custom custom = tribeState.metaData().custom(expectedCustomMetaData.getWriteableName());
- assertNotNull(custom);
- assertThat(custom, equalTo(expectedCustomMetaData));
- });
- }
-
- private void removeCustomMetaData(InternalTestCluster cluster, final String customMetaDataType) {
- logger.info("removing custom_md type [{}] from [{}]", customMetaDataType, cluster.getClusterName());
- updateMetaData(cluster, builder -> builder.removeCustom(customMetaDataType));
- }
-
- private void putCustomMetaData(InternalTestCluster cluster, final TestCustomMetaData customMetaData) {
- logger.info("putting custom_md type [{}] with data[{}] from [{}]", customMetaData.getWriteableName(),
- customMetaData.getData(), cluster.getClusterName());
- updateMetaData(cluster, builder -> builder.putCustom(customMetaData.getWriteableName(), customMetaData));
- }
-
- private static void updateMetaData(InternalTestCluster cluster, UnaryOperator addCustoms) {
- ClusterService clusterService = cluster.getInstance(ClusterService.class, cluster.getMasterName());
- final CountDownLatch latch = new CountDownLatch(1);
- clusterService.submitStateUpdateTask("update customMetaData", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
- @Override
- public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- latch.countDown();
- }
-
- @Override
- public ClusterState execute(ClusterState currentState) throws Exception {
- MetaData.Builder builder = MetaData.builder(currentState.metaData());
- builder = addCustoms.apply(builder);
- return ClusterState.builder(currentState).metaData(builder).build();
- }
-
- @Override
- public void onFailure(String source, Exception e) {
- fail("failed to apply cluster state from [" + source + "] with " + e.getMessage());
- }
- });
- try {
- latch.await(1, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- fail("latch waiting on publishing custom md interrupted [" + e.getMessage() + "]");
- }
- assertThat("timed out trying to add custom metadata to " + cluster.getClusterName(), latch.getCount(), equalTo(0L));
-
- }
-
- private void assertIndicesExist(Client client, String... indices) throws Exception {
- assertBusy(() -> {
- ClusterState state = client.admin().cluster().prepareState().setRoutingTable(true).setMetaData(true).get().getState();
- assertThat(state.getMetaData().getIndices().size(), equalTo(indices.length));
- for (String index : indices) {
- assertTrue(state.getMetaData().hasIndex(index));
- assertTrue(state.getRoutingTable().hasIndex(index));
- }
- });
- }
-
- private void ensureGreen(Client client) throws Exception {
- assertBusy(() -> {
- ClusterHealthResponse clusterHealthResponse = client.admin().cluster() .prepareHealth()
- .setWaitForActiveShards(0)
- .setWaitForEvents(Priority.LANGUID)
- .setWaitForNoRelocatingShards(true)
- .get();
- assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN));
- assertFalse(clusterHealthResponse.isTimedOut());
- });
- }
-
- private static void doWithAllClusters(Consumer consumer) {
- doWithAllClusters(cluster -> cluster != null, consumer);
- }
-
- private static void doWithAllClusters(Predicate predicate, Consumer consumer) {
- Stream.of(cluster1, cluster2).filter(predicate).forEach(consumer);
- }
-}
diff --git a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
deleted file mode 100644
index 8890b20ec00e9..0000000000000
--- a/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.tribe;
-
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.MergableCustomMetaData;
-import org.elasticsearch.cluster.NamedDiff;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.TestCustomMetaData;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-import static org.hamcrest.Matchers.instanceOf;
-
-public class TribeServiceTests extends ESTestCase {
- public void testMinimalSettings() {
- Settings globalSettings = Settings.builder()
- .put("node.name", "nodename")
- .put("path.home", "some/path").build();
- Settings clientSettings = TribeService.buildClientSettings("tribe1", "parent_id", globalSettings, Settings.EMPTY);
- assertEquals("some/path", clientSettings.get("path.home"));
- assertEquals("nodename/tribe1", clientSettings.get("node.name"));
- assertEquals("tribe1", clientSettings.get("tribe.name"));
- assertFalse(NetworkModule.HTTP_ENABLED.get(clientSettings));
- assertEquals("false", clientSettings.get("node.master"));
- assertEquals("false", clientSettings.get("node.data"));
- assertEquals("false", clientSettings.get("node.ingest"));
- assertEquals("false", clientSettings.get("node.local_storage"));
- assertEquals("3707202549613653169", clientSettings.get("node.id.seed")); // should be fixed by the parent id and tribe name
- assertEquals(9, clientSettings.size());
- }
-
- public void testEnvironmentSettings() {
- Settings globalSettings = Settings.builder()
- .put("node.name", "nodename")
- .put("path.home", "some/path")
- .put("path.logs", "logs/path").build();
- Settings clientSettings = TribeService.buildClientSettings("tribe1", "parent_id", globalSettings, Settings.EMPTY);
- assertEquals("some/path", clientSettings.get("path.home"));
- assertEquals("logs/path", clientSettings.get("path.logs"));
-
- Settings tribeSettings = Settings.builder()
- .put("path.home", "alternate/path").build();
- IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
- TribeService.buildClientSettings("tribe1", "parent_id", globalSettings, tribeSettings);
- });
- assertTrue(e.getMessage(), e.getMessage().contains("Setting [path.home] not allowed in tribe client"));
- }
-
- public void testPassthroughSettings() {
- Settings globalSettings = Settings.builder()
- .put("node.name", "nodename")
- .put("path.home", "some/path")
- .put("network.host", "0.0.0.0")
- .put("network.bind_host", "1.1.1.1")
- .put("network.publish_host", "2.2.2.2")
- .put("transport.host", "3.3.3.3")
- .put("transport.bind_host", "4.4.4.4")
- .put("transport.publish_host", "5.5.5.5").build();
- Settings clientSettings = TribeService.buildClientSettings("tribe1", "parent_id", globalSettings, Settings.EMPTY);
- assertEquals("0.0.0.0", clientSettings.get("network.host"));
- assertEquals("1.1.1.1", clientSettings.get("network.bind_host"));
- assertEquals("2.2.2.2", clientSettings.get("network.publish_host"));
- assertEquals("3.3.3.3", clientSettings.get("transport.host"));
- assertEquals("4.4.4.4", clientSettings.get("transport.bind_host"));
- assertEquals("5.5.5.5", clientSettings.get("transport.publish_host"));
-
- // per tribe client overrides still work
- Settings tribeSettings = Settings.builder()
- .put("network.host", "3.3.3.3")
- .put("network.bind_host", "4.4.4.4")
- .put("network.publish_host", "5.5.5.5")
- .put("transport.host", "6.6.6.6")
- .put("transport.bind_host", "7.7.7.7")
- .put("transport.publish_host", "8.8.8.8").build();
- clientSettings = TribeService.buildClientSettings("tribe1", "parent_id", globalSettings, tribeSettings);
- assertEquals("3.3.3.3", clientSettings.get("network.host"));
- assertEquals("4.4.4.4", clientSettings.get("network.bind_host"));
- assertEquals("5.5.5.5", clientSettings.get("network.publish_host"));
- assertEquals("6.6.6.6", clientSettings.get("transport.host"));
- assertEquals("7.7.7.7", clientSettings.get("transport.bind_host"));
- assertEquals("8.8.8.8", clientSettings.get("transport.publish_host"));
- }
-
- public void testMergeCustomMetaDataSimple() {
- Map mergedCustoms =
- TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE),
- s -> Collections.singletonList(new MergableCustomMetaData1("data1")));
- TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
- assertNotNull(mergedCustom);
- assertEquals(mergedCustom.getData(), "data1");
- }
-
- public void testMergeCustomMetaData() {
- Map mergedCustoms =
- TribeService.mergeChangedCustomMetaData(Collections.singleton(MergableCustomMetaData1.TYPE),
- s -> Arrays.asList(new MergableCustomMetaData1("data1"), new MergableCustomMetaData1("data2")));
- TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
- assertNotNull(mergedCustom);
- assertEquals(mergedCustom.getData(), "data2");
- }
-
- public void testMergeMultipleCustomMetaData() {
- Map> inputMap = new HashMap<>();
- inputMap.put(MergableCustomMetaData1.TYPE,
- Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11")));
- inputMap.put(MergableCustomMetaData2.TYPE,
- Arrays.asList(new MergableCustomMetaData2("data21"), new MergableCustomMetaData2("data20")));
- Map mergedCustoms = TribeService.mergeChangedCustomMetaData(
- Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get);
- TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
- assertNotNull(mergedCustom);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
- assertEquals(mergedCustom.getData(), "data11");
- mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE);
- assertNotNull(mergedCustom);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class));
- assertEquals(mergedCustom.getData(), "data21");
- }
-
- public void testMergeCustomMetaDataFromMany() {
- Map> inputMap = new HashMap<>();
- int n = randomIntBetween(3, 5);
- List customList1 = new ArrayList<>();
- for (int i = 0; i <= n; i++) {
- customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i)));
- }
- Collections.shuffle(customList1, random());
- inputMap.put(MergableCustomMetaData1.TYPE, customList1);
- List customList2 = new ArrayList<>();
- for (int i = 0; i <= n; i++) {
- customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i)));
- }
- Collections.shuffle(customList2, random());
- inputMap.put(MergableCustomMetaData2.TYPE, customList2);
-
- Map mergedCustoms = TribeService.mergeChangedCustomMetaData(
- Sets.newHashSet(MergableCustomMetaData1.TYPE, MergableCustomMetaData2.TYPE), inputMap::get);
- TestCustomMetaData mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData1.TYPE);
- assertNotNull(mergedCustom);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData1.class));
- assertEquals(mergedCustom.getData(), "data1"+String.valueOf(n));
- mergedCustom = (TestCustomMetaData) mergedCustoms.get(MergableCustomMetaData2.TYPE);
- assertNotNull(mergedCustom);
- assertThat(mergedCustom, instanceOf(MergableCustomMetaData2.class));
- assertEquals(mergedCustom.getData(), "data2"+String.valueOf(n));
- }
-
- public static class MockTribePlugin extends TribePlugin {
-
- static List> classpathPlugins = Arrays.asList(MockTribePlugin.class, getTestTransportPlugin());
-
- public MockTribePlugin(Settings settings) {
- super(settings);
- }
-
- protected Function nodeBuilder(Path configPath) {
- return settings -> new MockNode(new Environment(settings, configPath), classpathPlugins);
- }
-
- }
-
- public void testTribeNodeDeprecation() throws IOException {
- final Path tempDir = createTempDir();
- Settings.Builder settings = Settings.builder()
- .put("node.name", "test-node")
- .put("path.home", tempDir)
- .put(NetworkModule.HTTP_ENABLED.getKey(), false)
- .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), getTestTransportType());
-
- final boolean tribeServiceEnable = randomBoolean();
- if (tribeServiceEnable) {
- String clusterName = "single-node-cluster";
- String tribeSetting = "tribe." + clusterName + ".";
- settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName)
- .put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), getTestTransportType());
- }
- try (Node node = new MockNode(settings.build(), MockTribePlugin.classpathPlugins)) {
- if (tribeServiceEnable) {
- assertWarnings("tribe nodes are deprecated in favor of cross-cluster search and will be removed in Elasticsearch 7.0.0");
- }
- }
- }
-
- static class MergableCustomMetaData1 extends TestCustomMetaData
- implements MergableCustomMetaData {
- public static final String TYPE = "custom_md_1";
-
- protected MergableCustomMetaData1(String data) {
- super(data);
- }
-
- @Override
- public String getWriteableName() {
- return TYPE;
- }
-
- public static MergableCustomMetaData1 readFrom(StreamInput in) throws IOException {
- return readFrom(MergableCustomMetaData1::new, in);
- }
-
- public static NamedDiff readDiffFrom(StreamInput in) throws IOException {
- return readDiffFrom(TYPE, in);
- }
-
- @Override
- public EnumSet context() {
- return EnumSet.of(MetaData.XContentContext.GATEWAY);
- }
-
- @Override
- public MergableCustomMetaData1 merge(MergableCustomMetaData1 other) {
- return (getData().compareTo(other.getData()) >= 0) ? this : other;
- }
- }
-
- static class MergableCustomMetaData2 extends TestCustomMetaData
- implements MergableCustomMetaData {
- public static final String TYPE = "custom_md_2";
-
- protected MergableCustomMetaData2(String data) {
- super(data);
- }
-
- @Override
- public String getWriteableName() {
- return TYPE;
- }
-
- public static MergableCustomMetaData2 readFrom(StreamInput in) throws IOException {
- return readFrom(MergableCustomMetaData2::new, in);
- }
-
- public static NamedDiff readDiffFrom(StreamInput in) throws IOException {
- return readDiffFrom(TYPE, in);
- }
-
-
- @Override
- public EnumSet context() {
- return EnumSet.of(MetaData.XContentContext.GATEWAY);
- }
-
- @Override
- public MergableCustomMetaData2 merge(MergableCustomMetaData2 other) {
- return (getData().compareTo(other.getData()) >= 0) ? this : other;
- }
- }
-}
diff --git a/qa/evil-tests/build.gradle b/qa/evil-tests/build.gradle
index 2ac8cd89783d5..472fc87261602 100644
--- a/qa/evil-tests/build.gradle
+++ b/qa/evil-tests/build.gradle
@@ -27,7 +27,6 @@ apply plugin: 'elasticsearch.standalone-test'
dependencies {
testCompile 'com.google.jimfs:jimfs:1.1'
- testCompile project(path: ':modules:tribe', configuration: 'runtime')
}
// TODO: give each evil test its own fresh JVM for more isolation.
diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java
deleted file mode 100644
index 582cddd5facb1..0000000000000
--- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.tribe;
-
-import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.discovery.TestZenDiscovery;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.hamcrest.CoreMatchers.either;
-import static org.hamcrest.CoreMatchers.equalTo;
-
-/**
- * This test doesn't extend {@link ESIntegTestCase} as the internal cluster ignores system properties
- * all the time, while we need to make the tribe node accept them in this case, so that we can verify that they are not read again as part
- * of the tribe client nodes initialization. Note that the started nodes will obey to the 'node.mode' settings as the internal cluster does.
- */
-@SuppressForbidden(reason = "modifies system properties intentionally")
-public class TribeUnitTests extends ESTestCase {
-
- private static List> classpathPlugins;
- private static Node tribe1;
- private static Node tribe2;
-
- @BeforeClass
- public static void createTribes() throws NodeValidationException {
- Settings baseSettings = Settings.builder()
- .put(NetworkModule.HTTP_ENABLED.getKey(), false)
- .put("transport.type", getTestTransportType())
- .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
- .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
- .build();
-
- classpathPlugins = Arrays.asList(TribeAwareTestZenDiscoveryPlugin.class, MockTribePlugin.class, getTestTransportPlugin());
-
- tribe1 = new MockNode(
- Settings.builder()
- .put(baseSettings)
- .put("cluster.name", "tribe1")
- .put("node.name", "tribe1_node")
- .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
- .build(), classpathPlugins).start();
- tribe2 = new MockNode(
- Settings.builder()
- .put(baseSettings)
- .put("cluster.name", "tribe2")
- .put("node.name", "tribe2_node")
- .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
- .build(), classpathPlugins).start();
- }
-
- @AfterClass
- public static void closeTribes() throws IOException {
- IOUtils.close(tribe1, tribe2);
- classpathPlugins = null;
- tribe1 = null;
- tribe2 = null;
- }
-
- public static class TribeAwareTestZenDiscoveryPlugin extends TestZenDiscovery.TestPlugin {
-
- public TribeAwareTestZenDiscoveryPlugin(Settings settings) {
- super(settings);
- }
-
- @Override
- public Settings additionalSettings() {
- if (settings.getGroups("tribe", true).isEmpty()) {
- return super.additionalSettings();
- } else {
- return Settings.EMPTY;
- }
- }
- }
-
- public static class MockTribePlugin extends TribePlugin {
-
- public MockTribePlugin(Settings settings) {
- super(settings);
- }
-
- protected Function nodeBuilder(Path configPath) {
- return settings -> new MockNode(new Environment(settings, configPath), classpathPlugins);
- }
-
- }
-
- public void testThatTribeClientsIgnoreGlobalConfig() throws Exception {
- assertTribeNodeSuccessfullyCreated(getDataPath("elasticsearch.yml").getParent());
- assertWarnings("tribe nodes are deprecated in favor of cross-cluster search and will be removed in Elasticsearch 7.0.0");
- }
-
- private static void assertTribeNodeSuccessfullyCreated(Path configPath) throws Exception {
- // the tribe clients do need it to make sure they can find their corresponding tribes using the proper transport
- Settings settings = Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).put("node.name", "tribe_node")
- .put("transport.type", getTestTransportType())
- .put("tribe.t1.transport.type", getTestTransportType())
- .put("tribe.t2.transport.type", getTestTransportType())
- .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
- .build();
-
- try (Node node = new MockNode(settings, classpathPlugins, configPath).start()) {
- try (Client client = node.client()) {
- assertBusy(() -> {
- ClusterState state = client.admin().cluster().prepareState().clear().setNodes(true).get().getState();
- assertThat(state.getClusterName().value(), equalTo("tribe_node_cluster"));
- assertThat(state.getNodes().getSize(), equalTo(5));
- for (DiscoveryNode discoveryNode : state.getNodes()) {
- assertThat(discoveryNode.getName(), either(equalTo("tribe1_node")).or(equalTo("tribe2_node"))
- .or(equalTo("tribe_node")).or(equalTo("tribe_node/t1")).or(equalTo("tribe_node/t2")));
- }
- });
- }
- }
- }
-
-}
diff --git a/qa/smoke-test-tribe-node/build.gradle b/qa/smoke-test-tribe-node/build.gradle
deleted file mode 100644
index 009298adc6aa0..0000000000000
--- a/qa/smoke-test-tribe-node/build.gradle
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.elasticsearch.gradle.test.ClusterConfiguration
-import org.elasticsearch.gradle.test.ClusterFormationTasks
-import org.elasticsearch.gradle.test.NodeInfo
-
-apply plugin: 'elasticsearch.standalone-rest-test'
-apply plugin: 'elasticsearch.rest-test'
-
-
-ClusterConfiguration configOne = new ClusterConfiguration(project)
-configOne.clusterName = 'one'
-configOne.setting('node.name', 'one')
-List oneNodes = ClusterFormationTasks.setup(project, 'clusterOne', integTestRunner, configOne)
-
-ClusterConfiguration configTwo = new ClusterConfiguration(project)
-configTwo.clusterName = 'two'
-configTwo.setting('node.name', 'two')
-List twoNodes = ClusterFormationTasks.setup(project, 'clusterTwo', integTestRunner, configTwo)
-
-integTestCluster {
- // tribe nodes had a bug where if explicit ports was specified for the tribe node, the dynamic socket permissions that were applied
- // would not account for the fact that the internal node client needed to bind to sockets too; thus, we use explicit port ranges to
- // ensure that the code that fixes this bug is exercised
- setting 'http.port', '40200-40249'
- setting 'transport.tcp.port', '40300-40349'
- setting 'node.name', 'quest'
- setting 'tribe.one.cluster.name', 'one'
- setting 'tribe.one.discovery.zen.ping.unicast.hosts', "'${-> oneNodes.get(0).transportUri()}'"
- setting 'tribe.one.http.enabled', 'true'
- setting 'tribe.one.http.port', '40250-40299'
- setting 'tribe.one.transport.tcp.port', '40350-40399'
- setting 'tribe.two.cluster.name', 'two'
- setting 'tribe.two.discovery.zen.ping.unicast.hosts', "'${-> twoNodes.get(0).transportUri()}'"
- setting 'tribe.two.http.enabled', 'true'
- setting 'tribe.two.http.port', '40250-40299'
- setting 'tribe.two.transport.tcp.port', '40250-40399'
-
- waitCondition = { node, ant ->
- File tmpFile = new File(node.cwd, 'wait.success')
- // 5 nodes: tribe + clusterOne (1 node + tribe internal node) + clusterTwo (1 node + tribe internal node)
- ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=5&wait_for_status=yellow",
- dest: tmpFile.toString(),
- ignoreerrors: true,
- retries: 10)
- return tmpFile.exists()
- }
-}
diff --git a/qa/smoke-test-tribe-node/src/test/java/org/elasticsearch/tribe/TribeClientYamlTestSuiteIT.java b/qa/smoke-test-tribe-node/src/test/java/org/elasticsearch/tribe/TribeClientYamlTestSuiteIT.java
deleted file mode 100644
index 1a8e7867dd8b6..0000000000000
--- a/qa/smoke-test-tribe-node/src/test/java/org/elasticsearch/tribe/TribeClientYamlTestSuiteIT.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.tribe;
-
-import com.carrotsearch.randomizedtesting.annotations.Name;
-import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
-
-import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
-import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
-
-public class TribeClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
-
- // tribe nodes can not handle delete indices requests
- @Override
- protected boolean preserveIndicesUponCompletion() {
- return true;
- }
-
- // tribe nodes can not handle delete template requests
- @Override
- protected boolean preserveTemplatesUponCompletion() {
- return true;
- }
-
- public TribeClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
- super(testCandidate);
- }
-
- @ParametersFactory
- public static Iterable parameters() throws Exception {
- return createParameters();
- }
-}
-
diff --git a/qa/smoke-test-tribe-node/src/test/resources/rest-api-spec/test/tribe/10_basic.yml b/qa/smoke-test-tribe-node/src/test/resources/rest-api-spec/test/tribe/10_basic.yml
deleted file mode 100644
index d70a355ac6274..0000000000000
--- a/qa/smoke-test-tribe-node/src/test/resources/rest-api-spec/test/tribe/10_basic.yml
+++ /dev/null
@@ -1,16 +0,0 @@
----
-"Tribe node test":
- - do:
- cat.nodes:
- h: name
- s: name
- v: true
-
- - match:
- $body: |
- /^ name\n
- one\n
- quest\n
- quest/one\n
- quest/two\n
- two\n $/
diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Security.java b/server/src/main/java/org/elasticsearch/bootstrap/Security.java
index e3de41c09c1c2..80a56fafa54db 100644
--- a/server/src/main/java/org/elasticsearch/bootstrap/Security.java
+++ b/server/src/main/java/org/elasticsearch/bootstrap/Security.java
@@ -337,7 +337,6 @@ static void addFilePermissions(Permissions policy, Environment environment) thro
private static void addBindPermissions(Permissions policy, Settings settings) {
addSocketPermissionForHttp(policy, settings);
addSocketPermissionForTransportProfiles(policy, settings);
- addSocketPermissionForTribeNodes(policy, settings);
}
/**
@@ -383,16 +382,6 @@ private static void addSocketPermissionForTransport(final Permissions policy, fi
addSocketPermissionForPortRange(policy, transportRange);
}
- private static void addSocketPermissionForTribeNodes(final Permissions policy, final Settings settings) {
- for (final Settings tribeNodeSettings : settings.getGroups("tribe", true).values()) {
- // tribe nodes have HTTP disabled by default, so we check if HTTP is enabled before granting
- if (NetworkModule.HTTP_ENABLED.exists(tribeNodeSettings) && NetworkModule.HTTP_ENABLED.get(tribeNodeSettings)) {
- addSocketPermissionForHttp(policy, tribeNodeSettings);
- }
- addSocketPermissionForTransport(policy, tribeNodeSettings);
- }
- }
-
/**
* Add dynamic {@link SocketPermission} for the specified port range.
*
diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java
index 630afe4579bd1..d0470ea2c42c6 100644
--- a/server/src/main/java/org/elasticsearch/node/Node.java
+++ b/server/src/main/java/org/elasticsearch/node/Node.java
@@ -651,7 +651,6 @@ public Node start() throws NodeValidationException {
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
- // tribe nodes don't have a master so we shouldn't register an observer s
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index 7687844231ccd..b63228c6f124c 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -329,7 +329,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
return;
}
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
- // We don't validate cluster names to allow for tribe node connections.
+ // We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
diff --git a/server/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java
index fbfcf6bbd9510..cec5f9b1be28f 100644
--- a/server/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java
+++ b/server/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java
@@ -95,8 +95,7 @@ public void testThreadNames() throws Exception {
}
String nodePrefix = "(" + Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX) + ")?(" +
Pattern.quote(ESIntegTestCase.SUITE_CLUSTER_NODE_PREFIX) + "|" +
- Pattern.quote(ESIntegTestCase.TEST_CLUSTER_NODE_PREFIX) + "|" +
- Pattern.quote("node_tribe2") + ")";
+ Pattern.quote(ESIntegTestCase.TEST_CLUSTER_NODE_PREFIX) +")";
assertThat(threadName, RegexMatcher.matches("\\[" + nodePrefix + "\\d+\\]"));
}
}