Skip to content

Commit eab530c

Browse files
committed
Ensure flush happens on shard idle
This adds two test cases verifying that, when a shard goes idle, pending (uncommitted) segments are committed and unreferenced files are freed. Relates to #29482
1 parent 782517b commit eab530c

File tree

5 files changed

+163
-12
lines changed

5 files changed

+163
-12
lines changed

server/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
2828
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
2929
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
30+
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
31+
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
32+
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
3033
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
3134
import org.elasticsearch.action.admin.indices.stats.CommonStats;
3235
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@@ -64,6 +67,7 @@
6467
import java.util.Arrays;
6568
import java.util.Collection;
6669
import java.util.List;
70+
import java.util.Map;
6771
import java.util.stream.IntStream;
6872

6973
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -467,4 +471,77 @@ public void testCreateShrinkWithIndexSort() throws Exception {
467471
flushAndRefresh();
468472
assertSortedSegments("target", expectedIndexSort);
469473
}
474+
475+
476+
public void testShrinkCommitsMergeOnIdle() throws Exception {
477+
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
478+
.put("index.number_of_replicas", 0)
479+
.put("number_of_shards", 5)).get();
480+
for (int i = 0; i < 30; i++) {
481+
client().prepareIndex("source", "type")
482+
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
483+
}
484+
client().admin().indices().prepareFlush("source").get();
485+
ImmutableOpenMap<String, DiscoveryNode> dataNodes =
486+
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
487+
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
488+
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
489+
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
490+
// to the require._name below.
491+
ensureGreen();
492+
// relocate all shards to one node such that we can merge it.
493+
client().admin().indices().prepareUpdateSettings("source")
494+
.setSettings(Settings.builder()
495+
.put("index.routing.allocation.require._name", discoveryNodes[0].getName())
496+
.put("index.blocks.write", true)).get();
497+
ensureGreen();
498+
IndicesSegmentResponse sourceStats = client().admin().indices().prepareSegments("source").get();
499+
500+
// disable rebalancing to be able to capture the right stats. balancing can move the target primary
501+
// making it hard to pin point the source shards.
502+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
503+
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"
504+
)).get();
505+
506+
// now merge source into a single shard index
507+
assertAcked(client().admin().indices().prepareResizeIndex("source", "target")
508+
.setSettings(Settings.builder().put("index.number_of_replicas", 0).build()).get());
509+
ensureGreen();
510+
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
511+
IndexMetaData target = clusterStateResponse.getState().getMetaData().index("target");
512+
client().admin().indices().prepareForceMerge("target").setMaxNumSegments(1).setFlush(false).get();
513+
IndicesSegmentResponse targetSegStats = client().admin().indices().prepareSegments("target").get();
514+
ShardSegments segmentsStats = targetSegStats.getIndices().get("target").getShards().get(0).getShards()[0];
515+
assertTrue(segmentsStats.getNumberOfCommitted() > 0);
516+
assertNotEquals(segmentsStats.getSegments(), segmentsStats.getNumberOfCommitted());
517+
518+
Iterable<IndicesService> dataNodeInstances = internalCluster().getDataNodeInstances(IndicesService.class);
519+
for (IndicesService service : dataNodeInstances) {
520+
if (service.hasIndex(target.getIndex())) {
521+
IndexService indexShards = service.indexService(target.getIndex());
522+
IndexShard shard = indexShards.getShard(0);
523+
assertTrue(shard.isActive());
524+
shard.checkIdle(0);
525+
assertFalse(shard.isActive());
526+
}
527+
}
528+
assertBusy(() -> {
529+
IndicesSegmentResponse targetStats = client().admin().indices().prepareSegments("target").get();
530+
ShardSegments targetShardSegments = targetStats.getIndices().get("target").getShards().get(0).getShards()[0];
531+
Map<Integer, IndexShardSegments> source = sourceStats.getIndices().get("source").getShards();
532+
int numSourceSegments = 0;
533+
for (IndexShardSegments s : source.values()) {
534+
numSourceSegments += s.getAt(0).getNumberOfCommitted();
535+
}
536+
assertTrue(targetShardSegments.getSegments().size() < numSourceSegments);
537+
assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getNumberOfSearch());
538+
assertEquals(targetShardSegments.getNumberOfCommitted(), targetShardSegments.getSegments().size());
539+
assertEquals(1, targetShardSegments.getSegments().size());
540+
});
541+
542+
// clean up
543+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(
544+
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String)null
545+
)).get();
546+
}
470547
}

server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
264264
RecoverySource.PeerRecoverySource.INSTANCE);
265265

266266
final IndexShard newReplica =
267-
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {});
267+
newShard(shardRouting, shardPath, indexMetaData, null, getEngineFactory(shardRouting), () -> {}, EMPTY_EVENT_LISTENER);
268268
replicas.add(newReplica);
269269
updateAllocationIDsOnPrimary();
270270
return newReplica;

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,12 @@
7272
import org.elasticsearch.core.internal.io.IOUtils;
7373
import org.elasticsearch.env.NodeEnvironment;
7474
import org.elasticsearch.index.IndexSettings;
75+
import org.elasticsearch.index.MergePolicyConfig;
7576
import org.elasticsearch.index.VersionType;
7677
import org.elasticsearch.index.engine.Engine;
7778
import org.elasticsearch.index.engine.EngineException;
7879
import org.elasticsearch.index.engine.InternalEngine;
80+
import org.elasticsearch.index.engine.Segment;
7981
import org.elasticsearch.index.engine.SegmentsStats;
8082
import org.elasticsearch.index.fielddata.FieldDataStats;
8183
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -1867,7 +1869,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
18671869
closeShards(shard);
18681870
IndexShard newShard = newShard(
18691871
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
1870-
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
1872+
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
18711873

18721874
recoverShardFromStore(newShard);
18731875

@@ -2013,7 +2015,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
20132015
closeShards(shard);
20142016
IndexShard newShard = newShard(
20152017
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
2016-
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {});
2018+
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
20172019

20182020
recoverShardFromStore(newShard);
20192021

@@ -2496,7 +2498,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
24962498
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
24972499
.build();
24982500
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
2499-
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer());
2501+
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
25002502

25012503
Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
25022504
assertTrue("at least 2 files, commit and data: " + storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
@@ -2980,4 +2982,74 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
29802982
breaker = primary.circuitBreakerService.getBreaker(CircuitBreaker.ACCOUNTING);
29812983
assertThat(breaker.getUsed(), equalTo(0L));
29822984
}
2985+
2986+
public void testFlushOnInactive() throws Exception {
2987+
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
2988+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
2989+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
2990+
.build();
2991+
IndexMetaData metaData = IndexMetaData.builder("test")
2992+
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
2993+
.settings(settings)
2994+
.primaryTerm(0, 1).build();
2995+
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId(metaData.getIndex(), 0), "n1", true, ShardRoutingState
2996+
.INITIALIZING, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE);
2997+
final ShardId shardId = shardRouting.shardId();
2998+
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
2999+
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
3000+
AtomicBoolean markedInactive = new AtomicBoolean();
3001+
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
3002+
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, () -> {
3003+
}, new IndexEventListener() {
3004+
@Override
3005+
public void onShardInactive(IndexShard indexShard) {
3006+
markedInactive.set(true);
3007+
primaryRef.get().flush(new FlushRequest());
3008+
}
3009+
});
3010+
primaryRef.set(primary);
3011+
recoverShardFromStore(primary);
3012+
for (int i = 0; i < 3; i++) {
3013+
indexDoc(primary, "test", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
3014+
primary.refresh("test"); // produce segments
3015+
}
3016+
List<Segment> segments = primary.segments(false);
3017+
Set<String> names = new HashSet<>();
3018+
for (Segment segment : segments) {
3019+
assertFalse(segment.committed);
3020+
assertTrue(segment.search);
3021+
names.add(segment.getName());
3022+
}
3023+
assertEquals(3, segments.size());
3024+
primary.flush(new FlushRequest());
3025+
primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
3026+
primary.refresh("test");
3027+
segments = primary.segments(false);
3028+
for (Segment segment : segments) {
3029+
if (names.contains(segment.getName())) {
3030+
assertTrue(segment.committed);
3031+
assertFalse(segment.search);
3032+
} else {
3033+
assertFalse(segment.committed);
3034+
assertTrue(segment.search);
3035+
}
3036+
}
3037+
assertEquals(4, segments.size());
3038+
3039+
assertFalse(markedInactive.get());
3040+
assertBusy(() -> {
3041+
primary.checkIdle(0);
3042+
assertFalse(primary.isActive());
3043+
});
3044+
3045+
assertTrue(markedInactive.get());
3046+
segments = primary.segments(false);
3047+
assertEquals(1, segments.size());
3048+
for (Segment segment : segments) {
3049+
assertTrue(segment.committed);
3050+
assertTrue(segment.search);
3051+
}
3052+
closeShards(primary);
3053+
}
3054+
29833055
}

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
9797

9898
// build a new shard using the same store directory as the closed shard
9999
ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
100-
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {});
100+
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {},
101+
EMPTY_EVENT_LISTENER);
101102

102103
// restore the shard
103104
recoverShardFromSnapshot(shard, snapshot, repository);

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@
105105
*/
106106
public abstract class IndexShardTestCase extends ESTestCase {
107107

108+
public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {};
109+
108110
protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
109111
@Override
110112
public void onRecoveryDone(RecoveryState state) {
@@ -260,24 +262,25 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
260262
final ShardId shardId = routing.shardId();
261263
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
262264
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
263-
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer, listeners);
265+
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory, globalCheckpointSyncer,
266+
EMPTY_EVENT_LISTENER, listeners);
264267
}
265268

266269
/**
267270
* creates a new initializing shard.
268-
*
269-
* @param routing shard routing to use
271+
* @param routing shard routing to use
270272
* @param shardPath path to use for shard data
271273
* @param indexMetaData indexMetaData for the shard, including any mapping
272274
* @param indexSearcherWrapper an optional wrapper to be used during searchers
273275
* @param globalCheckpointSyncer callback for syncing global checkpoints
276+
* @param indexEventListener     index event listener to use for the shard, e.g. to receive {@code onShardInactive} callbacks
274277
* @param listeners an optional set of listeners to add to the shard
275278
*/
276279
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
277280
@Nullable IndexSearcherWrapper indexSearcherWrapper,
278281
@Nullable EngineFactory engineFactory,
279282
Runnable globalCheckpointSyncer,
280-
IndexingOperationListener... listeners) throws IOException {
283+
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
281284
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
282285
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
283286
final IndexShard indexShard;
@@ -289,8 +292,6 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe
289292
indexSettings.getSettings(), "index");
290293
mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY);
291294
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
292-
final IndexEventListener indexEventListener = new IndexEventListener() {
293-
};
294295
final Engine.Warmer warmer = searcher -> {
295296
};
296297
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
@@ -335,7 +336,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index
335336
null,
336337
current.engineFactory,
337338
current.getGlobalCheckpointSyncer(),
338-
listeners);
339+
EMPTY_EVENT_LISTENER, listeners);
339340
}
340341

341342
/**

0 commit comments

Comments
 (0)