|
27 | 27 | import org.elasticsearch.rest.RestStatus;
|
28 | 28 | import org.elasticsearch.xpack.core.XPackPlugin;
|
29 | 29 |
|
| 30 | +import java.util.Arrays; |
30 | 31 | import java.util.Collections;
|
| 32 | +import java.util.HashSet; |
31 | 33 | import java.util.Locale;
|
32 | 34 | import java.util.Objects;
|
33 | 35 | import java.util.function.BiConsumer;
|
@@ -73,12 +75,12 @@ public boolean isCcrAllowed() {
|
73 | 75 | * If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. Otherwise,
|
74 | 76 | * the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
|
75 | 77 | *
|
76 |
| - * @param client the client |
77 |
| - * @param clusterAlias the remote cluster alias |
78 |
| - * @param leaderIndex the name of the leader index |
79 |
| - * @param onFailure the failure consumer |
80 |
| - * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards |
81 |
| - * @param <T> the type of response the listener is waiting for |
| 78 | + * @param client the client |
| 79 | + * @param clusterAlias the remote cluster alias |
| 80 | + * @param leaderIndex the name of the leader index |
| 81 | + * @param onFailure the failure consumer |
| 82 | + * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards |
| 83 | + * @param <T> the type of response the listener is waiting for |
82 | 84 | */
|
83 | 85 | public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
|
84 | 86 | final Client client,
|
@@ -208,14 +210,24 @@ public void fetchLeaderHistoryUUIDs(
|
208 | 210 | for (IndexShardStats indexShardStats : indexStats) {
|
209 | 211 | for (ShardStats shardStats : indexShardStats) {
|
210 | 212 | CommitStats commitStats = shardStats.getCommitStats();
|
| 213 | + if (commitStats == null) { |
| 214 | + onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing")); |
| 215 | + return; |
| 216 | + } |
211 | 217 | String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
|
| 218 | + if (historyUUID == null) { |
| 219 | + onFailure.accept(new IllegalArgumentException("leader index does not have an history uuid")); |
| 220 | + return; |
| 221 | + } |
212 | 222 | ShardId shardId = shardStats.getShardRouting().shardId();
|
213 | 223 | historyUUIDs[shardId.id()] = historyUUID;
|
214 | 224 | }
|
215 | 225 | }
|
| 226 | + assert new HashSet<>(Arrays.asList(historyUUIDs)).size() == leaderIndexMetaData.getNumberOfShards(); |
216 | 227 | historyUUIDConsumer.accept(historyUUIDs);
|
217 | 228 | };
|
218 | 229 | IndicesStatsRequest request = new IndicesStatsRequest();
|
| 230 | + request.clear(); |
219 | 231 | request.indices(leaderIndex);
|
220 | 232 | leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
|
221 | 233 | }
|
|
0 commit comments