Skip to content

Commit 5fa8131

Browse files
martijnvg and dnhatn authored
[CCR] Added history uuid validation (#33546)
For correctness we need to verify that the history UUIDs of the leader index shards never change while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow API validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not, the follow API fails. * While a follow index is following a leader index, the shard follow tasks verify on each shard changes API call whether the current history UUID is the same as the recorded history UUID. Relates to #30086 Co-authored-by: Nhat Nguyen <[email protected]>
1 parent c023f67 commit 5fa8131

18 files changed

+442
-122
lines changed

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

+4
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,10 @@ public IndexShard getPrimary() {
443443
return primary;
444444
}
445445

446+
/**
 * Replaces the current {@code primary} with the shard returned by {@code reinitShard(primary)}.
 *
 * @throws IOException declared by this method; presumably propagated from {@code reinitShard} (confirm)
 */
public synchronized void reinitPrimaryShard() throws IOException {
447+
primary = reinitShard(primary);
448+
}
449+
446450
public void syncGlobalCheckpoint() {
447451
PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
448452
try {

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testFollowIndex() throws Exception {
113113

114114
e = expectThrows(ResponseException.class,
115115
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
116-
assertThat(e.getMessage(), containsString("follow index [" + unallowedIndex + "] does not exist"));
116+
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
117117
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
118118
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
119119
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@
8585
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {
8686

8787
public static final String CCR_THREAD_POOL_NAME = "ccr";
88+
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
89+
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
8890

8991
private final boolean enabled;
9092
private final Settings settings;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

+81-12
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,18 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1212
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
14+
import org.elasticsearch.action.admin.indices.stats.IndexStats;
15+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
16+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
17+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1318
import org.elasticsearch.client.Client;
1419
import org.elasticsearch.cluster.ClusterState;
1520
import org.elasticsearch.cluster.metadata.IndexMetaData;
21+
import org.elasticsearch.common.CheckedConsumer;
22+
import org.elasticsearch.index.engine.CommitStats;
23+
import org.elasticsearch.index.engine.Engine;
24+
import org.elasticsearch.index.shard.ShardId;
1625
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1726
import org.elasticsearch.license.XPackLicenseState;
1827
import org.elasticsearch.rest.RestStatus;
@@ -21,6 +30,7 @@
2130
import java.util.Collections;
2231
import java.util.Locale;
2332
import java.util.Objects;
33+
import java.util.function.BiConsumer;
2434
import java.util.function.BooleanSupplier;
2535
import java.util.function.Consumer;
2636
import java.util.function.Function;
@@ -58,23 +68,24 @@ public boolean isCcrAllowed() {
5868
}
5969

6070
/**
61-
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
62-
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
63-
* Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
71+
* Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster.
72+
* Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR.
73+
* If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is invoked. Otherwise,
74+
* the specified consumer is invoked with the history UUIDs of the leader index shards and the leader index metadata fetched from the remote cluster.
6475
*
65-
* @param client the client
66-
* @param clusterAlias the remote cluster alias
67-
* @param leaderIndex the name of the leader index
68-
* @param onFailure the failure consumer
69-
* @param leaderIndexMetadataConsumer the leader index metadata consumer
70-
* @param <T> the type of response the listener is waiting for
76+
* @param client the client
77+
* @param clusterAlias the remote cluster alias
78+
* @param leaderIndex the name of the leader index
79+
* @param onFailure the failure consumer
80+
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
81+
* @param <T> the type of response the listener is waiting for
7182
*/
72-
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
83+
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
7384
final Client client,
7485
final String clusterAlias,
7586
final String leaderIndex,
7687
final Consumer<Exception> onFailure,
77-
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
88+
final BiConsumer<String[], IndexMetaData> consumer) {
7889

7990
final ClusterStateRequest request = new ClusterStateRequest();
8091
request.clear();
@@ -85,7 +96,13 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
8596
clusterAlias,
8697
request,
8798
onFailure,
88-
leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)),
99+
leaderClusterState -> {
100+
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
101+
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
102+
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> {
103+
consumer.accept(historyUUIDs, leaderIndexMetaData);
104+
});
105+
},
89106
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
90107
e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e));
91108
}
@@ -168,6 +185,58 @@ public void onFailure(final Exception e) {
168185
});
169186
}
170187

188+
/**
189+
* Fetches the history UUIDs of the leader index on a per-shard basis using the specified leader client.
* The result is handed to {@code historyUUIDConsumer} rather than returned; any problem is reported to {@code onFailure}.
190+
*
191+
* @param leaderClient the leader client used to issue the indices stats request
192+
* @param leaderIndexMetaData the leader index metadata; supplies the index name and the number of shards
193+
* @param onFailure the failure consumer; invoked when commit stats or a shard's history UUID are missing
194+
* @param historyUUIDConsumer the consumer that receives the history UUIDs, one array slot per shard id
195+
*/
196+
// NOTE: This method is placed here in order to avoid duplicating the logic for fetching history UUIDs
197+
// when following either a local or a remote cluster.
198+
public void fetchLeaderHistoryUUIDs(
199+
final Client leaderClient,
200+
final IndexMetaData leaderIndexMetaData,
201+
final Consumer<Exception> onFailure,
202+
final Consumer<String[]> historyUUIDConsumer) {
203+
204+
String leaderIndex = leaderIndexMetaData.getIndex().getName();
205+
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> {
206+
// NOTE(review): get(leaderIndex) presumably returns null when the response has no entry for the index;
// the loop below would then throw. Confirm how that is surfaced to the caller.
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
207+
// One slot per leader shard, indexed by shard id; slots still null after the loop are reported as failures.
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()];
208+
for (IndexShardStats indexShardStats : indexStats) {
209+
for (ShardStats shardStats : indexShardStats) {
210+
// Only primaries are considered: replica shards may not have started yet, and
211+
// reading them too would merely overwrite the same slots in historyUUIDs.
212+
if (shardStats.getShardRouting().primary() == false) {
213+
continue;
214+
}
215+
216+
CommitStats commitStats = shardStats.getCommitStats();
217+
if (commitStats == null) {
218+
onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing"));
219+
return;
220+
}
221+
// The history UUID is read from the commit user data under Engine.HISTORY_UUID_KEY.
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
222+
ShardId shardId = shardStats.getShardRouting().shardId();
223+
historyUUIDs[shardId.id()] = historyUUID;
224+
}
225+
}
226+
// Every shard must have a history UUID; report the first missing one and stop.
for (int i = 0; i < historyUUIDs.length; i++) {
227+
if (historyUUIDs[i] == null) {
228+
onFailure.accept(new IllegalArgumentException("no history uuid for [" + leaderIndex + "][" + i + "]"));
229+
return;
230+
}
231+
}
232+
historyUUIDConsumer.accept(historyUUIDs);
233+
};
234+
IndicesStatsRequest request = new IndicesStatsRequest();
235+
// NOTE(review): clear() presumably resets the requested stats flags; confirm commit stats are still returned.
request.clear();
236+
request.indices(leaderIndex);
237+
// onFailure is passed as the failure handler of the wrapped listener.
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
238+
}
239+
171240
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
172241
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
173242
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

+29-4
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,13 @@ public static class Request extends SingleShardRequest<Request> {
5858
private long fromSeqNo;
5959
private int maxOperationCount;
6060
private ShardId shardId;
61+
private String expectedHistoryUUID;
6162
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
6263

63-
public Request(ShardId shardId) {
64+
public Request(ShardId shardId, String expectedHistoryUUID) {
6465
super(shardId.getIndexName());
6566
this.shardId = shardId;
67+
this.expectedHistoryUUID = expectedHistoryUUID;
6668
}
6769

6870
Request() {
@@ -96,6 +98,10 @@ public void setMaxOperationSizeInBytes(long maxOperationSizeInBytes) {
9698
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
9799
}
98100

101+
/**
 * Returns the history UUID carried by this request.
 *
 * @return the value of the {@code expectedHistoryUUID} field
 */
public String getExpectedHistoryUUID() {
102+
return expectedHistoryUUID;
103+
}
104+
99105
@Override
100106
public ActionRequestValidationException validate() {
101107
ActionRequestValidationException validationException = null;
@@ -119,6 +125,7 @@ public void readFrom(StreamInput in) throws IOException {
119125
fromSeqNo = in.readVLong();
120126
maxOperationCount = in.readVInt();
121127
shardId = ShardId.readShardId(in);
128+
expectedHistoryUUID = in.readString();
122129
maxOperationSizeInBytes = in.readVLong();
123130
}
124131

@@ -128,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
128135
out.writeVLong(fromSeqNo);
129136
out.writeVInt(maxOperationCount);
130137
shardId.writeTo(out);
138+
out.writeString(expectedHistoryUUID);
131139
out.writeVLong(maxOperationSizeInBytes);
132140
}
133141

@@ -140,12 +148,13 @@ public boolean equals(final Object o) {
140148
return fromSeqNo == request.fromSeqNo &&
141149
maxOperationCount == request.maxOperationCount &&
142150
Objects.equals(shardId, request.shardId) &&
151+
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
143152
maxOperationSizeInBytes == request.maxOperationSizeInBytes;
144153
}
145154

146155
@Override
147156
public int hashCode() {
148-
return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
157+
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
149158
}
150159

151160
@Override
@@ -154,6 +163,7 @@ public String toString() {
154163
"fromSeqNo=" + fromSeqNo +
155164
", maxOperationCount=" + maxOperationCount +
156165
", shardId=" + shardId +
166+
", expectedHistoryUUID=" + expectedHistoryUUID +
157167
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
158168
'}';
159169
}
@@ -189,7 +199,12 @@ public Translog.Operation[] getOperations() {
189199
Response() {
190200
}
191201

192-
Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
202+
Response(
203+
final long mappingVersion,
204+
final long globalCheckpoint,
205+
final long maxSeqNo,
206+
final Translog.Operation[] operations) {
207+
193208
this.mappingVersion = mappingVersion;
194209
this.globalCheckpoint = globalCheckpoint;
195210
this.maxSeqNo = maxSeqNo;
@@ -260,6 +275,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
260275
seqNoStats.getGlobalCheckpoint(),
261276
request.fromSeqNo,
262277
request.maxOperationCount,
278+
request.expectedHistoryUUID,
263279
request.maxOperationSizeInBytes);
264280
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
265281
}
@@ -293,11 +309,20 @@ protected Response newResponse() {
293309
* Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method
294310
* stops collecting more operations and returns what has been collected so far.
295311
*/
296-
static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount,
312+
static Translog.Operation[] getOperations(IndexShard indexShard,
313+
long globalCheckpoint,
314+
long fromSeqNo,
315+
int maxOperationCount,
316+
String expectedHistoryUUID,
297317
long maxOperationSizeInBytes) throws IOException {
298318
if (indexShard.state() != IndexShardState.STARTED) {
299319
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
300320
}
321+
final String historyUUID = indexShard.getHistoryUUID();
322+
if (historyUUID.equals(expectedHistoryUUID) == false) {
323+
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
324+
historyUUID + "]");
325+
}
301326
if (fromSeqNo > globalCheckpoint) {
302327
return EMPTY_OPERATIONS_ARRAY;
303328
}

0 commit comments

Comments
 (0)