Commit c4de75b

Do not renew sync-id if all shards are sealed (#29103)
Today the synced flush always issues a new sync-id even when no shard has changed since the last seal. As a result, active shards end up with a different sync-id from offline shards even though all of them were sealed and there have been no writes since. This commit changes the synced flush so that the sync-id is not renewed if all active shards are already sealed with the same sync-id.

Closes #27838
1 parent b4711b9 commit c4de75b
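
In short, the coordinating node now inspects the pre-sync responses before issuing a sync-id: if every active shard copy already reports the same non-empty sync-id from its last commit, that id is simply reported back as a successful seal; otherwise a fresh id is generated and the usual sync requests are sent. Below is a minimal, self-contained sketch of that decision with simplified names (PreSync stands in for PreSyncedFlushResponse); it is an illustration only, not the actual SyncedFlushService code, which appears in the diff further down.

import java.util.Collection;
import java.util.UUID;

// Simplified sketch of the decision added in this commit: reuse the existing sync-id
// when all copies agree on one, otherwise generate a new id.
class SyncedFlushDecisionSketch {

    // Stand-in for PreSyncedFlushResponse; only the existing sync-id matters here.
    static final class PreSync {
        final String existingSyncId; // sync-id found in the copy's last commit, may be null
        PreSync(String existingSyncId) { this.existingSyncId = existingSyncId; }
    }

    // Returns the sync-id shared by all copies, or null if any copy lacks one or they differ.
    static String sharedExistingSyncId(Collection<PreSync> responses) {
        String shared = null;
        for (PreSync resp : responses) {
            if (resp.existingSyncId == null || resp.existingSyncId.isEmpty()) {
                return null; // at least one copy was never sealed -> renew
            }
            if (shared == null) {
                shared = resp.existingSyncId;
            } else if (shared.equals(resp.existingSyncId) == false) {
                return null; // copies disagree -> renew
            }
        }
        return shared;
    }

    // Reuse the shared id if there is one; otherwise issue a fresh one
    // (the real code uses UUIDs.randomBase64UUID() instead of java.util.UUID).
    static String chooseSyncId(Collection<PreSync> responses) {
        String shared = sharedExistingSyncId(responses);
        return shared != null ? shared : UUID.randomUUID().toString();
    }
}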

File tree

3 files changed: +114, -13 lines

server/src/main/java/org/elasticsearch/index/engine/CommitStats.java (+7)

@@ -76,6 +76,13 @@ public Engine.CommitId getRawCommitId() {
         return new Engine.CommitId(Base64.getDecoder().decode(id));
     }
 
+    /**
+     * The synced-flush id of the commit if existed.
+     */
+    public String syncId() {
+        return userData.get(InternalEngine.SYNC_COMMIT_ID);
+    }
+
     /**
      * Returns the number of documents in the in this commit
      */

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java (+58, -13)

@@ -34,6 +34,8 @@
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;

@@ -65,6 +67,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;

@@ -216,9 +219,16 @@ public void onResponse(InFlightOpsResponse response) {
                 if (inflight != 0) {
                     actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
                 } else {
-                    // 3. now send the sync request to all the shards
-                    String syncId = UUIDs.randomBase64UUID();
-                    sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
+                    // 3. now send the sync request to all the shards;
+                    final String sharedSyncId = sharedExistingSyncId(presyncResponses);
+                    if (sharedSyncId != null) {
+                        assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
+                            "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
+                        reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
+                    } else {
+                        String syncId = UUIDs.randomBase64UUID();
+                        sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
+                    }
                 }
             }

@@ -244,6 +254,33 @@ public void onFailure(Exception e) {
         }
     }
 
+    private String sharedExistingSyncId(Map<String, PreSyncedFlushResponse> preSyncedFlushResponses) {
+        String existingSyncId = null;
+        for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) {
+            if (Strings.isNullOrEmpty(resp.existingSyncId)) {
+                return null;
+            }
+            if (existingSyncId == null) {
+                existingSyncId = resp.existingSyncId;
+            }
+            if (existingSyncId.equals(resp.existingSyncId) == false) {
+                return null;
+            }
+        }
+        return existingSyncId;
+    }
+
+    private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
+                                                 Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
+        final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
+        for (final ShardRouting shard : shards) {
+            if (preSyncResponses.containsKey(shard.currentNodeId())) {
+                results.put(shard, new ShardSyncedFlushResponse());
+            }
+        }
+        listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
+    }
+
     final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
         final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
         if (indexRoutingTable == null) {

@@ -438,7 +475,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
         final CommitStats commitStats = indexShard.commitStats();
         final Engine.CommitId commitId = commitStats.getRawCommitId();
         logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
-        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {

@@ -512,21 +549,15 @@ static final class PreSyncedFlushResponse extends TransportResponse {
 
         Engine.CommitId commitId;
         int numDocs;
+        @Nullable String existingSyncId = null;
 
         PreSyncedFlushResponse() {
         }
 
-        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
+        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
             this.commitId = commitId;
             this.numDocs = numDocs;
-        }
-
-        Engine.CommitId commitId() {
-            return commitId;
-        }
-
-        int numDocs() {
-            return numDocs;
+            this.existingSyncId = existingSyncId;
         }
 
         boolean includeNumDocs(Version version) {

@@ -537,6 +568,14 @@ boolean includeNumDocs(Version version) {
             }
         }
 
+        boolean includeExistingSyncId(Version version) {
+            if (version.major == Version.V_5_6_9.major) {
+                return version.onOrAfter(Version.V_5_6_9);
+            } else {
+                return version.onOrAfter(Version.V_6_3_0);
+            }
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);

@@ -546,6 +585,9 @@ public void readFrom(StreamInput in) throws IOException {
             } else {
                 numDocs = UNKNOWN_NUM_DOCS;
            }
+            if (includeExistingSyncId(in.getVersion())) {
+                existingSyncId = in.readOptionalString();
+            }
         }
 
         @Override

@@ -555,6 +597,9 @@ public void writeTo(StreamOutput out) throws IOException {
             if (includeNumDocs(out.getVersion())) {
                 out.writeInt(numDocs);
             }
+            if (includeExistingSyncId(out.getVersion())) {
+                out.writeOptionalString(existingSyncId);
+            }
         }
     }
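
Two details of the change above are worth calling out. First, the existing sync-id that PreSyncedFlushResponse now carries is read by the new CommitStats#syncId(), i.e. it is looked up in the user data of the shard's last Lucene commit under the key referenced by InternalEngine.SYNC_COMMIT_ID. Second, includeExistingSyncId gates the extra wire field on the other node's version (5.6.9 or later on the 5.x line, 6.3.0 or later otherwise) so that mixed-version clusters keep a compatible stream format. The snippet below is a hedged sketch, not part of this commit, showing the same user-data lookup against a raw Lucene index; the shard path is a placeholder, and "sync_id" is assumed to be the value behind InternalEngine.SYNC_COMMIT_ID.

import java.io.IOException;
import java.nio.file.Paths;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.FSDirectory;

// Sketch: the sync-id that synced flush stamps on a shard lives in the user data of the
// shard's last Lucene commit; CommitStats#syncId() returns exactly this value.
public class ReadSyncIdSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder path to a shard's Lucene index directory.
        try (FSDirectory dir = FSDirectory.open(Paths.get("/path/to/shard/index"))) {
            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            String syncId = infos.getUserData().get("sync_id"); // assumed key of SYNC_COMMIT_ID
            System.out.println("sync_id of the last commit: " + syncId);
        }
    }
}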

server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java (+49)

@@ -20,6 +20,7 @@
 
 import org.apache.lucene.index.Term;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;

@@ -29,6 +30,7 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;

@@ -59,6 +61,7 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
 public class FlushIT extends ESIntegTestCase {

@@ -280,4 +283,50 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
         assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
         assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
     }
+
+    public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
+        final int numberOfReplicas = internalCluster().numDataNodes() - 1;
+        assertAcked(
+            prepareCreate("test").setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
+        );
+        ensureGreen();
+        final Index index = clusterService().state().metaData().index("test").getIndex();
+        final ShardId shardId = new ShardId(index, 0);
+        final int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        // Do not renew synced-flush
+        final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
+        // Shards were updated, renew synced flush.
+        final int moreDocs = between(1, 10);
+        for (int i = 0; i < moreDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
+        // Manually remove or change sync-id, renew synced flush.
+        IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test")))
+            .getShardOrNull(shardId);
+        if (randomBoolean()) {
+            // Change the existing sync-id of a single shard.
+            shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId());
+            assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId())));
+        } else {
+            // Flush will create a new commit without sync-id
+            shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
+            assertThat(shard.commitStats().syncId(), nullValue());
+        }
+        final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
+        assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
+    }
 }
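
Beyond the integration test above, the reuse of the sync-id can also be observed through the shard-level stats, where each copy's commit user data is exposed. The fragment below is a rough sketch meant to live inside a test like FlushIT (it assumes the ESIntegTestCase client() helper, an index named "test", and an extra import of org.elasticsearch.action.admin.indices.stats.ShardStats); it is not part of this commit.

    // Sketch: after two back-to-back seals with no writes in between, every shard copy
    // should report the same "sync_id" in the user data of its last commit.
    private void assertAllCopiesShareSyncId() {
        ShardStats[] shardStats = client().admin().indices().prepareStats("test").get().getShards();
        String expected = shardStats[0].getCommitStats().getUserData().get("sync_id");
        for (ShardStats stats : shardStats) {
            assertThat(stats.getCommitStats().getUserData().get("sync_id"), equalTo(expected));
        }
    }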
