Skip to content

Commit ebd46be

Browse files
martijnvgdnhatn
andcommitted
[CCR] Added history uuid validation (#33546)
For correctness we need to verify that the history UUIDs of the leader index shards never change while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow API validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not, the follow API fails. * While a follow index is following a leader index, each shard follow task's shard changes API call verifies that the leader shard's current history UUID is the same as the recorded history UUID. Relates to #30086 Co-authored-by: Nhat Nguyen <[email protected]>
1 parent f20c976 commit ebd46be

18 files changed

+442
-122
lines changed

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,10 @@ public IndexShard getPrimary() {
443443
return primary;
444444
}
445445

446+
/**
 * Re-initializes the primary shard: passes the current {@code primary} to {@code reinitShard}
 * and stores the returned shard as the new {@code primary}.
 *
 * @throws IOException declared by this method; presumably propagated from {@code reinitShard} (not visible here)
 */
public synchronized void reinitPrimaryShard() throws IOException {
447+
primary = reinitShard(primary);
448+
}
449+
446450
public void syncGlobalCheckpoint() {
447451
PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
448452
try {

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testFollowIndex() throws Exception {
113113

114114
e = expectThrows(ResponseException.class,
115115
() -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex));
116-
assertThat(e.getMessage(), containsString("follow index [" + unallowedIndex + "] does not exist"));
116+
assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]"));
117117
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
118118
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
119119
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@
8686
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {
8787

8888
public static final String CCR_THREAD_POOL_NAME = "ccr";
89+
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
90+
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
8991

9092
private final boolean enabled;
9193
private final Settings settings;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,18 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1212
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
14+
import org.elasticsearch.action.admin.indices.stats.IndexStats;
15+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
16+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
17+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1318
import org.elasticsearch.client.Client;
1419
import org.elasticsearch.cluster.ClusterState;
1520
import org.elasticsearch.cluster.metadata.IndexMetaData;
21+
import org.elasticsearch.common.CheckedConsumer;
22+
import org.elasticsearch.index.engine.CommitStats;
23+
import org.elasticsearch.index.engine.Engine;
24+
import org.elasticsearch.index.shard.ShardId;
1625
import org.elasticsearch.license.RemoteClusterLicenseChecker;
1726
import org.elasticsearch.license.XPackLicenseState;
1827
import org.elasticsearch.rest.RestStatus;
@@ -21,6 +30,7 @@
2130
import java.util.Collections;
2231
import java.util.Locale;
2332
import java.util.Objects;
33+
import java.util.function.BiConsumer;
2434
import java.util.function.BooleanSupplier;
2535
import java.util.function.Consumer;
2636
import java.util.function.Function;
@@ -58,23 +68,24 @@ public boolean isCcrAllowed() {
5868
}
5969

6070
/**
61-
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
62-
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
63-
* Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
71+
* Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster.
72+
* Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR.
73+
* If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is invoked. Otherwise,
74+
* the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
6475
*
65-
* @param client the client
66-
* @param clusterAlias the remote cluster alias
67-
* @param leaderIndex the name of the leader index
68-
* @param onFailure the failure consumer
69-
* @param leaderIndexMetadataConsumer the leader index metadata consumer
70-
* @param <T> the type of response the listener is waiting for
76+
* @param client the client
77+
* @param clusterAlias the remote cluster alias
78+
* @param leaderIndex the name of the leader index
79+
* @param onFailure the failure consumer
80+
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
81+
* @param <T> the type of response the listener is waiting for
7182
*/
72-
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
83+
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
7384
final Client client,
7485
final String clusterAlias,
7586
final String leaderIndex,
7687
final Consumer<Exception> onFailure,
77-
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
88+
final BiConsumer<String[], IndexMetaData> consumer) {
7889

7990
final ClusterStateRequest request = new ClusterStateRequest();
8091
request.clear();
@@ -85,7 +96,13 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
8596
clusterAlias,
8697
request,
8798
onFailure,
88-
leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)),
99+
leaderClusterState -> {
100+
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex);
101+
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
102+
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> {
103+
consumer.accept(historyUUIDs, leaderIndexMetaData);
104+
});
105+
},
89106
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
90107
e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e));
91108
}
@@ -168,6 +185,58 @@ public void onFailure(final Exception e) {
168185
});
169186
}
170187

188+
/**
189+
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient.
190+
*
191+
* @param leaderClient the leader client
192+
* @param leaderIndexMetaData the leader index metadata
193+
* @param onFailure the failure consumer
194+
* @param historyUUIDConsumer the leader index history uuid and consumer
195+
*/
196+
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
197+
// in case of following a local or a remote cluster.
198+
public void fetchLeaderHistoryUUIDs(
199+
final Client leaderClient,
200+
final IndexMetaData leaderIndexMetaData,
201+
final Consumer<Exception> onFailure,
202+
final Consumer<String[]> historyUUIDConsumer) {
203+
204+
String leaderIndex = leaderIndexMetaData.getIndex().getName();
205+
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> {
206+
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
207+
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()];
208+
for (IndexShardStats indexShardStats : indexStats) {
209+
for (ShardStats shardStats : indexShardStats) {
210+
// Ignore replica shards as they may not have yet started and
211+
// we just end up overwriting slots in historyUUIDs
212+
if (shardStats.getShardRouting().primary() == false) {
213+
continue;
214+
}
215+
216+
CommitStats commitStats = shardStats.getCommitStats();
217+
if (commitStats == null) {
218+
onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing"));
219+
return;
220+
}
221+
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
222+
ShardId shardId = shardStats.getShardRouting().shardId();
223+
historyUUIDs[shardId.id()] = historyUUID;
224+
}
225+
}
226+
for (int i = 0; i < historyUUIDs.length; i++) {
227+
if (historyUUIDs[i] == null) {
228+
onFailure.accept(new IllegalArgumentException("no history uuid for [" + leaderIndex + "][" + i + "]"));
229+
return;
230+
}
231+
}
232+
historyUUIDConsumer.accept(historyUUIDs);
233+
};
234+
IndicesStatsRequest request = new IndicesStatsRequest();
235+
request.clear();
236+
request.indices(leaderIndex);
237+
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
238+
}
239+
171240
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
172241
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
173242
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ public static class Request extends SingleShardRequest<Request> {
6565
private long fromSeqNo;
6666
private int maxOperationCount;
6767
private ShardId shardId;
68+
private String expectedHistoryUUID;
6869
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
6970

70-
public Request(ShardId shardId) {
71+
/**
 * Creates a request for the given shard.
 *
 * @param shardId             the shard this request is for; its index name is passed to the super constructor
 * @param expectedHistoryUUID the history UUID stored on this request and returned by {@code getExpectedHistoryUUID()}
 */
public Request(ShardId shardId, String expectedHistoryUUID) {
7172
super(shardId.getIndexName());
7273
this.shardId = shardId;
74+
this.expectedHistoryUUID = expectedHistoryUUID;
7375
}
7476

7577
Request() {
@@ -103,6 +105,10 @@ public void setMaxOperationSizeInBytes(long maxOperationSizeInBytes) {
103105
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
104106
}
105107

108+
/**
 * Returns the history UUID carried by this request.
 *
 * @return the value of the {@code expectedHistoryUUID} field
 */
public String getExpectedHistoryUUID() {
109+
return expectedHistoryUUID;
110+
}
111+
106112
@Override
107113
public ActionRequestValidationException validate() {
108114
ActionRequestValidationException validationException = null;
@@ -126,6 +132,7 @@ public void readFrom(StreamInput in) throws IOException {
126132
fromSeqNo = in.readVLong();
127133
maxOperationCount = in.readVInt();
128134
shardId = ShardId.readShardId(in);
135+
expectedHistoryUUID = in.readString();
129136
maxOperationSizeInBytes = in.readVLong();
130137
}
131138

@@ -135,6 +142,7 @@ public void writeTo(StreamOutput out) throws IOException {
135142
out.writeVLong(fromSeqNo);
136143
out.writeVInt(maxOperationCount);
137144
shardId.writeTo(out);
145+
out.writeString(expectedHistoryUUID);
138146
out.writeVLong(maxOperationSizeInBytes);
139147
}
140148

@@ -147,12 +155,13 @@ public boolean equals(final Object o) {
147155
return fromSeqNo == request.fromSeqNo &&
148156
maxOperationCount == request.maxOperationCount &&
149157
Objects.equals(shardId, request.shardId) &&
158+
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
150159
maxOperationSizeInBytes == request.maxOperationSizeInBytes;
151160
}
152161

153162
@Override
154163
public int hashCode() {
155-
return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes);
164+
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
156165
}
157166

158167
@Override
@@ -161,6 +170,7 @@ public String toString() {
161170
"fromSeqNo=" + fromSeqNo +
162171
", maxOperationCount=" + maxOperationCount +
163172
", shardId=" + shardId +
173+
", expectedHistoryUUID=" + expectedHistoryUUID +
164174
", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
165175
'}';
166176
}
@@ -196,7 +206,12 @@ public Translog.Operation[] getOperations() {
196206
Response() {
197207
}
198208

199-
Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) {
209+
Response(
210+
final long mappingVersion,
211+
final long globalCheckpoint,
212+
final long maxSeqNo,
213+
final Translog.Operation[] operations) {
214+
200215
this.mappingVersion = mappingVersion;
201216
this.globalCheckpoint = globalCheckpoint;
202217
this.maxSeqNo = maxSeqNo;
@@ -274,6 +289,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
274289
seqNoStats.getGlobalCheckpoint(),
275290
request.fromSeqNo,
276291
request.maxOperationCount,
292+
request.expectedHistoryUUID,
277293
request.maxOperationSizeInBytes);
278294
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
279295
}
@@ -307,11 +323,20 @@ protected Response newResponse() {
307323
* Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method
308324
* stops collecting more operations and returns what has been collected so far.
309325
*/
310-
static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount,
326+
static Translog.Operation[] getOperations(IndexShard indexShard,
327+
long globalCheckpoint,
328+
long fromSeqNo,
329+
int maxOperationCount,
330+
String expectedHistoryUUID,
311331
long maxOperationSizeInBytes) throws IOException {
312332
if (indexShard.state() != IndexShardState.STARTED) {
313333
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
314334
}
335+
final String historyUUID = indexShard.getHistoryUUID();
336+
if (historyUUID.equals(expectedHistoryUUID) == false) {
337+
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
338+
historyUUID + "]");
339+
}
315340
if (fromSeqNo > globalCheckpoint) {
316341
return EMPTY_OPERATIONS_ARRAY;
317342
}

0 commit comments

Comments
 (0)