Skip to content

Commit ec4db0f

Browse files
committed
Revert "Do not renew sync-id if all shards are sealed (#29103)"
This reverts commit 25b4d9e.
1 parent 93370aa commit ec4db0f

File tree

3 files changed

+13
-115
lines changed

3 files changed

+13
-115
lines changed

core/src/main/java/org/elasticsearch/index/engine/CommitStats.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,6 @@ public Engine.CommitId getRawCommitId() {
7676
return new Engine.CommitId(Base64.getDecoder().decode(id));
7777
}
7878

79-
/**
80-
* The synced-flush id of the commit if existed.
81-
*/
82-
public String syncId() {
83-
return userData.get(InternalEngine.SYNC_COMMIT_ID);
84-
}
85-
8679
/**
8780
* Returns the number of documents in the in this commit
8881
*/

core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

Lines changed: 13 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3535
import org.elasticsearch.cluster.routing.ShardRouting;
3636
import org.elasticsearch.cluster.service.ClusterService;
37-
import org.elasticsearch.common.Nullable;
38-
import org.elasticsearch.common.Strings;
3937
import org.elasticsearch.common.UUIDs;
4038
import org.elasticsearch.common.component.AbstractComponent;
4139
import org.elasticsearch.common.inject.Inject;
@@ -67,7 +65,6 @@
6765
import java.io.IOException;
6866
import java.util.ArrayList;
6967
import java.util.Collections;
70-
import java.util.HashMap;
7168
import java.util.List;
7269
import java.util.Map;
7370
import java.util.concurrent.ConcurrentMap;
@@ -219,16 +216,9 @@ public void onResponse(InFlightOpsResponse response) {
219216
if (inflight != 0) {
220217
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
221218
} else {
222-
// 3. now send the sync request to all the shards;
223-
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
224-
if (sharedSyncId != null) {
225-
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
226-
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
227-
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
228-
}else {
229-
String syncId = UUIDs.base64UUID();
230-
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
231-
}
219+
// 3. now send the sync request to all the shards
220+
String syncId = UUIDs.base64UUID();
221+
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
232222
}
233223
}
234224

@@ -254,33 +244,6 @@ public void onFailure(Exception e) {
254244
}
255245
}
256246

257-
private String sharedExistingSyncId(Map<String, PreSyncedFlushResponse> preSyncedFlushResponses) {
258-
String existingSyncId = null;
259-
for (PreSyncedFlushResponse resp : preSyncedFlushResponses.values()) {
260-
if (Strings.isNullOrEmpty(resp.existingSyncId)) {
261-
return null;
262-
}
263-
if (existingSyncId == null) {
264-
existingSyncId = resp.existingSyncId;
265-
}
266-
if (existingSyncId.equals(resp.existingSyncId) == false) {
267-
return null;
268-
}
269-
}
270-
return existingSyncId;
271-
}
272-
273-
private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
274-
Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
275-
final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
276-
for (final ShardRouting shard : shards) {
277-
if (preSyncResponses.containsKey(shard.currentNodeId())) {
278-
results.put(shard, new ShardSyncedFlushResponse());
279-
}
280-
}
281-
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
282-
}
283-
284247
final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
285248
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
286249
if (indexRoutingTable == null) {
@@ -475,7 +438,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
475438
final CommitStats commitStats = indexShard.commitStats();
476439
final Engine.CommitId commitId = commitStats.getRawCommitId();
477440
logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
478-
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
441+
return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
479442
}
480443

481444
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
@@ -547,19 +510,24 @@ public ShardId shardId() {
547510
static final class PreSyncedFlushResponse extends TransportResponse {
548511
static final int UNKNOWN_NUM_DOCS = -1;
549512
public static final int V_6_2_2_ID = 6020299;
550-
public static final int V_6_3_0_ID = 6030099;
551513

552514
Engine.CommitId commitId;
553515
int numDocs;
554-
@Nullable String existingSyncId = null;
555516

556517
PreSyncedFlushResponse() {
557518
}
558519

559-
PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs, String existingSyncId) {
520+
PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
560521
this.commitId = commitId;
561522
this.numDocs = numDocs;
562-
this.existingSyncId = existingSyncId;
523+
}
524+
525+
Engine.CommitId commitId() {
526+
return commitId;
527+
}
528+
529+
int numDocs() {
530+
return numDocs;
563531
}
564532

565533
boolean includeNumDocs(Version version) {
@@ -570,14 +538,6 @@ boolean includeNumDocs(Version version) {
570538
}
571539
}
572540

573-
boolean includeExistingSyncId(Version version) {
574-
if (version.major == Version.V_5_6_9_UNRELEASED.major) {
575-
return version.onOrAfter(Version.V_5_6_9_UNRELEASED);
576-
} else {
577-
return version.id >= V_6_3_0_ID;
578-
}
579-
}
580-
581541
@Override
582542
public void readFrom(StreamInput in) throws IOException {
583543
super.readFrom(in);
@@ -587,9 +547,6 @@ public void readFrom(StreamInput in) throws IOException {
587547
} else {
588548
numDocs = UNKNOWN_NUM_DOCS;
589549
}
590-
if (includeExistingSyncId(in.getVersion())) {
591-
existingSyncId = in.readOptionalString();
592-
}
593550
}
594551

595552
@Override
@@ -599,9 +556,6 @@ public void writeTo(StreamOutput out) throws IOException {
599556
if (includeNumDocs(out.getVersion())) {
600557
out.writeInt(numDocs);
601558
}
602-
if (includeExistingSyncId(out.getVersion())) {
603-
out.writeOptionalString(existingSyncId);
604-
}
605559
}
606560
}
607561

core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.lucene.index.Term;
2222
import org.elasticsearch.action.ActionListener;
23-
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
2423
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
2524
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
2625
import org.elasticsearch.action.admin.indices.stats.IndexStats;
@@ -30,7 +29,6 @@
3029
import org.elasticsearch.cluster.metadata.IndexMetaData;
3130
import org.elasticsearch.cluster.routing.ShardRouting;
3231
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
33-
import org.elasticsearch.common.UUIDs;
3432
import org.elasticsearch.common.settings.Settings;
3533
import org.elasticsearch.common.unit.ByteSizeUnit;
3634
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -61,7 +59,6 @@
6159
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6260
import static org.hamcrest.Matchers.emptyIterable;
6361
import static org.hamcrest.Matchers.equalTo;
64-
import static org.hamcrest.Matchers.not;
6562
import static org.hamcrest.Matchers.nullValue;
6663

6764
public class FlushIT extends ESIntegTestCase {
@@ -284,50 +281,4 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
284281
assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
285282
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
286283
}
287-
288-
public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
289-
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
290-
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
291-
assertAcked(
292-
prepareCreate("test").setSettings(Settings.builder()
293-
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
294-
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
295-
);
296-
ensureGreen();
297-
final Index index = clusterService().state().metaData().index("test").getIndex();
298-
final ShardId shardId = new ShardId(index, 0);
299-
final int numDocs = between(1, 10);
300-
for (int i = 0; i < numDocs; i++) {
301-
index("test", "doc", Integer.toString(i));
302-
}
303-
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
304-
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
305-
// Do not renew synced-flush
306-
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
307-
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
308-
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
309-
// Shards were updated, renew synced flush.
310-
final int moreDocs = between(1, 10);
311-
for (int i = 0; i < moreDocs; i++) {
312-
index("test", "doc", Integer.toString(i));
313-
}
314-
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
315-
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
316-
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
317-
// Manually remove or change sync-id, renew synced flush.
318-
IndexShard shard = internalCluster().getInstance(IndicesService.class, randomFrom(internalCluster().nodesInclude("test")))
319-
.getShardOrNull(shardId);
320-
if (randomBoolean()) {
321-
// Change the existing sync-id of a single shard.
322-
shard.syncFlush(UUIDs.randomBase64UUID(random()), shard.commitStats().getRawCommitId());
323-
assertThat(shard.commitStats().syncId(), not(equalTo(thirdSeal.syncId())));
324-
} else {
325-
// Flush will create a new commit without sync-id
326-
shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
327-
assertThat(shard.commitStats().syncId(), nullValue());
328-
}
329-
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
330-
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
331-
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
332-
}
333284
}

0 commit comments

Comments
 (0)