From 61c8ad906e90afb914b24250a7ca23d045b8fbe1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sat, 8 Sep 2018 19:18:22 +0200 Subject: [PATCH 01/16] [CCR] Added history uuid validation For correctness we need to verify whether the history uuid of the leader index shards never changes while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow api validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not the follow api fails. * While a follow index is following a leader index; shard follow tasks on each shard changes api call verify whether their current history uuid is the same as the recorded history uuid. Relates to #30086 --- .../xpack/ccr/FollowIndexSecurityIT.java | 17 ++-- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 + .../xpack/ccr/CcrLicenseChecker.java | 45 +++++++-- .../action/CreateAndFollowIndexAction.java | 56 ++++++++++- .../xpack/ccr/action/FollowIndexAction.java | 74 +++++++++++--- .../xpack/ccr/action/ShardChangesAction.java | 27 +++++- .../xpack/ccr/action/ShardFollowNodeTask.java | 5 + .../xpack/ccr/action/ShardFollowTask.java | 46 +++++++-- .../xpack/ccr/ShardChangesIT.java | 16 ++- .../ccr/action/FollowIndexActionTests.java | 97 ++++++++++++------- .../ccr/action/ShardChangesResponseTests.java | 8 +- .../ShardFollowNodeTaskRandomTests.java | 51 +++++++--- .../ccr/action/ShardFollowNodeTaskTests.java | 41 +++++--- .../ShardFollowTaskReplicationTests.java | 25 ++++- .../ccr/action/ShardFollowTaskTests.java | 4 +- 15 files changed, 394 insertions(+), 120 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 7d658550d92b9..f314d278a5400 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -8,6 +8,7 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -26,6 +27,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class FollowIndexSecurityIT extends ESRestTestCase { @@ -96,16 +98,13 @@ public void testFollowIndex() throws Exception { assertThat(countCcrNodeTasks(), equalTo(0)); }); - createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex); - // Verify that nothing has been replicated and no node tasks are running - // These node tasks should have been failed due to the fact that the user - // has no sufficient priviledges. - assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - verifyDocuments(adminClient(), unallowedIndex, 0); + Exception e = expectThrows(ResponseException.class, + () -> createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); - followIndex("leader_cluster:" + unallowedIndex, unallowedIndex); - assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); - verifyDocuments(adminClient(), unallowedIndex, 0); + e = expectThrows(ResponseException.class, + () -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cd0561b1c0c60..5cdc1b0585de6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -82,6 +82,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; + public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY = "leader_index_history_uuid"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index cefa490f4f7e2..bd4db1f0aba4e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -10,19 +10,30 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; +import java.util.HashMap; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; -import java.util.function.Consumer; /** * Encapsulates licensing checking for CCR. @@ -62,19 +73,19 @@ public boolean isCcrAllowed() { * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the * remote cluster. * - * @param client the client - * @param clusterAlias the remote cluster alias - * @param leaderIndex the name of the leader index - * @param listener the listener - * @param leaderIndexMetadataConsumer the leader index metadata consumer - * @param the type of response the listener is waiting for + * @param client the client + * @param clusterAlias the remote cluster alias + * @param leaderIndex the name of the leader index + * @param listener the listener + * @param historyUUIDAndLeaderIndexMetadataConsumer the leader index history uuid and the leader index metadata consumer + * @param the type of response the listener is waiting for */ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( final Client client, final String clusterAlias, final String leaderIndex, final ActionListener listener, - final Consumer leaderIndexMetadataConsumer) { + final BiConsumer, IndexMetaData> historyUUIDAndLeaderIndexMetadataConsumer) { // we have to check the license on the remote cluster new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses( Collections.singletonList(clusterAlias), @@ -93,7 +104,23 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe final ClusterState remoteClusterState = r.getState(); final IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); - leaderIndexMetadataConsumer.accept(leaderIndexMetadata); + CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + Map historyUUIDs = new HashMap<>(); + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + CommitStats commitStats = shardStats.getCommitStats(); + String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs.put(shardId.id(), historyUUID); + } + } + historyUUIDAndLeaderIndexMetadataConsumer.accept(historyUUIDs, leaderIndexMetadata); + }; + IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(leaderIndex); + remoteClient.admin().indices().stats(request, + ActionListener.wrap(indicesStatsHandler, listener::onFailure)); }, listener::onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 2e36bca293225..7f96fa4ea8524 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -12,6 +12,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -29,6 +34,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -36,18 +42,23 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public class CreateAndFollowIndexAction extends Action { @@ -242,8 +253,12 @@ protected void masterOperation( private void createFollowerIndexAndFollowLocalIndex( final Request request, final ClusterState state, final ActionListener listener) { // following an index in local cluster, so use local cluster state to fetch leader index metadata - final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); - createFollowerIndex(leaderIndexMetadata, request, listener); + final String leaderIndex = request.getFollowRequest().getLeaderIndex(); + final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); + Consumer> handler = historyUUID -> { + createFollowerIndex(leaderIndexMetadata, historyUUID, request, listener); + }; + fetchHistoryUUID(client, leaderIndex, handler, listener::onFailure); } private void createFollowerIndexAndFollowRemoteIndex( @@ -256,11 +271,14 @@ private void createFollowerIndexAndFollowRemoteIndex( clusterAlias, leaderIndex, listener, - leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener)); + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); } private void createFollowerIndex( - final IndexMetaData leaderIndexMetaData, final Request request, final ActionListener listener) { + final IndexMetaData leaderIndexMetaData, + final Map historyUUIDs, + final Request request, + final ActionListener listener) { if (leaderIndexMetaData == null) { listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() + "] does not exist")); @@ -296,6 +314,13 @@ public ClusterState execute(ClusterState currentState) throws Exception { MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); + // Adding the leader index uuid for each shard as custom metadata: + Map metadata = new HashMap<>(); + for (Map.Entry entry : historyUUIDs.entrySet()) { + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + entry.getKey(), entry.getValue()); + } + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + // Copy all settings, but overwrite a few settings. Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put(leaderIndexMetaData.getSettings()); @@ -350,6 +375,29 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); } + // would be great if can reuse some of the logic in CcrLicenseChecker to do remote calls for + // fetching leader index metadata and leader index uuid + static void fetchHistoryUUID(final Client client, + final String leaderIndex, + final Consumer> handler, + final Consumer errorHandler) { + IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(leaderIndex); + CheckedConsumer onResponseHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + Map historyUUIDs = new HashMap<>(); + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs.put(shardId.id(), historyUUID); + } + } + handler.accept(historyUUIDs); + }; + client.admin().indices().stats(request, ActionListener.wrap(onResponseHandler, errorHandler)); + } + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 17b7bbe674b38..f40ff882c4086 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -47,11 +47,13 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -62,6 +64,8 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.TransportAction.fetchHistoryUUID; + public class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); @@ -352,11 +356,13 @@ private void followLocalIndex(final Request request, final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); // following an index in local cluster, so use local cluster state to fetch leader index metadata final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); - try { - start(request, null, leaderIndexMetadata, followerIndexMetadata, listener); - } catch (final IOException e) { - listener.onFailure(e); - } + fetchHistoryUUID(client, request.getLeaderIndex(), historyUUIDs -> { + try { + start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); + } catch (final IOException e) { + listener.onFailure(e); + } + }, listener::onFailure); } private void followRemoteIndex( @@ -371,9 +377,9 @@ private void followRemoteIndex( clusterAlias, leaderIndex, listener, - leaderIndexMetadata -> { + (leaderHistoryUUID, leaderIndexMetadata) -> { try { - start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); + start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } @@ -395,25 +401,37 @@ void start( String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, + Map leaderIndexHistoryUUIDs, ActionListener handler) throws IOException { MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; - validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); + validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService); final int numShards = followIndexMetadata.getNumberOfShards(); final AtomicInteger counter = new AtomicInteger(numShards); final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + String recordedLeaderIndexHistoryUUID = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + shardId); ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes, - request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout, - request.idleShardRetryDelay, filteredHeaders); + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.maxBatchOperationCount, + request.maxConcurrentReadBatches, + request.maxOperationSizeInBytes, + request.maxConcurrentWriteBatches, + request.maxWriteBufferSize, + request.retryTimeout, + request.idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + filteredHeaders + ); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override @@ -510,13 +528,27 @@ void finalizeResponse() { static void validate(Request request, IndexMetaData leaderIndex, - IndexMetaData followIndex, MapperService followerMapperService) { + IndexMetaData followIndex, + Map leaderIndexHistoryUUID, + MapperService followerMapperService) { if (leaderIndex == null) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); } if (followIndex == null) { throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist"); } + + Map recordedHistoryUUIDs = convert(followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)); + assert recordedHistoryUUIDs.size() == leaderIndexHistoryUUID.size(); + for (Map.Entry entry : leaderIndexHistoryUUID.entrySet()) { + String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs.get(entry.getKey()); + if (entry.getValue().equals(recordedLeaderIndexHistoryUUID) == false) { + throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + + entry.getValue() + "] as history uuid but instead reference [" + recordedLeaderIndexHistoryUUID + + "] as history uuid"); + } + } + if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled"); } @@ -568,4 +600,16 @@ private static Settings filter(Settings originalSettings) { return settings.build(); } + private static Map convert(Map ccrMetadata) { + Map historyUUIDsByShard = new HashMap<>(); + for (Map.Entry entry : ccrMetadata.entrySet()) { + if (entry.getKey().startsWith(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY)) { + int shardId = Integer.parseInt(entry.getKey().replace(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_", "")); + String previousValue = historyUUIDsByShard.put(shardId, entry.getValue()); + assert previousValue == null; + } + } + return historyUUIDsByShard; + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index b505ee015bab6..c565f8f582feb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -179,6 +179,16 @@ public long getMaxSeqNo() { return maxSeqNo; } + private String historyUUID; + + public String getHistoryUUID() { + return historyUUID; + } + + public void setHistoryUUID(String historyUUID) { + this.historyUUID = historyUUID; + } + private Translog.Operation[] operations; public Translog.Operation[] getOperations() { @@ -188,10 +198,17 @@ public Translog.Operation[] getOperations() { Response() { } - Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { + Response( + final long mappingVersion, + final long globalCheckpoint, + final long maxSeqNo, + final String historyUUID, + final Translog.Operation[] operations) { + this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; + this.historyUUID = historyUUID; this.operations = operations; } @@ -201,6 +218,7 @@ public void readFrom(final StreamInput in) throws IOException { mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); + historyUUID = in.readString(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -210,6 +228,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); + out.writeString(historyUUID); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -221,12 +240,13 @@ public boolean equals(final Object o) { return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && + Objects.equals(historyUUID, that.historyUUID) && Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, historyUUID, Arrays.hashCode(operations)); } } @@ -253,6 +273,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); + final String historyUUID = indexShard.getHistoryUUID(); final Translog.Operation[] operations = getOperations( indexShard, @@ -260,7 +281,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.fromSeqNo, request.maxOperationCount, request.maxOperationSizeInBytes); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), historyUUID, operations); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 00e3aaaae2a8e..92ea1b8fe14ab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -278,6 +278,11 @@ protected void onOperationsFetched(Translog.Operation[] operations) { synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { onOperationsFetched(response.getOperations()); + if (params.getRecordedLeaderIndexHistoryUUID().equals(response.getHistoryUUID()) == false) { + markAsFailed(new IllegalStateException("unexpected history uuid, expected [" + + params.getRecordedLeaderIndexHistoryUUID() + "], actual [" + response.getHistoryUUID() + "]")); + return; + } leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo()); final long newFromSeqNo; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 82482792f3907..4b96da9d923e1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -50,12 +50,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); + (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map) a[15])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -76,6 +77,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -89,11 +91,22 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxWriteBufferSize; private final TimeValue retryTimeout; private final TimeValue idleShardRetryDelay; + private final String recordedLeaderIndexHistoryUUID; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount, - int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches, - int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map headers) { + ShardFollowTask( + String leaderClusterAlias, + ShardId followShardId, + ShardId leaderShardId, + int maxBatchOperationCount, + int maxConcurrentReadBatches, + long maxBatchSizeInBytes, + int maxConcurrentWriteBatches, + int maxWriteBufferSize, + TimeValue retryTimeout, + TimeValue idleShardRetryDelay, + String recordedLeaderIndexHistoryUUID, + Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -104,6 +117,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = maxWriteBufferSize; this.retryTimeout = retryTimeout; this.idleShardRetryDelay = idleShardRetryDelay; + this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -118,6 +132,7 @@ public ShardFollowTask(StreamInput in) throws IOException { this.maxWriteBufferSize = in.readVInt(); this.retryTimeout = in.readTimeValue(); this.idleShardRetryDelay = in.readTimeValue(); + this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -165,6 +180,10 @@ public String getTaskId() { return followShardId.getIndex().getUUID() + "-" + followShardId.getId(); } + public String getRecordedLeaderIndexHistoryUUID() { + return recordedLeaderIndexHistoryUUID; + } + public Map getHeaders() { return headers; } @@ -186,6 +205,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(maxWriteBufferSize); out.writeTimeValue(retryTimeout); out.writeTimeValue(idleShardRetryDelay); + out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -212,6 +232,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -231,13 +252,26 @@ public boolean equals(Object o) { maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(retryTimeout, that.retryTimeout) && Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, - maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers); + return Objects.hash( + leaderClusterAlias, + followShardId, + leaderShardId, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxConcurrentWriteBatches, + maxBatchSizeInBytes, + maxWriteBufferSize, + retryTimeout, + idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + headers + ); } public String toString() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 7980e1281406a..bbbb7f9517bd6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -357,16 +358,11 @@ public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - - final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); - assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); - internalCluster().ensureAtLeastNumDataNodes(2); - ensureGreen("index1", "index2"); + ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); - client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get(); final int numDocs = randomIntBetween(2, 64); for (int i = 0; i < numDocs; i++) { @@ -409,13 +405,13 @@ public void testFollowNonExistentIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("test-follower").get()); // Leader index does not exist. FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); // Follower index does not exist. FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); // Both indices do not exist. FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); } @TestLogging("_root:DEBUG") diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 5b52700f5579b..64081d54c539b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -13,66 +13,87 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.ShardChangesIT; import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.equalTo; public class FollowIndexActionTests extends ESTestCase { + private static final Map CUSTOM_METADATA = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "uuid"); + public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); + Map UUIDs = Collections.singletonMap(0, "uuid"); { // should fail, because leader index does not exist - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { // should fail, because follow index does not exist - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null)); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, null, null, null)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } + { + // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Map customMetaData = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "another-uuid"); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); + assertThat(e.getMessage(), equalTo("follow index [index2] should reference [uuid] as history uuid but " + + "instead reference [another-uuid] as history uuid")); + } { // should fail because leader index does not have soft deletes enabled - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { // should fail because the number of primary shards between leader and follow index are not equal IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); - IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); + IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY, CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { // should fail, because leader index is closed IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader and follow index must be open")); } { // should fail, because leader has a field with the same name mapped as keyword and follower as text IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5, - Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]")); } { @@ -81,39 +102,39 @@ public void testValidation() throws IOException { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical")); } { // should fail because the following index does not have the following_index settings IndexMetaData leaderIMD = createIMD("index1", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); Settings followingIndexSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build(); - IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings); + IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings, CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followingIndexSettings, "index2"); mapperService.updateMapping(null, followIMD); IllegalArgumentException error = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(error.getMessage(), equalTo("the following index [index2] is not ready to follow; " + "the setting [index.xpack.ccr.following_index] must be enabled.")); } { // should succeed IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", 5, Settings.builder() - .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed, index settings are identical @@ -121,15 +142,15 @@ public void testValidation() throws IOException { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed despite whitelisted settings being different @@ -138,25 +159,32 @@ public void testValidation() throws IOException { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, mapperService); } } - private static IndexMetaData createIMD(String index, int numberOfShards, Settings settings) throws IOException { - return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings); + private static IndexMetaData createIMD(String index, + int numberOfShards, + Settings settings, + Map custom) throws IOException { + return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings, custom); } - private static IndexMetaData createIMD(String index, State state, String mapping, int numberOfShards, - Settings settings) throws IOException { + private static IndexMetaData createIMD(String index, + State state, + String mapping, + int numberOfShards, + Settings settings, + Map custom) throws IOException { return IndexMetaData.builder(index) .settings(settings(Version.CURRENT).put(settings)) .numberOfShards(numberOfShards) @@ -164,6 +192,7 @@ private static IndexMetaData createIMD(String index, State state, String mapping .numberOfReplicas(0) .setRoutingNumShards(numberOfShards) .putMapping("_doc", mapping) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, custom) .build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index e9c67097d72b2..b84397654af1f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -20,7 +20,13 @@ protected ShardChangesAction.Response createTestInstance() { for (int i = 0; i < numOps; i++) { operations[i] = new Translog.NoOp(i, 0, "test"); } - return new ShardChangesAction.Response(mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, operations); + return new ShardChangesAction.Response( + mappingVersion, + leaderGlobalCheckpoint, + leaderMaxSeqNo, + randomAlphaOfLength(4), + operations + ); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 9bfd6b9d6ef42..856915903cfcf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -73,10 +73,20 @@ private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testRun) { AtomicBoolean stopped = new AtomicBoolean(false); - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency, - ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + testRun.maxOperationCount, + concurrency, + ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + concurrency, + 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); ThreadPool threadPool = new TestThreadPool(getClass().getSimpleName()); BiConsumer scheduler = (delay, task) -> { @@ -146,7 +156,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0])); + handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, "uuid", new Translog.Operation[0])); } }; threadPool.generic().execute(task); @@ -213,8 +223,17 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } - item.add(new TestResponse(null, mappingVersion, - new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); + item.add(new TestResponse( + null, + mappingVersion, + new ShardChangesAction.Response( + mappingVersion, + nextGlobalCheckPoint, + nextGlobalCheckPoint, + "uuid", + ops.toArray(EMPTY)) + ) + ); responses.put(prevGlobalCheckpoint, item); } else { // Simulates a leader shard copy not having all the operations the shard follow task thinks it has by @@ -230,8 +249,13 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + prevGlobalCheckpoint, + prevGlobalCheckpoint, + "uuid", + EMPTY + ); item.add(new TestResponse(null, mappingVersion, response)); } List ops = new ArrayList<>(); @@ -242,8 +266,13 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, } // Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind: long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + localLeaderGCP, + localLeaderGCP, + "uuid", + ops.toArray(EMPTY) + ); item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 4f7c0bf16645c..11d11f8826d21 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -393,7 +393,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, "uuid", new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -425,7 +425,7 @@ public void testDelayCoordinatesRead() { // Also invokes coordinateReads() task.innerHandleReadResponse(0L, 63L, response); task.innerHandleReadResponse(64L, 63L, - new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0])); + new ShardChangesAction.Response(0, 63L, 63L, "uuid", new Translog.Operation[0])); assertThat(counter[0], equalTo(1)); } @@ -714,9 +714,20 @@ public void testHandleWriteResponse() { ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes, - maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + maxBatchOperationCount, + maxConcurrentReadBatches, + maxBatchSizeInBytes, + maxConcurrentWriteBatches, + bufferWriteLimit, + TimeValue.ZERO, + TimeValue.ZERO, + "uuid", + Collections.emptyMap() + ); shardChangesRequests = new ArrayList<>(); bulkShardOperationRequests = new ArrayList<>(); @@ -777,12 +788,13 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con for (int i = 0; i < requestBatchSize; i++) { operations[i] = new Translog.NoOp(from + i, 0, "test"); } - final ShardChangesAction.Response response = - new ShardChangesAction.Response( - mappingVersions.poll(), - leaderGlobalCheckpoints.poll(), - maxSeqNos.poll(), - operations); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersions.poll(), + leaderGlobalCheckpoints.poll(), + maxSeqNos.poll(), + "uuid", + operations + ); handler.accept(response); } } @@ -814,7 +826,12 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } return new ShardChangesAction.Response( - mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); + mappingVersion, + leaderGlobalCheckPoint, + leaderGlobalCheckPoint, + "uuid", + ops.toArray(new Translog.Operation[0]) + ); } void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 2cd024cb03cf7..73cae2ce8b011 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -159,9 +159,19 @@ private ReplicationGroup createFollowGroup(int replicas) throws IOException { } private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + between(1, 64), + between(1, 8), + Long.MAX_VALUE, + between(1, 4), 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); @@ -212,8 +222,13 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here - final ShardChangesAction.Response response = - new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, + seqNoStats.getGlobalCheckpoint(), + seqNoStats.getMaxSeqNo(), + "uuid", + ops + ); handler.accept(response); return; } catch (Exception e) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 300794a6c00cf..fa11ddf4bf976 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -34,7 +34,9 @@ protected ShardFollowTask createTestInstance() { randomIntBetween(1, Integer.MAX_VALUE), TimeValue.parseTimeValue(randomTimeValue(), ""), TimeValue.parseTimeValue(randomTimeValue(), ""), - randomBoolean() ? null : Collections.singletonMap("key", "value")); + randomAlphaOfLength(4), + randomBoolean() ? null : Collections.singletonMap("key", "value") + ); } @Override From 624b416d43a572b08b353efdcc269f063718ecb6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 9 Sep 2018 08:25:25 +0200 Subject: [PATCH 02/16] renamed method --- .../org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 9 +++++---- .../xpack/ccr/action/CreateAndFollowIndexAction.java | 2 +- .../xpack/ccr/action/FollowIndexAction.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index bd4db1f0aba4e..c09a1cdfbec85 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -68,9 +68,10 @@ public boolean isCcrAllowed() { } /** - * Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for - * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method - * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the + * Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster. Before doing this, + * the remote cluster is checked for license compatibility with CCR. If the remote cluster is not licensed for CCR, + * the {@link ActionListener#onFailure(Exception)} method of the specified listener is invoked. Otherwise, + * the specified consumer is invoked with the leader index metadata fetched from the * remote cluster. * * @param client the client @@ -80,7 +81,7 @@ public boolean isCcrAllowed() { * @param historyUUIDAndLeaderIndexMetadataConsumer the leader index history uuid and the leader index metadata consumer * @param the type of response the listener is waiting for */ - public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( final Client client, final String clusterAlias, final String leaderIndex, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 7f96fa4ea8524..ddae03a28234a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -266,7 +266,7 @@ private void createFollowerIndexAndFollowRemoteIndex( final String clusterAlias, final String leaderIndex, final ActionListener listener) { - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client, clusterAlias, leaderIndex, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index f40ff882c4086..f393a07f79951 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -372,7 +372,7 @@ private void followRemoteIndex( final ActionListener listener) { final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client, clusterAlias, leaderIndex, From 0ed85c517a8e7094167797d8d7ab790df89592c7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 9 Sep 2018 08:43:07 +0200 Subject: [PATCH 03/16] added unit test --- .../ccr/action/ShardFollowNodeTaskTests.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 11d11f8826d21..45f84ae854119 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -711,6 +712,25 @@ public void testHandleWriteResponse() { assertThat(status.followerGlobalCheckpoint(), equalTo(63L)); } + public void testHistoryUUIDChanged() { + ShardFollowNodeTask task = createShardFollowTask(1, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 1, -1); + + task.coordinateReads(); + assertThat(shardChangesRequests.size(), equalTo(1)); + assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); + assertThat(shardChangesRequests.get(0)[1], equalTo(1L)); + + ShardChangesAction.Response response = new ShardChangesAction.Response(0, 1, 1, "another-uuid", new Translog.Operation[] { + new Translog.Index("doc", "1", 0, 0, "{}".getBytes(StandardCharsets.UTF_8)) + }); + task.innerHandleReadResponse(0L, 63L, response); + assertThat(task.isStopped(), is(true)); + assertThat(fatalError, notNullValue()); + assertThat(fatalError, instanceOf(IllegalStateException.class)); + assertThat(fatalError.getMessage(), equalTo("unexpected history uuid, expected [uuid], actual [another-uuid]")); + } + ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); From 0685ee4d2d7759cc34a941393f60de2ade6526ce Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 9 Sep 2018 09:28:48 +0200 Subject: [PATCH 04/16] isolated logic that fetches history UUIDs and moved it to CcrLicenseChecker to avoid duplication of logic --- .../xpack/ccr/CcrLicenseChecker.java | 54 +++++++++++++------ .../action/CreateAndFollowIndexAction.java | 33 +----------- .../xpack/ccr/action/FollowIndexAction.java | 6 +-- 3 files changed, 41 insertions(+), 52 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index c09a1cdfbec85..9cf39d8ed0795 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; /** * Encapsulates licensing checking for CCR. @@ -105,23 +106,9 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe final ClusterState remoteClusterState = r.getState(); final IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); - CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { - IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); - Map historyUUIDs = new HashMap<>(); - for (IndexShardStats indexShardStats : indexStats) { - for (ShardStats shardStats : indexShardStats) { - CommitStats commitStats = shardStats.getCommitStats(); - String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); - ShardId shardId = shardStats.getShardRouting().shardId(); - historyUUIDs.put(shardId.id(), historyUUID); - } - } + fetchLeaderHistoryUUIDs(remoteClient, leaderIndex, listener, historyUUIDs -> { historyUUIDAndLeaderIndexMetadataConsumer.accept(historyUUIDs, leaderIndexMetadata); - }; - IndicesStatsRequest request = new IndicesStatsRequest(); - request.indices(leaderIndex); - remoteClient.admin().indices().stats(request, - ActionListener.wrap(indicesStatsHandler, listener::onFailure)); + }); }, listener::onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata @@ -139,6 +126,41 @@ public void onFailure(final Exception e) { }); } + /** + * Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient. + * + * @param leaderClient the leader client + * @param leaderIndex the name of the leader index + * @param listener the listener + * @param historyUUIDConsumer the leader index history uuid and consumer + * @param the type of response the listener is waiting for + */ + // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs + // in case of following a local or a remote cluster. + public void fetchLeaderHistoryUUIDs( + final Client leaderClient, + final String leaderIndex, + final ActionListener listener, + final Consumer> historyUUIDConsumer) { + + CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + Map historyUUIDs = new HashMap<>(); + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + CommitStats commitStats = shardStats.getCommitStats(); + String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs.put(shardId.id(), historyUUID); + } + } + historyUUIDConsumer.accept(historyUUIDs); + }; + IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(leaderIndex); + leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, listener::onFailure)); + } + private static ElasticsearchStatusException incompatibleRemoteLicense( final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index ddae03a28234a..c938aaccdb44f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -12,11 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -34,7 +29,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -42,8 +36,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -258,7 +250,7 @@ private void createFollowerIndexAndFollowLocalIndex( Consumer> handler = historyUUID -> { createFollowerIndex(leaderIndexMetadata, historyUUID, request, listener); }; - fetchHistoryUUID(client, leaderIndex, handler, listener::onFailure); + ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndex, listener, handler); } private void createFollowerIndexAndFollowRemoteIndex( @@ -375,29 +367,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); } - // would be great if can reuse some of the logic in CcrLicenseChecker to do remote calls for - // fetching leader index metadata and leader index uuid - static void fetchHistoryUUID(final Client client, - final String leaderIndex, - final Consumer> handler, - final Consumer errorHandler) { - IndicesStatsRequest request = new IndicesStatsRequest(); - request.indices(leaderIndex); - CheckedConsumer onResponseHandler = indicesStatsResponse -> { - IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); - Map historyUUIDs = new HashMap<>(); - for (IndexShardStats indexShardStats : indexStats) { - for (ShardStats shardStats : indexShardStats) { - String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); - ShardId shardId = shardStats.getShardRouting().shardId(); - historyUUIDs.put(shardId.id(), historyUUID); - } - } - handler.accept(historyUUIDs); - }; - client.admin().indices().stats(request, ActionListener.wrap(onResponseHandler, errorHandler)); - } - } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index f393a07f79951..4e4fb26d0efd3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -64,8 +64,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction.TransportAction.fetchHistoryUUID; - public class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); @@ -356,13 +354,13 @@ private void followLocalIndex(final Request request, final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); // following an index in local cluster, so use local cluster state to fetch leader index metadata final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); - fetchHistoryUUID(client, request.getLeaderIndex(), historyUUIDs -> { + ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, request.getLeaderIndex(), listener, historyUUIDs -> { try { start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); } catch (final IOException e) { listener.onFailure(e); } - }, listener::onFailure); + }); } private void followRemoteIndex( From 96edbdeb4c5103728bd5744b216fbbb8fa653d3c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 9 Sep 2018 16:36:59 +0200 Subject: [PATCH 05/16] iter --- .../ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 3 ++- .../xpack/ccr/action/CreateAndFollowIndexAction.java | 6 +++--- .../elasticsearch/xpack/ccr/action/FollowIndexAction.java | 6 +++--- .../xpack/ccr/action/FollowIndexActionTests.java | 4 ++-- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index e2b8491431891..6827badaa4353 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -83,7 +83,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; - public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY = "leader_index_history_uuid"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS = "leader_index_shard_history_uuids"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index f672be4c348cf..ea4f849c46981 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -212,7 +212,8 @@ public void fetchLeaderHistoryUUIDs( CommitStats commitStats = shardStats.getCommitStats(); String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); ShardId shardId = shardStats.getShardRouting().shardId(); - historyUUIDs.put(shardId.id(), historyUUID); + Object previousValue = historyUUIDs.put(shardId.id(), historyUUID); + assert previousValue == null; } } historyUUIDConsumer.accept(historyUUIDs); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 412ae19444da1..0b368319b90da 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -247,8 +247,8 @@ private void createFollowerIndexAndFollowLocalIndex( // following an index in local cluster, so use local cluster state to fetch leader index metadata final String leaderIndex = request.getFollowRequest().getLeaderIndex(); final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); - Consumer> handler = historyUUID -> { - createFollowerIndex(leaderIndexMetadata, historyUUID, request, listener); + Consumer> handler = historyUUIDs -> { + createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener); }; ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndex, listener::onFailure, handler); } @@ -309,7 +309,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { // Adding the leader index uuid for each shard as custom metadata: Map metadata = new HashMap<>(); for (Map.Entry entry : historyUUIDs.entrySet()) { - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + entry.getKey(), entry.getValue()); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_" + entry.getKey(), entry.getValue()); } imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index ed23a9c0f2376..03d0efec7a20f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -415,7 +415,7 @@ void start( final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; String recordedLeaderIndexHistoryUUID = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) - .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_" + shardId); + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_" + shardId); ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), @@ -601,8 +601,8 @@ private static Settings filter(Settings originalSettings) { private static Map convert(Map ccrMetadata) { Map historyUUIDsByShard = new HashMap<>(); for (Map.Entry entry : ccrMetadata.entrySet()) { - if (entry.getKey().startsWith(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY)) { - int shardId = Integer.parseInt(entry.getKey().replace(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_", "")); + if (entry.getKey().startsWith(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS)) { + int shardId = Integer.parseInt(entry.getKey().replace(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_", "")); String previousValue = historyUUIDsByShard.put(shardId, entry.getValue()); assert previousValue == null; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 64081d54c539b..b1a8d811e5e3d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -28,7 +28,7 @@ public class FollowIndexActionTests extends ESTestCase { private static final Map CUSTOM_METADATA = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_0", "uuid"); public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); @@ -49,7 +49,7 @@ public void testValidation() throws IOException { // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); Map customMetaData = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_HISTORY_UUID_KEY + "_0", "another-uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_0", "another-uuid"); IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); From b9ae5cea38b863223ec441be5938e448a02326ea Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Sep 2018 20:50:18 +0200 Subject: [PATCH 06/16] moved history uuid validation to internal shard changes api --- .../xpack/ccr/action/ShardChangesAction.java | 46 ++++++++++--------- .../xpack/ccr/action/ShardFollowNodeTask.java | 5 -- .../ccr/action/ShardFollowTasksExecutor.java | 3 +- .../xpack/ccr/ShardChangesIT.java | 6 ++- .../ccr/action/ShardChangesActionTests.java | 23 ++++++---- .../ccr/action/ShardChangesRequestTests.java | 5 +- .../ccr/action/ShardChangesResponseTests.java | 1 - .../ShardFollowNodeTaskRandomTests.java | 5 +- .../ccr/action/ShardFollowNodeTaskTests.java | 26 +---------- .../ShardFollowTaskReplicationTests.java | 5 +- 10 files changed, 53 insertions(+), 72 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index c565f8f582feb..e42f2db2bb3e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -57,11 +57,13 @@ public static class Request extends SingleShardRequest { private long fromSeqNo; private int maxOperationCount; private ShardId shardId; + private String expectedHistoryUUID; private long maxOperationSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; - public Request(ShardId shardId) { + public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); this.shardId = shardId; + this.expectedHistoryUUID = expectedHistoryUUID; } Request() { @@ -95,6 +97,10 @@ public void setMaxOperationSizeInBytes(long maxOperationSizeInBytes) { this.maxOperationSizeInBytes = maxOperationSizeInBytes; } + public String getExpectedHistoryUUID() { + return expectedHistoryUUID; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -118,6 +124,7 @@ public void readFrom(StreamInput in) throws IOException { fromSeqNo = in.readVLong(); maxOperationCount = in.readVInt(); shardId = ShardId.readShardId(in); + expectedHistoryUUID = in.readString(); maxOperationSizeInBytes = in.readVLong(); } @@ -127,6 +134,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(fromSeqNo); out.writeVInt(maxOperationCount); shardId.writeTo(out); + out.writeString(expectedHistoryUUID); out.writeVLong(maxOperationSizeInBytes); } @@ -139,12 +147,13 @@ public boolean equals(final Object o) { return fromSeqNo == request.fromSeqNo && maxOperationCount == request.maxOperationCount && Objects.equals(shardId, request.shardId) && + Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) && maxOperationSizeInBytes == request.maxOperationSizeInBytes; } @Override public int hashCode() { - return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes); + return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes); } @Override @@ -153,6 +162,7 @@ public String toString() { "fromSeqNo=" + fromSeqNo + ", maxOperationCount=" + maxOperationCount + ", shardId=" + shardId + + ", expectedHistoryUUID=" + expectedHistoryUUID + ", maxOperationSizeInBytes=" + maxOperationSizeInBytes + '}'; } @@ -179,16 +189,6 @@ public long getMaxSeqNo() { return maxSeqNo; } - private String historyUUID; - - public String getHistoryUUID() { - return historyUUID; - } - - public void setHistoryUUID(String historyUUID) { - this.historyUUID = historyUUID; - } - private Translog.Operation[] operations; public Translog.Operation[] getOperations() { @@ -202,13 +202,11 @@ public Translog.Operation[] getOperations() { final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, - final String historyUUID, final Translog.Operation[] operations) { this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; - this.historyUUID = historyUUID; this.operations = operations; } @@ -218,7 +216,6 @@ public void readFrom(final StreamInput in) throws IOException { mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); - historyUUID = in.readString(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -228,7 +225,6 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); - out.writeString(historyUUID); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -240,13 +236,12 @@ public boolean equals(final Object o) { return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && - Objects.equals(historyUUID, that.historyUUID) && Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, historyUUID, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); } } @@ -273,15 +268,15 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); - final String historyUUID = indexShard.getHistoryUUID(); final Translog.Operation[] operations = getOperations( indexShard, seqNoStats.getGlobalCheckpoint(), request.fromSeqNo, request.maxOperationCount, + request.expectedHistoryUUID, request.maxOperationSizeInBytes); - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), historyUUID, operations); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); } @Override @@ -313,11 +308,20 @@ protected Response newResponse() { * Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method * stops collecting more operations and returns what has been collected so far. */ - static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount, + static Translog.Operation[] getOperations(IndexShard indexShard, + long globalCheckpoint, + long fromSeqNo, + int maxOperationCount, + String expectedHistoryUUID, long maxOperationSizeInBytes) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } + final String historyUUID = indexShard.getHistoryUUID(); + if (historyUUID.equals(expectedHistoryUUID) == false) { + throw new IllegalStateException("unexpected history uuid, expected [" + historyUUID + "], actual [" + + expectedHistoryUUID + "]"); + } if (fromSeqNo > indexShard.getGlobalCheckpoint()) { return EMPTY_OPERATIONS_ARRAY; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 92ea1b8fe14ab..00e3aaaae2a8e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -278,11 +278,6 @@ protected void onOperationsFetched(Translog.Operation[] operations) { synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { onOperationsFetched(response.getOperations()); - if (params.getRecordedLeaderIndexHistoryUUID().equals(response.getHistoryUUID()) == false) { - markAsFailed(new IllegalStateException("unexpected history uuid, expected [" + - params.getRecordedLeaderIndexHistoryUUID() + "], actual [" + response.getHistoryUUID() + "]")); - return; - } leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo()); final long newFromSeqNo; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 83e3e4806e184..7b63e73ee59fa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -133,7 +133,8 @@ protected void innerSendBulkShardOperationsRequest( @Override protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler) { - ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId()); + ShardChangesAction.Request request = + new ShardChangesAction.Request(params.getLeaderShardId(), params.getRecordedLeaderIndexHistoryUUID()); request.setFromSeqNo(from); request.setMaxOperationCount(maxOperationCount); request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index bbbb7f9517bd6..6b5d356588a70 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -117,7 +118,8 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); assertThat(globalCheckPoint, equalTo(2L)); - ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID); request.setFromSeqNo(0L); request.setMaxOperationCount(3); ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); @@ -142,7 +144,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); assertThat(globalCheckPoint, equalTo(5L)); - request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID); request.setFromSeqNo(3L); request.setMaxOperationCount(3); response = client().execute(ShardChangesAction.INSTANCE, request).get(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 430e9cb48b1a8..946fa331876cf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -59,22 +59,27 @@ public void testGetOperations() throws Exception { int max = randomIntBetween(min, numWrites - 1); int size = max - min + 1; final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, - indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE); + indexShard.getGlobalCheckpoint(), min, size, indexShard.getHistoryUUID(), Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); final List expectedSeqNos = LongStream.rangeClosed(min, max).boxed().collect(Collectors.toList()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } - // get operations for a range no operations exists: Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites, numWrites + 1, Long.MAX_VALUE); + numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(0)); // get operations for a range some operations do not exist: operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites - 10, numWrites + 10, Long.MAX_VALUE); + numWrites - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(10)); + + // Unexpected history UUID: + Exception e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperations(indexShard, + indexShard.getGlobalCheckpoint(), 0, 10, "different-history-uuid", Long.MAX_VALUE)); + assertThat(e.getMessage(), equalTo("unexpected history uuid, expected [" + indexShard.getHistoryUUID() + + "], actual [different-history-uuid]")); } public void testGetOperationsWhenShardNotStarted() throws Exception { @@ -83,7 +88,7 @@ public void testGetOperationsWhenShardNotStarted() throws Exception { ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperations(indexShard, - indexShard.getGlobalCheckpoint(), 0, 1, Long.MAX_VALUE)); + indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), Long.MAX_VALUE)); } public void testGetOperationsExceedByteLimit() throws Exception { @@ -100,7 +105,7 @@ public void testGetOperationsExceedByteLimit() throws Exception { final IndexShard indexShard = indexService.getShard(0); final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - 0, 12, 256); + 0, 12, indexShard.getHistoryUUID(), 256); assertThat(operations.length, equalTo(12)); assertThat(operations[0].seqNo(), equalTo(0L)); assertThat(operations[1].seqNo(), equalTo(1L)); @@ -127,7 +132,7 @@ public void testGetOperationsAlwaysReturnAtLeastOneOp() throws Exception { final IndexShard indexShard = indexService.getShard(0); final Translog.Operation[] operations = - ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, 0); + ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), 0); assertThat(operations.length, equalTo(1)); assertThat(operations[0].seqNo(), equalTo(0L)); } @@ -137,7 +142,7 @@ public void testIndexNotFound() throws InterruptedException { final AtomicReference reference = new AtomicReference<>(); final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); transportAction.execute( - new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)), + new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0), "uuid"), new ActionListener() { @Override public void onResponse(final ShardChangesAction.Response response) { @@ -162,7 +167,7 @@ public void testShardNotFound() throws InterruptedException { final AtomicReference reference = new AtomicReference<>(); final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); transportAction.execute( - new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)), + new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards), "uuid"), new ActionListener() { @Override public void onResponse(final ShardChangesAction.Response response) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java index 19585da8851d6..2ea2086990b32 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -15,7 +15,8 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, "uuid", new Translog.Operation[0])); + handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0])); } }; threadPool.generic().execute(task); @@ -230,7 +230,6 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, - "uuid", ops.toArray(EMPTY)) ) ); @@ -253,7 +252,6 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, - "uuid", EMPTY ); item.add(new TestResponse(null, mappingVersion, response)); @@ -270,7 +268,6 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, localLeaderGCP, localLeaderGCP, - "uuid", ops.toArray(EMPTY) ); item.add(new TestResponse(null, mappingVersion, response)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 45f84ae854119..5f0832c2eeccb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -34,7 +34,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -394,7 +393,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, "uuid", new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -426,7 +425,7 @@ public void testDelayCoordinatesRead() { // Also invokes coordinateReads() task.innerHandleReadResponse(0L, 63L, response); task.innerHandleReadResponse(64L, 63L, - new ShardChangesAction.Response(0, 63L, 63L, "uuid", new Translog.Operation[0])); + new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0])); assertThat(counter[0], equalTo(1)); } @@ -712,25 +711,6 @@ public void testHandleWriteResponse() { assertThat(status.followerGlobalCheckpoint(), equalTo(63L)); } - public void testHistoryUUIDChanged() { - ShardFollowNodeTask task = createShardFollowTask(1, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 1, -1); - - task.coordinateReads(); - assertThat(shardChangesRequests.size(), equalTo(1)); - assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); - assertThat(shardChangesRequests.get(0)[1], equalTo(1L)); - - ShardChangesAction.Response response = new ShardChangesAction.Response(0, 1, 1, "another-uuid", new Translog.Operation[] { - new Translog.Index("doc", "1", 0, 0, "{}".getBytes(StandardCharsets.UTF_8)) - }); - task.innerHandleReadResponse(0L, 63L, response); - assertThat(task.isStopped(), is(true)); - assertThat(fatalError, notNullValue()); - assertThat(fatalError, instanceOf(IllegalStateException.class)); - assertThat(fatalError.getMessage(), equalTo("unexpected history uuid, expected [uuid], actual [another-uuid]")); - } - ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); @@ -812,7 +792,6 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con mappingVersions.poll(), leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), - "uuid", operations ); handler.accept(response); @@ -849,7 +828,6 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, - "uuid", ops.toArray(new Translog.Operation[0]) ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 73cae2ce8b011..bbf0dbc3244e5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -169,7 +169,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, between(1, 4), 10240, TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), - "uuid", + leaderGroup.getPrimary().getHistoryUUID(), Collections.emptyMap() ); @@ -220,13 +220,12 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co try { final SeqNoStats seqNoStats = indexShard.seqNoStats(); Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, - maxOperationCount, params.getMaxBatchSizeInBytes()); + maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response( 1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), - "uuid", ops ); handler.accept(response); From ee475a040ee2eca24bf262078a545b92bd3a3343 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Sep 2018 22:14:57 +0200 Subject: [PATCH 07/16] keep recorded index shard uuids as string[] around --- .../xpack/ccr/CcrLicenseChecker.java | 20 ++++---- .../action/CreateAndFollowIndexAction.java | 10 ++-- .../xpack/ccr/action/FollowIndexAction.java | 47 +++++++++---------- .../ccr/action/FollowIndexActionTests.java | 11 ++--- 4 files changed, 41 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index ea4f849c46981..6ef48523fa68f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -28,9 +28,7 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; -import java.util.HashMap; import java.util.Locale; -import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; @@ -87,7 +85,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU final String clusterAlias, final String leaderIndex, final Consumer onFailure, - final BiConsumer, IndexMetaData> consumer) { + final BiConsumer consumer) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -99,9 +97,9 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU request, onFailure, leaderClusterState -> { + IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); final Client leaderClient = client.getRemoteClusterClient(clusterAlias); - fetchLeaderHistoryUUIDs(leaderClient, leaderIndex, onFailure, historyUUIDs -> { - IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); + fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> { consumer.accept(historyUUIDs, leaderIndexMetaData); }); }, @@ -191,7 +189,7 @@ public void onFailure(final Exception e) { * Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient. * * @param leaderClient the leader client - * @param leaderIndex the name of the leader index + * @param leaderIndexMetaData the leader index metadata * @param onFailure the failure consumer * @param historyUUIDConsumer the leader index history uuid and consumer * @param the type of response the listener is waiting for @@ -200,20 +198,20 @@ public void onFailure(final Exception e) { // in case of following a local or a remote cluster. public void fetchLeaderHistoryUUIDs( final Client leaderClient, - final String leaderIndex, + final IndexMetaData leaderIndexMetaData, final Consumer onFailure, - final Consumer> historyUUIDConsumer) { + final Consumer historyUUIDConsumer) { + String leaderIndex = leaderIndexMetaData.getIndex().getName(); CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); - Map historyUUIDs = new HashMap<>(); + String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()]; for (IndexShardStats indexShardStats : indexStats) { for (ShardStats shardStats : indexShardStats) { CommitStats commitStats = shardStats.getCommitStats(); String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); ShardId shardId = shardStats.getShardRouting().shardId(); - Object previousValue = historyUUIDs.put(shardId.id(), historyUUID); - assert previousValue == null; + historyUUIDs[shardId.id()] = historyUUID; } } historyUUIDConsumer.accept(historyUUIDs); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 0b368319b90da..ef180433e428a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -247,10 +247,10 @@ private void createFollowerIndexAndFollowLocalIndex( // following an index in local cluster, so use local cluster state to fetch leader index metadata final String leaderIndex = request.getFollowRequest().getLeaderIndex(); final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); - Consumer> handler = historyUUIDs -> { + Consumer handler = historyUUIDs -> { createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener); }; - ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndex, listener::onFailure, handler); + ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, handler); } private void createFollowerIndexAndFollowRemoteIndex( @@ -268,7 +268,7 @@ private void createFollowerIndexAndFollowRemoteIndex( private void createFollowerIndex( final IndexMetaData leaderIndexMetaData, - final Map historyUUIDs, + final String[] historyUUIDs, final Request request, final ActionListener listener) { if (leaderIndexMetaData == null) { @@ -308,9 +308,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { // Adding the leader index uuid for each shard as custom metadata: Map metadata = new HashMap<>(); - for (Map.Entry entry : historyUUIDs.entrySet()) { - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_" + entry.getKey(), entry.getValue()); - } + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, String.join(",", historyUUIDs)); imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); // Copy all settings, but overwrite a few settings. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 03d0efec7a20f..61fbd47b9f7e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.SearchSlowLog; @@ -53,7 +54,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -354,7 +354,11 @@ private void followLocalIndex(final Request request, final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); // following an index in local cluster, so use local cluster state to fetch leader index metadata final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); - ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, request.getLeaderIndex(), listener::onFailure, historyUUIDs -> { + if (leaderIndexMetadata == null) { + throw new IndexNotFoundException(request.getFollowerIndex()); + } + + ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> { try { start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); } catch (final IOException e) { @@ -399,7 +403,7 @@ void start( String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, - Map leaderIndexHistoryUUIDs, + String[] leaderIndexHistoryUUIDs, ActionListener handler) throws IOException { MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; @@ -414,8 +418,8 @@ void start( for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - String recordedLeaderIndexHistoryUUID = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) - .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_" + shardId); + String[] recordedLeaderShardHistoryUUIDs = convert(followIndexMetadata); + String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), @@ -427,7 +431,7 @@ void start( request.maxWriteBufferSize, request.retryTimeout, request.idleShardRetryDelay, - recordedLeaderIndexHistoryUUID, + recordedLeaderShardHistoryUUID, filteredHeaders ); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, @@ -527,7 +531,7 @@ void finalizeResponse() { static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, - Map leaderIndexHistoryUUID, + String[] leaderIndexHistoryUUID, MapperService followerMapperService) { if (leaderIndex == null) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); @@ -536,14 +540,15 @@ static void validate(Request request, throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist"); } - Map recordedHistoryUUIDs = convert(followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)); - assert recordedHistoryUUIDs.size() == leaderIndexHistoryUUID.size(); - for (Map.Entry entry : leaderIndexHistoryUUID.entrySet()) { - String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs.get(entry.getKey()); - if (entry.getValue().equals(recordedLeaderIndexHistoryUUID) == false) { + String[] recordedHistoryUUIDs = convert(followIndex); + assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length; + for (int i = 0; i < leaderIndexHistoryUUID.length; i++) { + String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i]; + String actualLeaderIndexHistoryUUID = leaderIndexHistoryUUID[i]; + if (recordedLeaderIndexHistoryUUID.equals(actualLeaderIndexHistoryUUID) == false) { throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + - entry.getValue() + "] as history uuid but instead reference [" + recordedLeaderIndexHistoryUUID + - "] as history uuid"); + recordedLeaderIndexHistoryUUID + "] as history uuid but instead reference [" + + actualLeaderIndexHistoryUUID + "] as history uuid"); } } @@ -598,16 +603,10 @@ private static Settings filter(Settings originalSettings) { return settings.build(); } - private static Map convert(Map ccrMetadata) { - Map historyUUIDsByShard = new HashMap<>(); - for (Map.Entry entry : ccrMetadata.entrySet()) { - if (entry.getKey().startsWith(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS)) { - int shardId = Integer.parseInt(entry.getKey().replace(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_", "")); - String previousValue = historyUUIDsByShard.put(shardId, entry.getValue()); - assert previousValue == null; - } - } - return historyUUIDsByShard; + private static String[] convert(IndexMetaData followIndexMetadata) { + String historyUUIDs = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS); + return historyUUIDs.split(","); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index b1a8d811e5e3d..32a73d95d1b3c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.xpack.ccr.ShardChangesIT; import java.io.IOException; -import java.util.Collections; import java.util.Map; import static java.util.Collections.emptyMap; @@ -28,11 +27,11 @@ public class FollowIndexActionTests extends ESTestCase { private static final Map CUSTOM_METADATA = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_0", "uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, "uuid"); public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); - Map UUIDs = Collections.singletonMap(0, "uuid"); + String[] UUIDs = new String[]{"uuid"}; { // should fail, because leader index does not exist Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null, null)); @@ -49,12 +48,12 @@ public void testValidation() throws IOException { // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); Map customMetaData = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS + "_0", "another-uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, "another-uuid"); IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); - assertThat(e.getMessage(), equalTo("follow index [index2] should reference [uuid] as history uuid but " + - "instead reference [another-uuid] as history uuid")); + assertThat(e.getMessage(), equalTo("follow index [index2] should reference [another-uuid] as history uuid but " + + "instead reference [uuid] as history uuid")); } { // should fail because leader index does not have soft deletes enabled From 9df2ac66bae4e86143dad5bfd28454810c3a35fc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Sep 2018 22:16:52 +0200 Subject: [PATCH 08/16] iter --- .../src/main/java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 3 +-- .../xpack/ccr/action/CreateAndFollowIndexAction.java | 2 +- .../elasticsearch/xpack/ccr/action/FollowIndexAction.java | 8 ++++---- .../xpack/ccr/action/FollowIndexActionTests.java | 4 ++-- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 6827badaa4353..7a5d60788c025 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -83,7 +83,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; - public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS = "leader_index_shard_history_uuids"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 6ef48523fa68f..95bb1120b5bd8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -192,11 +192,10 @@ public void onFailure(final Exception e) { * @param leaderIndexMetaData the leader index metadata * @param onFailure the failure consumer * @param historyUUIDConsumer the leader index history uuid and consumer - * @param the type of response the listener is waiting for */ // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs // in case of following a local or a remote cluster. - public void fetchLeaderHistoryUUIDs( + public void fetchLeaderHistoryUUIDs( final Client leaderClient, final IndexMetaData leaderIndexMetaData, final Consumer onFailure, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index ef180433e428a..03c9249ef23e7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -308,7 +308,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { // Adding the leader index uuid for each shard as custom metadata: Map metadata = new HashMap<>(); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, String.join(",", historyUUIDs)); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); // Copy all settings, but overwrite a few settings. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 61fbd47b9f7e0..6e4c08eed27ef 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -418,7 +418,7 @@ void start( for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - String[] recordedLeaderShardHistoryUUIDs = convert(followIndexMetadata); + String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndexMetadata); String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, @@ -540,7 +540,7 @@ static void validate(Request request, throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist"); } - String[] recordedHistoryUUIDs = convert(followIndex); + String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndex); assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length; for (int i = 0; i < leaderIndexHistoryUUID.length; i++) { String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i]; @@ -603,9 +603,9 @@ private static Settings filter(Settings originalSettings) { return settings.build(); } - private static String[] convert(IndexMetaData followIndexMetadata) { + private static String[] extractIndexShardHistoryUUIDs(IndexMetaData followIndexMetadata) { String historyUUIDs = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) - .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS); + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS); return historyUUIDs.split(","); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index 32a73d95d1b3c..c16f40072f79a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -27,7 +27,7 @@ public class FollowIndexActionTests extends ESTestCase { private static final Map CUSTOM_METADATA = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, "uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid"); public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); @@ -48,7 +48,7 @@ public void testValidation() throws IOException { // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); Map customMetaData = - singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARDS_HISTORY_UUIDS, "another-uuid"); + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "another-uuid"); IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); From e7e0f149593d156f46113bd3d9f273e9330b33e4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Sep 2018 22:27:49 +0200 Subject: [PATCH 09/16] fixed test after merge --- .../java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index d8357a74e8ebc..5651657736c39 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -110,7 +110,7 @@ public void testFollowIndex() throws Exception { e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); - assertThat(e.getMessage(), containsString("follow index [" + unallowedIndex + "] does not exist")); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); assertThat(indexExists(adminClient(), unallowedIndex), is(false)); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); } From 43fbbee7ae8c26af0764c0edf7f9e48c94b81659 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 11 Sep 2018 11:47:06 +0200 Subject: [PATCH 10/16] iter --- .../xpack/ccr/CcrLicenseChecker.java | 24 ++++++++++++++----- .../xpack/ccr/action/FollowIndexAction.java | 6 ++--- .../ccr/action/FollowIndexActionTests.java | 2 +- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 95bb1120b5bd8..d6f490aa2fbb0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -27,7 +27,9 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.XPackPlugin; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Locale; import java.util.Objects; import java.util.function.BiConsumer; @@ -73,12 +75,12 @@ public boolean isCcrAllowed() { * If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. Otherwise, * the specified consumer is invoked with the leader index metadata fetched from the remote cluster. * - * @param client the client - * @param clusterAlias the remote cluster alias - * @param leaderIndex the name of the leader index - * @param onFailure the failure consumer - * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards - * @param the type of response the listener is waiting for + * @param client the client + * @param clusterAlias the remote cluster alias + * @param leaderIndex the name of the leader index + * @param onFailure the failure consumer + * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards + * @param the type of response the listener is waiting for */ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( final Client client, @@ -208,14 +210,24 @@ public void fetchLeaderHistoryUUIDs( for (IndexShardStats indexShardStats : indexStats) { for (ShardStats shardStats : indexShardStats) { CommitStats commitStats = shardStats.getCommitStats(); + if (commitStats == null) { + onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing")); + return; + } String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + if (historyUUID == null) { + onFailure.accept(new IllegalArgumentException("leader index does not have an history uuid")); + return; + } ShardId shardId = shardStats.getShardRouting().shardId(); historyUUIDs[shardId.id()] = historyUUID; } } + assert new HashSet<>(Arrays.asList(historyUUIDs)).size() == leaderIndexMetaData.getNumberOfShards(); historyUUIDConsumer.accept(historyUUIDs); }; IndicesStatsRequest request = new IndicesStatsRequest(); + request.clear(); request.indices(leaderIndex); leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 7dcf7648c2340..224bc5c521c30 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -546,9 +546,9 @@ static void validate(Request request, String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i]; String actualLeaderIndexHistoryUUID = leaderIndexHistoryUUID[i]; if (recordedLeaderIndexHistoryUUID.equals(actualLeaderIndexHistoryUUID) == false) { - throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + - recordedLeaderIndexHistoryUUID + "] as history uuid but instead reference [" + - actualLeaderIndexHistoryUUID + "] as history uuid"); + throw new IllegalArgumentException("leader shard [" + request.followerIndex + "][" + i + "] should reference [" + + recordedLeaderIndexHistoryUUID + "] as history uuid but instead reference [" + actualLeaderIndexHistoryUUID + + "] as history uuid"); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index c16f40072f79a..a9c622e42fa7e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -52,7 +52,7 @@ public void testValidation() throws IOException { IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, followIMD, UUIDs, null)); - assertThat(e.getMessage(), equalTo("follow index [index2] should reference [another-uuid] as history uuid but " + + assertThat(e.getMessage(), equalTo("leader shard [index2][0] should reference [another-uuid] as history uuid but " + "instead reference [uuid] as history uuid")); } { From 9620d7bf1a40dd177d316f896b0b397be4a69bef Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 11 Sep 2018 11:48:57 +0200 Subject: [PATCH 11/16] added test that verifies that we stop and fail the shard follow task in case of history uuid changed. --- .../ESIndexLevelReplicationTestCase.java | 4 ++ .../xpack/ccr/action/ShardChangesAction.java | 4 +- .../ShardFollowTaskReplicationTests.java | 53 ++++++++++++++++++- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 8717d7ba146fb..5f0909db0d3f0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -443,6 +443,10 @@ public IndexShard getPrimary() { return primary; } + public synchronized void reinitPrimaryShard() throws IOException { + primary = reinitShard(primary); + } + public void syncGlobalCheckpoint() { PlainActionFuture listener = new PlainActionFuture<>(); try { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index fb86a7ee9416f..fa051abc4e4b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -319,8 +319,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, } final String historyUUID = indexShard.getHistoryUUID(); if (historyUUID.equals(expectedHistoryUUID) == false) { - throw new IllegalStateException("unexpected history uuid, expected [" + historyUUID + "], actual [" + - expectedHistoryUUID + "]"); + throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + + historyUUID + "]"); } if (fromSeqNo > globalCheckpoint) { return EMPTY_OPERATIONS_ARRAY; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index bbf0dbc3244e5..2ed397d13138a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -14,7 +14,10 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -25,6 +28,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -38,11 +42,15 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { @@ -129,6 +137,43 @@ public void testFailLeaderReplicaShard() throws Exception { } } + public void testChangeHistoryUUID() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(0); + ReplicationGroup followerGroup = createFollowGroup(0)) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( + leaderSeqNoStats.getGlobalCheckpoint(), + leaderSeqNoStats.getMaxSeqNo(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + + String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + leaderGroup.reinitPrimaryShard(); + leaderGroup.getPrimary().store().bootstrapNewHistory(); + recoverShardFromStore(leaderGroup.getPrimary()); + String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + + assertBusy(() -> { + assertThat(shardFollowTask.isStopped(), is(true)); + assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + "], actual [" + newHistoryUUID + "]")); + }); + } + } + @Override protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) @@ -167,7 +212,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, - TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(100), TimeValue.timeValueMillis(10), leaderGroup.getPrimary().getHistoryUUID(), Collections.emptyMap() @@ -175,6 +220,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); + AtomicReference failureHolder = new AtomicReference<>(); LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -252,9 +298,14 @@ public void markAsCompleted() { @Override public void markAsFailed(Exception e) { + failureHolder.set(e); stopped.set(true); } + @Override + public Exception getFailure() { + return failureHolder.get(); + } }; } From 8fee5ba4407e213a953836c1c13ef4ce15831c7d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 11 Sep 2018 12:06:20 +0200 Subject: [PATCH 12/16] fixed checkstyle violations and fixed error message assertions --- .../xpack/ccr/action/ShardChangesActionTests.java | 4 ++-- .../xpack/ccr/action/ShardFollowTaskReplicationTests.java | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 946fa331876cf..b973fbac3ce3e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -78,8 +78,8 @@ public void testGetOperations() throws Exception { // Unexpected history UUID: Exception e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 10, "different-history-uuid", Long.MAX_VALUE)); - assertThat(e.getMessage(), equalTo("unexpected history uuid, expected [" + indexShard.getHistoryUUID() + - "], actual [different-history-uuid]")); + assertThat(e.getMessage(), equalTo("unexpected history uuid, expected [different-history-uuid], actual [" + + indexShard.getHistoryUUID() + "]")); } public void testGetOperationsWhenShardNotStarted() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 2ed397d13138a..17e74847e2542 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -14,10 +14,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -28,7 +25,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -47,8 +43,6 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; From c2952b6bcf68fa42cdce3849dc16853dbf18726d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 11 Sep 2018 13:53:51 +0200 Subject: [PATCH 13/16] ignore replica shard stats --- .../org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index d6f490aa2fbb0..e9674f0c11a9e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -209,6 +209,12 @@ public void fetchLeaderHistoryUUIDs( String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()]; for (IndexShardStats indexShardStats : indexStats) { for (ShardStats shardStats : indexShardStats) { + // Ignore replica shards as they may not have yet started and + // we just end up overwriting slots in historyUUIDs + if (shardStats.getShardRouting().primary() == false) { + continue; + } + CommitStats commitStats = shardStats.getCommitStats(); if (commitStats == null) { onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing")); @@ -220,6 +226,7 @@ public void fetchLeaderHistoryUUIDs( return; } ShardId shardId = shardStats.getShardRouting().shardId(); + assert historyUUIDs[shardId.id()] == null; historyUUIDs[shardId.id()] = historyUUID; } } From 620505eb65ad0d7110ee0611f01bff6aaffafb36 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 11 Sep 2018 17:40:43 +0200 Subject: [PATCH 14/16] iter --- .../elasticsearch/xpack/ccr/CcrLicenseChecker.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index e9674f0c11a9e..2161d0a14237a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -27,9 +27,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.core.XPackPlugin; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Locale; import java.util.Objects; import java.util.function.BiConsumer; @@ -221,16 +219,16 @@ public void fetchLeaderHistoryUUIDs( return; } String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); - if (historyUUID == null) { - onFailure.accept(new IllegalArgumentException("leader index does not have an history uuid")); - return; - } ShardId shardId = shardStats.getShardRouting().shardId(); - assert historyUUIDs[shardId.id()] == null; historyUUIDs[shardId.id()] = historyUUID; } } - assert new HashSet<>(Arrays.asList(historyUUIDs)).size() == leaderIndexMetaData.getNumberOfShards(); + for (int i = 0; i < historyUUIDs.length; i++) { + if (historyUUIDs[i] == null) { + onFailure.accept(new IllegalArgumentException("no history uuid for [" + leaderIndex + "][" + i + "]")); + return; + } + } historyUUIDConsumer.accept(historyUUIDs); }; IndicesStatsRequest request = new IndicesStatsRequest(); From 89829fa419e2fbd588383a0d2693db6750573fb8 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 12:57:04 +0200 Subject: [PATCH 15/16] reduce timeout --- .../xpack/ccr/action/ShardFollowTaskReplicationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 17e74847e2542..9b04390a3a740 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -206,7 +206,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, - TimeValue.timeValueMillis(100), + TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), leaderGroup.getPrimary().getHistoryUUID(), Collections.emptyMap() From 7fa90700640aa5d877c79a350b86015e48e4bc2b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 13:22:06 +0200 Subject: [PATCH 16/16] fixed checkstyle violation --- .../xpack/ccr/action/ShardFollowTask.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index df479cc72bdb7..62894b0ed99e6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -258,8 +258,20 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, - maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay,recordedLeaderIndexHistoryUUID, headers); + return Objects.hash( + leaderClusterAlias, + followShardId, + leaderShardId, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxConcurrentWriteBatches, + maxBatchSizeInBytes, + maxWriteBufferSize, + maxRetryDelay, + idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + headers + ); } public String toString() {