diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java index 820640635786d..c469647f7eb74 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java @@ -49,6 +49,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT String remoteCluster = randomAlphaOfLength(4); List leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4)); String followIndexNamePattern = randomAlphaOfLength(4); + boolean active = randomBoolean(); Integer maxOutstandingReadRequests = null; if (randomBoolean()) { @@ -91,7 +92,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT readPollTimeout = new TimeValue(randomNonNegativeLong()); } patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters, - followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, + followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout)); } diff --git a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc index ac8f9e4994139..5ea23782e1967 100644 --- a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc +++ b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc @@ -90,6 +90,7 @@ The API returns the following result: { "name": "my_auto_follow_pattern", "pattern": { + "active": true, "remote_cluster" : "remote_cluster", "leader_index_patterns" : [ diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml index ebf9176c30a91..63c1f6b1da9d0 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -52,3 +52,89 @@ catch: missing ccr.get_auto_follow_pattern: name: my_pattern + +--- +"Test pause and resume auto follow pattern": + - skip: + version: " - 7.9.99" + reason: "pause/resume auto-follow patterns is supported since 8.0" + + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + + - do: + ccr.put_auto_follow_pattern: + name: pattern_test + body: + remote_cluster: local + leader_index_patterns: ['logs-*'] + max_outstanding_read_requests: 2 + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + catch: missing + ccr.pause_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.pause_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: false } + + - do: + catch: missing + ccr.resume_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.resume_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + ccr.delete_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b7cde891d851e..9bf15e2ab030c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -60,6 +60,7 @@ import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction; @@ -81,9 +82,11 @@ import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -97,6 +100,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -236,6 +240,7 @@ public List> getPersistentTasksExecutor(ClusterServic new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class), + new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class), // forget follower action new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class), usageAction, @@ -264,6 +269,8 @@ public List getRestHandlers(Settings settings, RestController restC new RestDeleteAutoFollowPatternAction(restController), new RestPutAutoFollowPatternAction(restController), new RestGetAutoFollowPatternAction(restController), + new RestPauseAutoFollowPatternAction(restController), + new RestResumeAutoFollowPatternAction(restController), // forget follower API new RestForgetFollowerAction(restController)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index cca60579ae6f9..0690d7cc0bfde 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -194,7 +194,7 @@ synchronized void updateStats(List results) { } void updateAutoFollowers(ClusterState followerClusterState) { - AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { return; } @@ -206,8 +206,9 @@ void updateAutoFollowers(ClusterState followerClusterState) { } final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); - Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() - .map(entry -> entry.getValue().getRemoteCluster()) + Set newRemoteClusters = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) + .map(AutoFollowPattern::getRemoteCluster) .filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false) .collect(Collectors.toSet()); @@ -283,6 +284,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS String remoteCluster = entry.getKey(); AutoFollower autoFollower = entry.getValue(); boolean exist = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); if (exist == false) { LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster); @@ -345,6 +347,7 @@ abstract static class AutoFollower { private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; private volatile boolean stop; + private volatile List lastActivePatterns = List.of(); AutoFollower(final String remoteCluster, final Consumer> statsUpdater, @@ -384,7 +387,9 @@ void start() { final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .filter(entry -> entry.getValue().isActive()) .map(Map.Entry::getKey) + .sorted() .collect(Collectors.toList()); if (patterns.isEmpty()) { LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); @@ -394,8 +399,15 @@ void start() { this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); + // keep the list of the last known active patterns for this auto-follower + // if the list changed, we explicitly retrieve the last cluster state in + // order to avoid timeouts when waiting for the next remote cluster state + // version that might never arrive + final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion; + this.lastActivePatterns = List.copyOf(patterns); + final Thread thread = Thread.currentThread(); - getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> { + getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> { // Also check removed flag here, as it may take a while for this remote cluster state api call to return: if (removed) { LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster); @@ -445,8 +457,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata, Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - final List leaderIndicesToFollow = - getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(autoFollowPatternName), thread); } else { @@ -599,7 +610,7 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { continue; } - if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { + if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java new file mode 100644 index 0000000000000..e0dfa331ac2e3 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction { + + @Inject + public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver resolver) { + super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(final StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(final Request request, final ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(final Task task, final Request request, final ClusterState state, + final ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(), + new AckedClusterStateUpdateTask<>(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(final boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return innerActivate(request, currentState); + } + }); + } + + static ClusterState innerActivate(final Request request, ClusterState currentState) { + final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + final Map patterns = autoFollowMetadata.getPatterns(); + final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName()); + if (previousAutoFollowPattern == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + if (previousAutoFollowPattern.isActive() == request.isActive()) { + return currentState; + } + + final Map newPatterns = new HashMap<>(patterns); + newPatterns.put(request.getName(), + new AutoFollowMetadata.AutoFollowPattern( + previousAutoFollowPattern.getRemoteCluster(), + previousAutoFollowPattern.getLeaderIndexPatterns(), + previousAutoFollowPattern.getFollowIndexPattern(), + request.isActive(), + previousAutoFollowPattern.getMaxReadRequestOperationCount(), + previousAutoFollowPattern.getMaxWriteRequestOperationCount(), + previousAutoFollowPattern.getMaxOutstandingReadRequests(), + previousAutoFollowPattern.getMaxOutstandingWriteRequests(), + previousAutoFollowPattern.getMaxReadRequestSize(), + previousAutoFollowPattern.getMaxWriteRequestSize(), + previousAutoFollowPattern.getMaxWriteBufferCount(), + previousAutoFollowPattern.getMaxWriteBufferSize(), + previousAutoFollowPattern.getMaxRetryDelay(), + previousAutoFollowPattern.getReadPollTimeout())); + + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders())) + .build()) + .build(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index c453d45e37349..67d036899f5a4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -161,6 +161,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, request.getRemoteCluster(), request.getLeaderIndexPatterns(), request.getFollowIndexNamePattern(), + true, request.getParameters().getMaxReadRequestOperationCount(), request.getParameters().getMaxWriteRequestOperationCount(), request.getParameters().getMaxOutstandingReadRequests(), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java new file mode 100644 index 0000000000000..abfca00da5cb1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestPauseAutoFollowPatternAction extends BaseRestHandler { + + public RestPauseAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/pause", this); + } + + @Override + public String getName() { + return "ccr_pause_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), false); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java new file mode 100644 index 0000000000000..89f3f65fca7d3 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestResumeAutoFollowPatternAction extends BaseRestHandler { + + public RestResumeAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/resume", this); + } + + @Override + public String getName() { + return "ccr_resume_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), true); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index b971182e3d7cc..e453e3d2367b2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; @@ -15,26 +16,34 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; import org.elasticsearch.xpack.core.ccr.action.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -382,6 +391,161 @@ public void testAutoFollowSoftDeletesDisabled() throws Exception { }); } + public void testPauseAndResumeAutoFollowPattern() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + // index created in the remote cluster before the auto follow pattern exists won't be auto followed + createLeaderIndex("test-existing-index-is-ignored", leaderIndexSettings); + + // create the auto follow pattern + putAutoFollowPatterns("test-pattern", new String[]{"test-*", "tests-*"}); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + }); + + // index created in the remote cluster are auto followed + createLeaderIndex("test-new-index-is-auto-followed", leaderIndexSettings); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + assertTrue(ESIntegTestCase.indexExists("copy-test-new-index-is-auto-followed", followerClient())); + }); + ensureFollowerGreen("copy-test-new-index-is-auto-followed"); + + // pause the auto follow pattern + pauseAutoFollowPattern("test-pattern"); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(0))); + + // indices created in the remote cluster are not auto followed because the pattern is paused + final int nbIndicesCreatedWhilePaused = randomIntBetween(1, 5); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + createLeaderIndex("test-index-created-while-pattern-is-paused-" + i, leaderIndexSettings); + } + + // sometimes create another index in the remote cluster and close (or delete) it right away + // it should not be auto followed when the pattern is resumed + if (randomBoolean()) { + final String indexName = "test-index-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + if (randomBoolean()) { + assertAcked(leaderClient().admin().indices().prepareClose(indexName)); + } else { + assertAcked(leaderClient().admin().indices().prepareDelete(indexName)); + } + } + + if (randomBoolean()) { + createLeaderIndex("logs-20200101", leaderIndexSettings); + } + + // pattern is paused, none of the newly created indices has been followed yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1)); + ensureLeaderGreen("test-index-created-while-pattern-is-paused-*"); + + // resume the auto follow pattern, indices created while the pattern was paused are picked up for auto-following + resumeAutoFollowPattern("test-pattern"); + assertBusy(() -> { + final Client client = followerClient(); + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1 + nbIndicesCreatedWhilePaused)); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + assertTrue(ESIntegTestCase.indexExists("copy-test-index-created-while-pattern-is-paused-" + i, client)); + } + }); + } + + public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"}; + if (randomBoolean()) { + // sometimes create indices in the remote cluster that match the future auto follow patterns + Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings)); + } + + // create auto follow patterns + final List autoFollowPatterns = new ArrayList<>(prefixes.length); + for (String prefix : prefixes) { + String name = prefix + "pattern"; + putAutoFollowPatterns(name, new String[]{prefix + "*"}); + autoFollowPatterns.add(name); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1))); + assertTrue(getAutoFollowPattern(name).isActive()); + } + + // no following indices are created yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0)); + + // create random indices in the remote cluster that match the patterns + final AtomicBoolean running = new AtomicBoolean(true); + final Set leaderIndices = ConcurrentCollections.newConcurrentSet(); + final Thread createNewLeaderIndicesThread = new Thread(() -> { + while (running.get()) { + try { + String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + leaderIndices.add(indexName); + Thread.sleep(randomIntBetween(100, 500)); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + createNewLeaderIndicesThread.start(); + + // wait for some leader indices to be auto-followed + assertBusy(() -> + assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length))); + + final int nbLeaderIndices = leaderIndices.size(); + + // pause some random patterns + final List pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns); + pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive()))); + + assertBusy(() -> { + final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0; + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters)); + if (expectedAutoFollowedClusters > 0) { + // wait for more indices to be created in the remote cluster while some patterns are paused + assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3)); + } + }); + ensureFollowerGreen(true, "copy-*"); + + // resume auto follow patterns + pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + + // stop creating indices in the remote cluster + running.set(false); + createNewLeaderIndicesThread.join(); + + ensureLeaderGreen(leaderIndices.toArray(new String[0])); + + // check that all leader indices have been correctly auto followed + assertBusy(() -> { + final Client client = followerClient(); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size())); + leaderIndices.stream() + .map(leaderIndex -> "copy-" + leaderIndex) + .forEach(followerIndex -> + assertTrue("following index must exist: " + followerIndex, ESIntegTestCase.indexExists(followerIndex, client))); + }); + } + private void putAutoFollowPatterns(String name, String[] patterns) { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName(name); @@ -408,4 +572,21 @@ private void createLeaderIndex(String index, Settings settings) { leaderClient().admin().indices().create(request).actionGet(); } + private void pauseAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, false); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private void resumeAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, true); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private AutoFollowMetadata.AutoFollowPattern getAutoFollowPattern(final String name) { + GetAutoFollowPatternAction.Request request = new GetAutoFollowPatternAction.Request(); + request.setName(name); + GetAutoFollowPatternAction.Response response = followerClient().execute(GetAutoFollowPatternAction.INSTANCE, request).actionGet(); + assertTrue(response.getAutoFollowPatterns().containsKey(name)); + return response.getAutoFollowPatterns().get(name); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 26182781233e2..32b4b3ed9d174 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -44,7 +44,7 @@ protected AutoFollowMetadata createTestInstance() { randomAlphaOfLength(4), leaderPatterns, randomAlphaOfLength(4), - randomIntBetween(0, Integer.MAX_VALUE), + true, randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRInfoTransportActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRInfoTransportActionTests.java index 8c7310a0a1bf3..5ef165ff5a5ec 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRInfoTransportActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRInfoTransportActionTests.java @@ -97,7 +97,7 @@ public void testUsageStats() throws Exception { Map patterns = new HashMap<>(numAutoFollowPatterns); for (int i = 0; i < numAutoFollowPatterns; i++) { AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern("remote_cluser", - Collections.singletonList("logs" + i + "*"), null, null, null, null, null, null, null, null, null, null, null); + Collections.singletonList("logs" + i + "*"), null, true, null, null, null, null, null, null, null, null, null, null); patterns.put("pattern" + i, pattern); } metaData.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index f8e7eab1c8647..13aa3208e5550 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -165,7 +165,7 @@ public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCo @Override public ClusterState execute(ClusterState currentState) throws Exception { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( Collections.singletonMap("test_alias", autoFollowPattern), Collections.emptyMap(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java new file mode 100644 index 0000000000000..961bf94f658ac --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ActivateAutoFollowPatternActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected ActivateAutoFollowPatternAction.Request createTestInstance() { + return new ActivateAutoFollowPatternAction.Request(randomAlphaOfLength(5), randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return ActivateAutoFollowPatternAction.Request::new; + } + + public void testValidate() { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(null, true); + ActionRequestValidationException validationException = request.validate(); + assertThat(validationException, notNullValue()); + assertThat(validationException.getMessage(), containsString("[name] is missing")); + + request = new ActivateAutoFollowPatternAction.Request("name", true); + validationException = request.validate(); + assertThat(validationException, nullValue()); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 7fa7c13f2c994..1801171866540 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -38,6 +38,8 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import java.util.ArrayList; @@ -52,6 +54,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -59,8 +62,12 @@ import java.util.function.Function; import java.util.function.Supplier; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -68,6 +75,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -81,7 +89,7 @@ public void testAutoFollower() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -150,7 +158,7 @@ public void testAutoFollowerClusterStateApiFailure() { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -202,7 +210,7 @@ public void testAutoFollowerUpdateClusterStateFailure() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -253,13 +261,232 @@ void updateAutoFollowMetadata(Function updateFunctio assertThat(invoked[0], is(true)); } + public void testAutoFollowerWithNoActivePatternsDoesNotStart() { + final String remoteCluster = randomAlphaOfLength(5); + + final Map autoFollowPatterns = new HashMap<>(2); + autoFollowPatterns.put("pattern_1", new AutoFollowPattern(remoteCluster, List.of("logs-*", "test-*"), "copy-", false, null, null, + null, null, null, null, null, null, null, null)); + autoFollowPatterns.put("pattern_2", new AutoFollowPattern(remoteCluster, List.of("users-*"), "copy-", false, null, null, + null, null, null, null, null, null, null, null)); + + final Map> followedLeaderIndexUUIDs = new HashMap<>(2); + followedLeaderIndexUUIDs.put("pattern_1", List.of("uuid1", "uuid2")); + followedLeaderIndexUUIDs.put("pattern_2", Collections.emptyList()); + + final Map> headers = new HashMap<>(2); + headers.put("pattern_1", singletonMap("header", "value")); + headers.put("pattern_2", emptyMap()); + + final Supplier followerClusterStateSupplier = localClusterStateSupplier(ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(autoFollowPatterns, followedLeaderIndexUUIDs, headers)) + .build()) + .build()); + + final AtomicBoolean invoked = new AtomicBoolean(false); + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, v -> invoked.set(true), followerClusterStateSupplier, () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + invoked.set(true); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + invoked.set(true); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + invoked.set(true); + } + }; + + autoFollower.start(); + assertThat(invoked.get(), is(false)); + } + + public void testAutoFollowerWithPausedActivePatterns() { + final String remoteCluster = randomAlphaOfLength(5); + + final AtomicReference remoteClusterState = new AtomicReference<>( + createRemoteClusterState("patternLogs-0", true, randomLongBetween(1L, 1_000L)) + ); + + final AtomicReference localClusterState = new AtomicReference<>( + ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(emptyMap(), emptyMap(), emptyMap()))) + .build() + ); + + // compute and return the local cluster state, updated with some auto-follow patterns + final Supplier localClusterStateSupplier = () -> localClusterState.updateAndGet(currentLocalState -> { + final int nextClusterStateVersion = (int) (currentLocalState.version() + 1); + + final ClusterState nextLocalClusterState; + if (nextClusterStateVersion == 1) { + // cluster state #1 : one pattern is active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternLogs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternLogs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 2) { + // cluster state #2 : still one pattern is active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 3) { + // cluster state #3 : add a new pattern, two patterns are active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternDocs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternDocs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 4) { + // cluster state #4 : still both patterns are active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 5) { + // cluster state #5 : first pattern is paused, second pattern is still active + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternLogs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else if (nextClusterStateVersion == 6) { + // cluster state #5 : second pattern is paused, both patterns are inactive + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternDocs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else { + return currentLocalState; + } + + return ClusterState.builder(nextLocalClusterState) + .version(nextClusterStateVersion) + .build(); + }); + + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final List autoFollowResults = new ArrayList<>(); + + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, autoFollowResults::addAll, localClusterStateSupplier, () -> 1L, Runnable::run) { + + int countFetches = 1; // to be aligned with local cluster state updates + ClusterState lastFetchedRemoteClusterState; + + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + assertThat(remote, equalTo(remoteCluster)); + + // in this test, every time it fetches the remote cluster state new leader indices to follow appears + final String[] newLeaderIndices = {"patternLogs-" + countFetches, "patternDocs-" + countFetches}; + + if (countFetches == 1) { + assertThat("first invocation, it should retrieve the metadata version 1", metadataVersion, equalTo(1L)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + + } else if (countFetches == 2 || countFetches == 4) { + assertThat("no patterns changes, it should retrieve the last known metadata version + 1", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version() + 1)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is aligned with what the auto-follower is requesting", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion)); + + } else if (countFetches == 3 || countFetches == 5) { + assertThat("patterns have changed, it should retrieve the last known metadata version again", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version())); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is incremented", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion + 1)); + } else { + fail("after the 5th invocation there are no more active patterns, the auto-follower should have stopped"); + } + + countFetches = countFetches + 1; + remoteClusterState.set(lastFetchedRemoteClusterState); + handler.accept(new ClusterStateResponse(lastFetchedRemoteClusterState.getClusterName(), + lastFetchedRemoteClusterState, false), null); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + assertThat(request.getRemoteCluster(), equalTo(remoteCluster)); + assertThat(request.getFollowerIndex(), startsWith("copy-")); + followedIndices.add(request.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + localClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + + autoFollower.start(); + + assertThat(autoFollowResults.size(), equalTo(7)); + assertThat(followedIndices, containsInAnyOrder( + "patternLogs-1", // iteration #1 : only pattern "patternLogs" is active in local cluster state + "patternLogs-2", // iteration #2 : only pattern "patternLogs" is active in local cluster state + "patternLogs-3", // iteration #3 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-3", // + "patternLogs-4", // iteration #4 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-4", // + "patternDocs-5" // iteration #5 : only pattern "patternDocs" is active in local cluster state, "patternLogs" is paused + )); + + final ClusterState finalRemoteClusterState = remoteClusterState.get(); + final ClusterState finalLocalClusterState = localClusterState.get(); + + AutoFollowMetadata autoFollowMetadata = finalLocalClusterState.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(2)); + assertThat(autoFollowMetadata.getPatterns().values().stream().noneMatch(AutoFollowPattern::isActive), is(true)); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternLogs"), + containsInAnyOrder( + finalRemoteClusterState.metaData().index("patternLogs-0").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-4").getIndexUUID() + // patternLogs-5 exists in remote cluster state but patternLogs was paused + )); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternDocs"), + containsInAnyOrder( + // patternDocs-0 does not exist in remote cluster state + finalRemoteClusterState.metaData().index("patternDocs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-4").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-5").getIndexUUID() + )); + } + public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -317,13 +544,8 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } public void testGetLeaderIndicesToFollow() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), - null, null, null, null, null, null, null, null, null, null, null); - Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Map.of("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); + final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, + null, null, null, null, null, null, null, null, null, null); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); MetaData.Builder imdBuilder = MetaData.builder(); @@ -368,7 +590,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(3).getName(), equalTo("metrics-3")); assertThat(result.get(4).getName(), equalTo("metrics-4")); - List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); + final List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(4)); @@ -376,16 +598,20 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(1).getName(), equalTo("metrics-1")); assertThat(result.get(2).getName(), equalTo("metrics-3")); assertThat(result.get(3).getName(), equalTo("metrics-4")); + + final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, + false, null, null, null, null, null, null, null, null, null, null); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, followedIndexUUIDs); + assertThat(result.size(), equalTo(0)); } public void testGetLeaderIndicesToFollow_shardsNotStarted() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); - Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Map.of("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); + null, true, null, null, null, null, null, null, null, null, null, null); // 1 shard started and another not started: ClusterState remoteState = createRemoteClusterState("index1", true); @@ -425,7 +651,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { public void testGetLeaderIndicesToFollowWithClosedIndices() { final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); // index is opened ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); @@ -552,15 +778,15 @@ public void testCleanFollowedLeaderIndicesNoEntry() { } public void testGetFollowerIndexName() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } @@ -643,11 +869,11 @@ public void testUpdateAutoFollowers() { Runnable::run); // Add 3 patterns: Map patterns = new HashMap<>(); - patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -673,7 +899,7 @@ public void testUpdateAutoFollowers() { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); assertThat(removedAutoFollower1.removed, is(true)); // Add pattern 4: - patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -733,12 +959,100 @@ public void testUpdateAutoFollowersNoAutoFollowMetadata() { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); } + public void testUpdateAutoFollowersNoActivePatterns() { + final ClusterService clusterService = mockClusterService(); + final AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L, + () -> 1L, + Runnable::run); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.EMPTY_STATE); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower1 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower1.removed, is(false)); + AutoFollowCoordinator.AutoFollower removedAutoFollower2 = autoFollowCoordinator.getAutoFollowers().get("remote2"); + assertThat(removedAutoFollower2.removed, is(false)); + + // Make pattern 1 and pattern 3 inactive + patterns.computeIfPresent("pattern1", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + patterns.computeIfPresent("pattern3", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(false)); + + // Add active pattern 4 and make pattern 2 inactive + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.computeIfPresent("pattern2", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower4 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower4.removed, is(false)); + assertNotSame(removedAutoFollower4, removedAutoFollower1); + assertThat(removedAutoFollower2.removed, is(true)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(true)); + assertThat(removedAutoFollower4.removed, is(true)); + } + public void testWaitForMetadataVersion() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -801,7 +1115,7 @@ public void testWaitForTimeOut() { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -859,7 +1173,7 @@ public void testAutoFollowerSoftDeletesDisabled() { ClusterState remoteState = createRemoteClusterState("logs-20190101", false); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -925,7 +1239,7 @@ public void testAutoFollowerFollowerIndexAlreadyExists() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -1011,7 +1325,7 @@ public void testRepeatedFailures() throws InterruptedException { "remote", List.of("*"), "{}", - 0, + true, 0, 0, 0, 0, @@ -1082,7 +1396,7 @@ public void testClosedIndicesAreNotAutoFollowed() { final ClusterState localState = ClusterState.builder(new ClusterName("local")) .metaData(MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Map.of(pattern, new AutoFollowPattern("remote", List.of("docs-*"), null, + new AutoFollowMetadata(Map.of(pattern, new AutoFollowPattern("remote", List.of("docs-*"), null, true, null, null, null, null, null, null, null, null, null, null)), Map.of(pattern, List.of()), Map.of(pattern, Map.of())))) .build(); @@ -1154,8 +1468,14 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes) { + return createRemoteClusterState(indexName, enableSoftDeletes, 0L); + } + + private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes, long metadataVersion) { Settings.Builder indexSettings; - indexSettings = settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes); + indexSettings = settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())); IndexMetaData indexMetaData = IndexMetaData.builder(indexName) .settings(indexSettings) @@ -1163,7 +1483,9 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e .numberOfReplicas(0) .build(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().put(indexMetaData, true)); + .metaData(MetaData.builder() + .put(indexMetaData, true) + .version(metadataVersion)); ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); @@ -1173,25 +1495,29 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e return csBuilder.build(); } - private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { - IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) - .numberOfShards(1) - .numberOfReplicas(0) + private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) { + if (indices == null) { + return previous; + } + final MetaData.Builder metadataBuilder = MetaData.builder(previous.metaData()).version(previous.metaData().version() + 1); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(previous.routingTable()); + for (String indexName : indices) { + IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + metadataBuilder.put(indexMetaData, true); + routingTableBuilder.add(IndexRoutingTable.builder(indexMetaData.getIndex()) + .addShard(TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted()) + .build()); + } + return ClusterState.builder(previous.getClusterName()) + .metaData(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) - .metaData(MetaData.builder(previous.metaData()) - .version(previous.metaData().version() + 1) - .put(indexMetaData, true)); - - ShardRouting shardRouting = - TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); - IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); - - return csBuilder.build(); } private static Supplier localClusterStateSupplier(ClusterState... states) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java index 55582815ce5e6..5fd6381001d36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java @@ -33,7 +33,7 @@ protected GetAutoFollowPatternAction.Response createTestInstance() { "remote", Collections.singletonList(randomAlphaOfLength(4)), randomAlphaOfLength(4), - randomIntBetween(0, Integer.MAX_VALUE), + true, randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java new file mode 100644 index 0000000000000..18fa6cbd1dbf3 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.util.Arrays; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class TransportActivateAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerActivateNoAutoFollowMetadata() { + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("test", true), ClusterState.EMPTY_STATE)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [test] is missing")); + } + + public void testInnerActivateDoesNotExist() { + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", randomAutoFollowPattern()), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("does_not_exist", true), clusterState)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [does_not_exist] is missing")); + } + + public void testInnerActivateToggle() { + final AutoFollowMetadata.AutoFollowPattern autoFollowPattern = randomAutoFollowPattern(); + final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", autoFollowPattern), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive()); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, sameInstance(clusterState)); + } + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive() == false); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, not(sameInstance(clusterState))); + + AutoFollowMetadata updatedAutoFollowMetadata = updatedState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, notNullValue()); + + AutoFollowMetadata autoFollowMetadata = clusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, autoFollowMetadata); + assertThat(updatedAutoFollowMetadata.getPatterns().size(), equalTo(autoFollowMetadata.getPatterns().size())); + assertThat(updatedAutoFollowMetadata.getPatterns().get("remote_cluster").isActive(), not(autoFollowPattern.isActive())); + + assertEquals(updatedAutoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getFollowedLeaderIndexUUIDs()); + assertEquals(updatedAutoFollowMetadata.getHeaders(), autoFollowMetadata.getHeaders()); + } + } + + private static AutoFollowMetadata.AutoFollowPattern randomAutoFollowPattern() { + return new AutoFollowMetadata.AutoFollowPattern(randomAlphaOfLength(5), + randomSubsetOf(Arrays.asList("test-*", "user-*", "logs-*", "failures-*")), + randomFrom("{{leader_index}}", "{{leader_index}}-follower", "test"), + randomBoolean(), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + TimeValue.timeValueSeconds(randomIntBetween(30, 600)), + TimeValue.timeValueSeconds(randomIntBetween(30, 600))); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java index 5ef43fc05c81c..06bb95e333ff5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -32,8 +32,8 @@ public void testInnerDelete() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -43,8 +43,8 @@ public void testInnerDelete() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); - existingAutoFollowPatterns.put("name2", - new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name2", new AutoFollowPattern("asia_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -76,8 +76,8 @@ public void testInnerDeleteDoesNotExist() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java index 85e1bf916aa3c..7de170442f137 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java @@ -24,9 +24,9 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase { public void testGetAutoFollowPattern() { Map patterns = new HashMap<>(); patterns.put("name1", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); patterns.put("name2", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); MetaData metaData = MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index ac556d47c85dd..05315f239be83 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -103,7 +103,7 @@ public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index f775256272c56..0a3db4a5b7730 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ccr.action.ImmutableFollowParameters; @@ -173,8 +173,9 @@ public int hashCode() { return Objects.hash(patterns, followedLeaderIndexUUIDs, headers); } - public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentObject { + public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentFragment { + public static final ParseField ACTIVE = new ParseField("active"); public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns"); public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern"); @@ -182,24 +183,28 @@ public static class AutoFollowPattern extends ImmutableFollowParameters implemen @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", - args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], (Integer) args[3], - (Integer) args[4], (Integer) args[5], (Integer) args[6], (ByteSizeValue) args[7], (ByteSizeValue) args[8], - (Integer) args[9], (ByteSizeValue) args[10], (TimeValue) args[11], (TimeValue) args[12])); + args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], + args[3] == null || (boolean) args[3], (Integer) args[4], (Integer) args[5], (Integer) args[6], (Integer) args[7], + (ByteSizeValue) args[8], (ByteSizeValue) args[9], (Integer) args[10], (ByteSizeValue) args[11], (TimeValue) args[12], + (TimeValue) args[13])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD); PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACTIVE); ImmutableFollowParameters.initParser(PARSER); } private final String remoteCluster; private final List leaderIndexPatterns; private final String followIndexPattern; + private final boolean active; public AutoFollowPattern(String remoteCluster, List leaderIndexPatterns, String followIndexPattern, + boolean active, Integer maxReadRequestOperationCount, Integer maxWriteRequestOperationCount, Integer maxOutstandingReadRequests, @@ -215,6 +220,7 @@ public AutoFollowPattern(String remoteCluster, this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + this.active = active; } public static AutoFollowPattern readFrom(StreamInput in) throws IOException { @@ -227,6 +233,11 @@ private AutoFollowPattern(String remoteCluster, List leaderIndexPatterns this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.active = in.readBoolean(); + } else { + this.active = true; + } } public boolean match(String indexName) { @@ -249,16 +260,24 @@ public String getFollowIndexPattern() { return followIndexPattern; } + public boolean isActive() { + return active; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); out.writeStringCollection(leaderIndexPatterns); out.writeOptionalString(followIndexPattern); super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(active); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(ACTIVE.getPreferredName(), active); builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0])); if (followIndexPattern != null) { @@ -268,25 +287,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - @Override - public boolean isFragment() { - return true; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; AutoFollowPattern pattern = (AutoFollowPattern) o; - return remoteCluster.equals(pattern.remoteCluster) && + return active == pattern.active && + remoteCluster.equals(pattern.remoteCluster) && leaderIndexPatterns.equals(pattern.leaderIndexPatterns) && followIndexPattern.equals(pattern.followIndexPattern); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern); + return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, active); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 879c8d79cd548..7d621a62cf359 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -115,8 +115,8 @@ public AutoFollowStats(StreamInput in) throws IOException { numberOfFailedRemoteClusterStateRequests = in.readVLong(); numberOfSuccessfulFollowIndices = in.readVLong(); // note: the casts to the following Writeable.Reader instances are needed by some IDEs (e.g. Eclipse 4.8) as a compiler help - recentAutoFollowErrors = new TreeMap<>(in.readMap((Writeable.Reader) StreamInput::readString, - (Writeable.Reader>) in1 -> new Tuple<>(in1.readZLong(), in1.readException()))); + recentAutoFollowErrors = new TreeMap<>(in.readMap(StreamInput::readString, + in1 -> new Tuple<>(in1.readZLong(), in1.readException()))); autoFollowedClusters = new TreeMap<>(in.readMap(StreamInput::readString, AutoFollowedCluster::new)); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java new file mode 100644 index 0000000000000..5b8f644ceca67 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ActivateAutoFollowPatternAction extends ActionType { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/activate"; + public static final ActivateAutoFollowPatternAction INSTANCE = new ActivateAutoFollowPatternAction(); + + private ActivateAutoFollowPatternAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends AcknowledgedRequest { + + private final String name; + private final boolean active; + + public Request(final String name, final boolean active) { + this.name = name; + this.active = active; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("[name] is missing", validationException); + } + return validationException; + } + + public String getName() { + return name; + } + + public boolean isActive() { + return active; + } + + public Request(final StreamInput in) throws IOException { + super(in); + name = in.readString(); + active = in.readBoolean(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeBoolean(active); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return active == request.active + && Objects.equals(name, request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, active); + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json new file mode 100644 index 0000000000000..9e76b83bb904f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.pause_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-pause-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/pause", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern that should pause discovering new indices to follow." + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json new file mode 100644 index 0000000000000..96b77cb82e93f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.resume_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-resume-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/resume", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern to resume discovering new indices to follow." + } + } + } + ] + } + } +}