Commit dd3cfd5

Synced-flush should not seal index of out of sync replicas (#28464)
Today the correctness of synced-flush is guaranteed by ensuring that there are no ongoing indexing operations on the primary. Unfortunately, a replica can still fall out of sync with the primary even when that condition is met. Moreover, if synced-flush mistakenly issues a sync_id for an out-of-sync replica, that replica will no longer be able to recover from the primary: ES aborts the peer recovery because it detects that the primary and replica were sealed with the same sync_id but have different content. This commit modifies synced-flush so that it does not issue a sync_id for out-of-sync replicas. The change reports the divergence to users earlier and prevents replicas from getting into this "unrecoverable" state. Relates #10032
1 parent 690caf6 commit dd3cfd5
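
In short, the pre-sync step now reports each copy's Lucene doc count alongside its commit id, and the coordinating node refuses to issue the sync_id to any copy whose count differs from the primary's. A minimal sketch of that guard, with simplified, hypothetical names (the real logic lives in SyncedFlushService#sendSyncRequests below):

    // Hypothetical, simplified sketch of the new guard; the real check is in
    // SyncedFlushService#sendSyncRequests. UNKNOWN_NUM_DOCS (-1) marks a pre-sync
    // response from a pre-6.3 node that does not report a doc count; in that case
    // the comparison is skipped and the old behavior is kept.
    static final int UNKNOWN_NUM_DOCS = -1;

    static boolean mayIssueSyncId(int numDocsOnReplica, int numDocsOnPrimary) {
        if (numDocsOnReplica == UNKNOWN_NUM_DOCS || numDocsOnPrimary == UNKNOWN_NUM_DOCS) {
            return true; // cannot compare across versions; fall back to the old behavior
        }
        return numDocsOnReplica == numDocsOnPrimary; // only seal in-sync copies
    }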

File tree

6 files changed (+145, -40 lines)

server/src/main/java/org/elasticsearch/index/engine/CommitStats.java

+7
@@ -69,6 +69,13 @@ public String getId() {
         return id;
     }
 
+    /**
+     * A raw version of the commit id (see {@link SegmentInfos#getId()})
+     */
+    public Engine.CommitId getRawCommitId() {
+        return new Engine.CommitId(Base64.getDecoder().decode(id));
+    }
+
     /**
      * Returns the number of documents in this commit
      */

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

+66 -26
@@ -21,6 +21,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -44,6 +45,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -199,10 +201,10 @@ private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState s
                 return;
             }
 
-            final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
+            final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener = new ActionListener<Map<String, PreSyncedFlushResponse>>() {
                 @Override
-                public void onResponse(final Map<String, Engine.CommitId> commitIds) {
-                    if (commitIds.isEmpty()) {
+                public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
+                    if (presyncResponses.isEmpty()) {
                         actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
                         return;
                     }
@@ -216,7 +218,7 @@ public void onResponse(InFlightOpsResponse response) {
                         } else {
                             // 3. now send the sync request to all the shards
                             String syncId = UUIDs.randomBase64UUID();
-                            sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
+                            sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
                         }
                     }
 
@@ -236,7 +238,7 @@ public void onFailure(Exception e) {
             };
 
             // 1. send pre-sync flushes to all replicas
-            sendPreSyncRequests(activeShards, state, shardId, commitIdsListener);
+            sendPreSyncRequests(activeShards, state, shardId, presyncListener);
         } catch (Exception e) {
             actionListener.onFailure(e);
         }
@@ -299,28 +301,49 @@ public String executor() {
         }
     }
 
+    private int numDocsOnPrimary(List<ShardRouting> shards, Map<String, PreSyncedFlushResponse> preSyncResponses) {
+        for (ShardRouting shard : shards) {
+            if (shard.primary()) {
+                final PreSyncedFlushResponse resp = preSyncResponses.get(shard.currentNodeId());
+                if (resp != null) {
+                    return resp.numDocs;
+                }
+            }
+        }
+        return PreSyncedFlushResponse.UNKNOWN_NUM_DOCS;
+    }
 
-    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
+    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, PreSyncedFlushResponse> preSyncResponses,
                           final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
         final CountDown countDown = new CountDown(shards.size());
         final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
+        final int numDocsOnPrimary = numDocsOnPrimary(shards, preSyncResponses);
         for (final ShardRouting shard : shards) {
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("unknown node"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
-            final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
-            if (expectedCommitId == null) {
+            final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId());
+            if (preSyncedResponse == null) {
                 logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                 results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                continue;
+            }
+            if (preSyncedResponse.numDocs != numDocsOnPrimary
+                && preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) {
+                logger.warn("{} can't issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]",
+                    shardId, syncId, shard, preSyncedResponse.numDocs, numDocsOnPrimary);
+                results.put(shard, new ShardSyncedFlushResponse("out of sync replica; " +
+                    "num docs on replica [" + preSyncedResponse.numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]"));
+                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                 continue;
             }
             logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
-            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
+            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
                 new TransportResponseHandler<ShardSyncedFlushResponse>() {
                     @Override
                     public ShardSyncedFlushResponse newInstance() {
@@ -332,14 +355,14 @@ public void handleResponse(ShardSyncedFlushResponse response) {
                         ShardSyncedFlushResponse existing = results.put(shard, response);
                         assert existing == null : "got two answers for node [" + node + "]";
                         // count after the assert so we won't decrement twice in handleException
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
                     public void handleException(TransportException exp) {
                         logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", shardId, shard), exp);
                         results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
-                        contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
+                        countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                     }
 
                     @Override
@@ -351,8 +374,8 @@ public String executor() {
 
     }
 
-    private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
-                                               ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
+    private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
+                                                ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
         if (countDown.countDown()) {
             assert results.size() == shards.size();
             listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
@@ -362,16 +385,16 @@ private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> sha
     /**
      * send presync requests to all started copies of the given shard
      */
-    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, Engine.CommitId>> listener) {
+    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, PreSyncedFlushResponse>> listener) {
         final CountDown countDown = new CountDown(shards.size());
-        final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
+        final ConcurrentMap<String, PreSyncedFlushResponse> presyncResponses = ConcurrentCollections.newConcurrentMap();
         for (final ShardRouting shard : shards) {
             logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
             final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
             if (node == null) {
                 logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
                 if (countDown.countDown()) {
-                    listener.onResponse(commitIds);
+                    listener.onResponse(presyncResponses);
                 }
                 continue;
             }
@@ -383,19 +406,19 @@ public PreSyncedFlushResponse newInstance() {
 
                 @Override
                 public void handleResponse(PreSyncedFlushResponse response) {
-                    Engine.CommitId existing = commitIds.putIfAbsent(node.getId(), response.commitId());
+                    PreSyncedFlushResponse existing = presyncResponses.putIfAbsent(node.getId(), response);
                     assert existing == null : "got two answers for node [" + node + "]";
                     // count after the assert so we won't decrement twice in handleException
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
 
                 @Override
                 public void handleException(TransportException exp) {
                     logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", shardId, shard), exp);
                     if (countDown.countDown()) {
-                        listener.onResponse(commitIds);
+                        listener.onResponse(presyncResponses);
                     }
                 }
 
@@ -411,9 +434,11 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
-        Engine.CommitId commitId = indexShard.flush(flushRequest);
-        logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId);
-        return new PreSyncedFlushResponse(commitId);
+        indexShard.flush(flushRequest);
+        final CommitStats commitStats = indexShard.commitStats();
+        final Engine.CommitId commitId = commitStats.getRawCommitId();
+        logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
@@ -483,30 +508,45 @@ public ShardId shardId() {
      * Response for first step of synced flush (flush) for one shard copy
      */
     static final class PreSyncedFlushResponse extends TransportResponse {
+        static final int UNKNOWN_NUM_DOCS = -1;
 
         Engine.CommitId commitId;
+        int numDocs;
 
         PreSyncedFlushResponse() {
         }
 
-        PreSyncedFlushResponse(Engine.CommitId commitId) {
+        PreSyncedFlushResponse(Engine.CommitId commitId, int numDocs) {
             this.commitId = commitId;
+            this.numDocs = numDocs;
         }
 
-        public Engine.CommitId commitId() {
+        Engine.CommitId commitId() {
             return commitId;
        }
 
+        int numDocs() {
+            return numDocs;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             commitId = new Engine.CommitId(in);
+            if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
+                numDocs = in.readInt();
+            } else {
+                numDocs = UNKNOWN_NUM_DOCS;
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             commitId.writeTo(out);
+            if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
+                out.writeInt(numDocs);
+            }
         }
     }
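
A note on the readFrom/writeTo version checks above: in a mixed-version cluster a pre-6.3 node never sends a doc count, so the reader substitutes the UNKNOWN_NUM_DOCS sentinel. The guard in sendSyncRequests skips the comparison whenever either side reports the sentinel, which preserves the old (unchecked) behavior for older copies rather than failing them spuriously.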

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+2 -1
@@ -602,9 +602,10 @@ public long getCheckpoint() {
         globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ?
             SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get()));
 
-        engine.flush(true, true);
+        final Engine.CommitId commitId = engine.flush(true, true);
 
         CommitStats stats2 = engine.commitStats();
+        assertThat(stats2.getRawCommitId(), equalTo(commitId));
         assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
         assertThat(stats2.getId(), notNullValue());
         assertThat(stats2.getId(), not(equalTo(stats1.getId())));

server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java

+56
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.indices.flush;
 
+import org.apache.lucene.index.Term;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
@@ -35,7 +36,13 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.io.IOException;
@@ -47,9 +54,12 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class FlushIT extends ESIntegTestCase {
     public void testWaitIfOngoing() throws InterruptedException {
@@ -224,4 +234,50 @@ public void testUnallocatedShardsDoesNotHang() throws InterruptedException {
         assertThat(shardsResult.size(), equalTo(numShards));
         assertThat(shardsResult.get(0).failureReason(), equalTo("no active shards"));
     }
+
+    private void indexDoc(Engine engine, String id) throws IOException {
+        final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
+        final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc));
+        assertThat(indexResult.getFailure(), nullValue());
+    }
+
+    public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
+        final int numberOfReplicas = internalCluster().numDataNodes() - 1;
+        assertAcked(
+            prepareCreate("test").setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)).get()
+        );
+        ensureGreen();
+        final Index index = clusterService().state().metaData().index("test").getIndex();
+        final ShardId shardId = new ShardId(index, 0);
+        final int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            index("test", "doc", Integer.toString(i));
+        }
+        final List<IndexShard> indexShards = internalCluster().nodesInclude("test").stream()
+            .map(node -> internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId))
+            .collect(Collectors.toList());
+        // Index extra documents to one replica - synced-flush should fail on that replica.
+        final IndexShard outOfSyncReplica = randomValueOtherThanMany(s -> s.routingEntry().primary(), () -> randomFrom(indexShards));
+        final int extraDocs = between(1, 10);
+        for (int i = 0; i < extraDocs; i++) {
+            indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
+        }
+        final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
+        assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
+            "out of sync replica; num docs on replica [" + (numDocs + extraDocs) + "]; num docs on primary [" + numDocs + "]"));
+        // Index extra documents to all shards - synced-flush should be ok.
+        for (IndexShard indexShard : indexShards) {
+            for (int i = 0; i < extraDocs; i++) {
+                indexDoc(IndexShardTestCase.getEngine(indexShard), "extra_" + i);
+            }
+        }
+        final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
+        assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
+        assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
+    }
 }
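
Note how the test manufactures the divergence: it writes straight to the replica's Engine via IndexShardTestCase.getEngine, bypassing the primary's replication path, so only that copy's Lucene doc count drifts. The second half then indexes the same extra documents into every copy so the counts line up again, and asserts that synced flush succeeds on all copies.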
