|
20 | 20 | package org.elasticsearch.test;
|
21 | 21 |
|
22 | 22 | import com.carrotsearch.hppc.ObjectLongMap;
|
| 23 | +import com.carrotsearch.hppc.cursors.IntObjectCursor; |
| 24 | +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; |
23 | 25 | import com.carrotsearch.randomizedtesting.RandomizedContext;
|
24 | 26 | import com.carrotsearch.randomizedtesting.annotations.TestGroup;
|
25 | 27 | import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
26 | 28 | import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
27 | 29 | import org.apache.http.HttpHost;
|
28 | 30 | import org.apache.lucene.search.Sort;
|
| 31 | +import org.apache.lucene.store.AlreadyClosedException; |
29 | 32 | import org.apache.lucene.util.LuceneTestCase;
|
30 | 33 | import org.elasticsearch.ElasticsearchException;
|
31 | 34 | import org.elasticsearch.ExceptionsHelper;
|
|
48 | 51 | import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
49 | 52 | import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
50 | 53 | import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
51 |
| -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; |
52 |
| -import org.elasticsearch.action.admin.indices.stats.IndexStats; |
53 |
| -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; |
54 |
| -import org.elasticsearch.action.admin.indices.stats.ShardStats; |
55 | 54 | import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
|
56 | 55 | import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
57 | 56 | import org.elasticsearch.action.bulk.BulkResponse;
|
|
187 | 186 | import java.util.List;
|
188 | 187 | import java.util.Locale;
|
189 | 188 | import java.util.Map;
|
190 |
| -import java.util.Optional; |
191 | 189 | import java.util.Random;
|
192 | 190 | import java.util.Set;
|
193 | 191 | import java.util.concurrent.Callable;
|
@@ -2329,40 +2327,48 @@ public static Index resolveIndex(String index) {
|
2329 | 2327 |
|
2330 | 2328 | protected void assertSeqNos() throws Exception {
|
2331 | 2329 | assertBusy(() -> {
|
2332 |
| - IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); |
2333 |
| - for (IndexStats indexStats : stats.getIndices().values()) { |
2334 |
| - for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { |
2335 |
| - Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards()) |
2336 |
| - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) |
2337 |
| - .findFirst(); |
2338 |
| - if (maybePrimary.isPresent() == false) { |
| 2330 | + final ClusterState state = clusterService().state(); |
| 2331 | + for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) { |
| 2332 | + for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) { |
| 2333 | + ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard(); |
| 2334 | + if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) { |
2339 | 2335 | continue;
|
2340 | 2336 | }
|
2341 |
| - ShardStats primary = maybePrimary.get(); |
2342 |
| - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); |
2343 |
| - final ShardRouting primaryShardRouting = primary.getShardRouting(); |
| 2337 | + DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId()); |
| 2338 | + IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName()) |
| 2339 | + .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id()); |
| 2340 | + final SeqNoStats primarySeqNoStats; |
| 2341 | + final ObjectLongMap<String> syncGlobalCheckpoints; |
| 2342 | + try { |
| 2343 | + primarySeqNoStats = primaryShard.seqNoStats(); |
| 2344 | + syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints(); |
| 2345 | + } catch (AlreadyClosedException ex) { |
| 2346 | + continue; // shard is closed - just ignore |
| 2347 | + } |
2344 | 2348 | assertThat(primaryShardRouting + " should have set the global checkpoint",
|
2345 |
| - primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); |
2346 |
| - final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); |
2347 |
| - final IndicesService indicesService = |
2348 |
| - internalCluster().getInstance(IndicesService.class, node.getName()); |
2349 |
| - final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); |
2350 |
| - final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); |
2351 |
| - for (ShardStats shardStats : indexShardStats) { |
2352 |
| - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); |
2353 |
| - if (seqNoStats == null) { |
2354 |
| - continue; // this shard was closed |
| 2349 | + primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); |
| 2350 | + for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) { |
| 2351 | + if (replicaShardRouting.assignedToNode() == false) { |
| 2352 | + continue; |
| 2353 | + } |
| 2354 | + DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId()); |
| 2355 | + IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName()) |
| 2356 | + .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id()); |
| 2357 | + final SeqNoStats seqNoStats; |
| 2358 | + try { |
| 2359 | + seqNoStats = replicaShard.seqNoStats(); |
| 2360 | + } catch (AlreadyClosedException e) { |
| 2361 | + continue; // shard is closed - just ignore |
2355 | 2362 | }
|
2356 |
| - assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", |
2357 |
| - seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); |
2358 |
| - assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", |
2359 |
| - seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); |
2360 |
| - assertThat(shardStats.getShardRouting() + " max seq no mismatch", |
2361 |
| - seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); |
| 2363 | + assertThat(replicaShardRouting + " local checkpoint mismatch", |
| 2364 | + seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); |
| 2365 | + assertThat(replicaShardRouting + " global checkpoint mismatch", |
| 2366 | + seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); |
| 2367 | + assertThat(replicaShardRouting + " max seq no mismatch", |
| 2368 | + seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); |
2362 | 2369 | // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
|
2363 |
| - assertThat( |
2364 |
| - seqNoStats.getGlobalCheckpoint(), |
2365 |
| - equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); |
| 2370 | + assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(), |
| 2371 | + equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))); |
2366 | 2372 | }
|
2367 | 2373 | }
|
2368 | 2374 | }
|
|
0 commit comments