Skip to content

Commit d4831c2

Browse files
committed
Close Index API should force a flush if a sync is needed (#37961)
This commit changes the TransportVerifyShardBeforeCloseAction so that it issues a forced flush, forcing the translog and the Lucene commit to contain the same max seq number and global checkpoint in the case the Translog contains operations that were not written in the IndexWriter (like a Delete that touches a non existing doc). This way the assertion added in #37426 won't trip. Related to #33888
1 parent bfffb34 commit d4831c2

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.elasticsearch.action.admin.indices.close;
2020

21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
2123
import org.elasticsearch.action.ActionListener;
2224
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2325
import org.elasticsearch.action.support.ActionFilters;
@@ -50,6 +52,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
5052
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
5153

5254
public static final String NAME = CloseIndexAction.NAME + "[s]";
55+
protected Logger logger = LogManager.getLogger(getClass());
5356

5457
@Inject
5558
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
@@ -111,8 +114,10 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
111114
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
112115
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
113116
}
114-
indexShard.flush(new FlushRequest());
115-
logger.debug("{} shard is ready for closing", shardId);
117+
118+
final boolean forced = indexShard.isSyncNeeded();
119+
indexShard.flush(new FlushRequest().force(forced));
120+
logger.trace("{} shard is ready for closing [forced:{}]", shardId, forced);
116121
}
117122

118123
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1010
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
11+
import org.elasticsearch.action.delete.DeleteResponse;
1112
import org.elasticsearch.action.search.SearchResponse;
1213
import org.elasticsearch.action.search.SearchType;
1314
import org.elasticsearch.action.support.IndicesOptions;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.index.shard.IndexShardTestCase;
2728
import org.elasticsearch.indices.IndicesService;
2829
import org.elasticsearch.plugins.Plugin;
30+
import org.elasticsearch.rest.RestStatus;
2931
import org.elasticsearch.search.SearchService;
3032
import org.elasticsearch.search.builder.SearchSourceBuilder;
3133
import org.elasticsearch.search.internal.AliasFilter;
@@ -46,6 +48,8 @@
4648
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4749
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
4850
import static org.hamcrest.Matchers.equalTo;
51+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
52+
import static org.hamcrest.Matchers.is;
4953

5054
public class FrozenIndexTests extends ESSingleNodeTestCase {
5155

@@ -340,4 +344,30 @@ public void testFreezeIndexIncreasesIndexSettingsVersion() throws ExecutionExcep
340344
assertThat(client().admin().cluster().prepareState().get().getState().metaData().index(index).getSettingsVersion(),
341345
equalTo(settingsVersion + 1));
342346
}
347+
348+
public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
349+
final String indexName = "empty";
350+
createIndex(indexName, Settings.builder()
351+
.put("index.number_of_shards", 1)
352+
.put("index.number_of_replicas", 0)
353+
.put("index.refresh_interval", TimeValue.MINUS_ONE)
354+
.build());
355+
356+
final long nbNoOps = randomIntBetween(1, 10);
357+
for (long i = 0; i < nbNoOps; i++) {
358+
final DeleteResponse deleteResponse = client().prepareDelete(indexName, "_doc", Long.toString(i)).get();
359+
assertThat(deleteResponse.status(), is(RestStatus.NOT_FOUND));
360+
}
361+
362+
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
363+
assertBusy(() -> {
364+
final Index index = client().admin().cluster().prepareState().get().getState().metaData().index(indexName).getIndex();
365+
final IndexService indexService = indicesService.indexService(index);
366+
assertThat(indexService.hasShard(0), is(true));
367+
assertThat(indexService.getShard(0).getGlobalCheckpoint(), greaterThanOrEqualTo(nbNoOps - 1L));
368+
});
369+
370+
assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
371+
assertIndexFrozen(indexName);
372+
}
343373
}

0 commit comments

Comments
 (0)