Skip to content

Commit 1ca5dd1

Browse files
authored
Flush instead of synced-flush inactive shards (#51365)
If all nodes are on 7.6 or later, we prefer to perform a normal flush instead of a synced flush when a shard becomes inactive. Backport of #49126
1 parent 072203c commit 1ca5dd1

File tree

4 files changed

+71
-9
lines changed

4 files changed

+71
-9
lines changed

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.common.io.stream.StreamInput;
4343
import org.elasticsearch.common.io.stream.StreamOutput;
4444
import org.elasticsearch.common.logging.DeprecationLogger;
45+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4546
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4647
import org.elasticsearch.common.util.concurrent.CountDown;
4748
import org.elasticsearch.index.Index;
@@ -51,6 +52,7 @@
5152
import org.elasticsearch.index.engine.Engine;
5253
import org.elasticsearch.index.shard.IndexEventListener;
5354
import org.elasticsearch.index.shard.IndexShard;
55+
import org.elasticsearch.index.shard.IndexShardState;
5456
import org.elasticsearch.index.shard.ShardId;
5557
import org.elasticsearch.index.shard.ShardNotFoundException;
5658
import org.elasticsearch.indices.IndexClosedException;
@@ -72,7 +74,6 @@
7274
import java.util.List;
7375
import java.util.Map;
7476
import java.util.concurrent.ConcurrentMap;
75-
import java.util.stream.StreamSupport;
7677

7778
public class SyncedFlushService implements IndexEventListener {
7879

@@ -111,8 +112,12 @@ public SyncedFlushService(IndicesService indicesService,
111112

112113
@Override
113114
public void onShardInactive(final IndexShard indexShard) {
114-
// we only want to call sync flush once, so only trigger it when we are on a primary
115-
if (indexShard.routingEntry().primary()) {
115+
// A normal flush has the same effect as a synced flush if all nodes are on 7.6 or later.
116+
final boolean preferNormalFlush = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0);
117+
if (preferNormalFlush) {
118+
performNormalFlushOnInactive(indexShard);
119+
} else if (indexShard.routingEntry().primary()) {
120+
// we only want to call sync flush once, so only trigger it when we are on a primary
116121
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
117122
@Override
118123
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
@@ -128,6 +133,23 @@ public void onFailure(Exception e) {
128133
}
129134
}
130135

136+
/**
 * Schedules a normal (non-synced) flush of the given shard on the FLUSH thread pool.
 * Called from onShardInactive when the minimum node version in the cluster is on or after 7.6.0.
 * The flush runs asynchronously; this method returns as soon as the task has been submitted.
 *
 * @param shard the shard that has become inactive and should be flushed
 */
private void performNormalFlushOnInactive(IndexShard shard) {
137+
logger.debug("flushing shard {} on inactive", shard.routingEntry());
138+
shard.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
139+
@Override
140+
public void onFailure(Exception e) {
141+
// Only report the failure while the shard is not closed; failures on a closed shard are not logged.
if (shard.state() != IndexShardState.CLOSED) {
142+
logger.warn(new ParameterizedMessage("failed to flush shard {} on inactive", shard.routingEntry()), e);
143+
}
144+
}
145+
146+
@Override
147+
protected void doRun() {
148+
// NOTE(review): force=false / waitIfOngoing=false presumably mean "flush only if needed and do not
// block behind a flush that is already running" - confirm against FlushRequest.
shard.flush(new FlushRequest().force(false).waitIfOngoing(false));
149+
}
150+
});
151+
}
152+
131153
/**
132154
* a utility method to perform a synced flush for all shards of multiple indices.
133155
* see {@link #attemptSyncedFlush(ShardId, ActionListener)}
@@ -137,7 +159,7 @@ public void attemptSyncedFlush(final String[] aliasesOrIndices,
137159
IndicesOptions indicesOptions,
138160
final ActionListener<SyncedFlushResponse> listener) {
139161
final ClusterState state = clusterService.state();
140-
if (StreamSupport.stream(state.nodes().spliterator(), false).allMatch(n -> n.getVersion().onOrAfter(Version.V_7_6_0))) {
162+
if (state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) {
141163
DEPRECATION_LOGGER.deprecatedAndMaybeLog("synced_flush", SYNCED_FLUSH_DEPRECATION_MESSAGE);
142164
}
143165
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,11 @@ public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
176176
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
177177
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
178178
assertBusy(() -> {
179-
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
180-
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
179+
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().setTranslog(true).get().getIndex("test");
180+
assertThat(indexStats.getTotal().translog.getUncommittedOperations(), equalTo(0));
181181
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
182182
}
183183
);
184-
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
185-
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
186184
}
187185

188186
public void testDurableFlagHasEffect() throws Exception {

server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

+38
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.unit.ByteSizeValue;
3838
import org.elasticsearch.common.xcontent.XContentType;
3939
import org.elasticsearch.index.Index;
40+
import org.elasticsearch.index.IndexService;
4041
import org.elasticsearch.index.IndexSettings;
4142
import org.elasticsearch.index.engine.Engine;
4243
import org.elasticsearch.index.engine.InternalEngine;
@@ -47,15 +48,22 @@
4748
import org.elasticsearch.index.shard.IndexShard;
4849
import org.elasticsearch.index.shard.IndexShardTestCase;
4950
import org.elasticsearch.index.shard.ShardId;
51+
import org.elasticsearch.indices.IndexingMemoryController;
5052
import org.elasticsearch.indices.IndicesService;
53+
import org.elasticsearch.plugins.Plugin;
5154
import org.elasticsearch.test.ESIntegTestCase;
55+
import org.elasticsearch.test.InternalSettingsPlugin;
56+
import org.elasticsearch.test.InternalTestCluster;
5257

5358
import java.io.IOException;
5459
import java.util.Arrays;
60+
import java.util.Collection;
61+
import java.util.Collections;
5562
import java.util.List;
5663
import java.util.Map;
5764
import java.util.concurrent.CopyOnWriteArrayList;
5865
import java.util.concurrent.CountDownLatch;
66+
import java.util.concurrent.TimeUnit;
5967
import java.util.concurrent.atomic.AtomicBoolean;
6068
import java.util.concurrent.atomic.AtomicInteger;
6169
import java.util.stream.Collectors;
@@ -71,6 +79,11 @@
7179
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
7280
public class FlushIT extends ESIntegTestCase {
7381

82+
/**
 * Installs {@link InternalSettingsPlugin} as the only plugin on the test nodes.
 * NOTE(review): presumably needed so that tests in this class can set internal index settings
 * (e.g. the global checkpoint sync interval used by testFlushOnInactive) - confirm.
 *
 * @return a singleton list containing {@code InternalSettingsPlugin.class}
 */
@Override
83+
protected Collection<Class<? extends Plugin>> nodePlugins() {
84+
return Collections.singletonList(InternalSettingsPlugin.class);
85+
}
86+
7487
public void testWaitIfOngoing() throws InterruptedException {
7588
createIndex("test");
7689
ensureGreen("test");
@@ -369,4 +382,29 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
369382
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
370383
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
371384
}
385+
386+
/**
 * Verifies that every shard copy of an index ends up with no uncommitted translog operations
 * after indexing stops, i.e. that shards are flushed once they become inactive.
 * The test starts two data-only nodes with a short random shard-inactive time, creates a
 * one-shard/one-replica index restricted to those nodes, indexes a few documents, optionally
 * restarts one of the data nodes, and then waits up to 30 seconds for the translog stats to
 * report zero uncommitted operations on all shards.
 */
public void testFlushOnInactive() throws Exception {
387+
final String indexName = "flush_on_inactive";
388+
// Two data-only nodes with a random shard-inactive time between 10 and 1000 ms.
List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
389+
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(10, 1000, "ms")).build());
390+
// One primary and one replica, allocated only to the data nodes started above.
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
391+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
392+
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(50, 200, "ms"))
393+
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
394+
.build()));
395+
ensureGreen(indexName);
396+
int numDocs = randomIntBetween(1, 10);
397+
for (int i = 0; i < numDocs; i++) {
398+
client().prepareIndex(indexName, "_doc").setSource("f", "v").get();
399+
}
400+
// Randomly restart one of the data nodes and wait for the index to be green again.
if (randomBoolean()) {
401+
internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
402+
ensureGreen(indexName);
403+
}
404+
// Poll until every shard reports zero uncommitted translog operations, for at most 30 seconds.
assertBusy(() -> {
405+
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
406+
assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0));
407+
}
408+
}, 30, TimeUnit.SECONDS);
409+
}
372410
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -2142,9 +2142,13 @@ public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
21422142
}
21432143

21442144
public List<String> startDataOnlyNodes(int numNodes) {
2145+
return startDataOnlyNodes(numNodes, Settings.EMPTY);
2146+
}
2147+
2148+
public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
21452149
return startNodes(
21462150
numNodes,
2147-
Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
2151+
Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
21482152
.put(Node.NODE_DATA_SETTING.getKey(), true).build());
21492153
}
21502154

0 commit comments

Comments
 (0)