Skip to content

Commit c812ad5

Browse files
committed
Primary replica resync should not send ops without seqno (#40433)
Primary-replica resync in a mixed-cluster between 6.x and 5.6 can send operations without sequence number to a replica which already processed operations with sequence number. This leads to the failure of that replica for we trip the sequence number assertion when writing resync operations without sequence number to translog.
1 parent 11357bb commit c812ad5

File tree

3 files changed

+61
-3
lines changed

3 files changed

+61
-3
lines changed

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,11 @@ protected void doRun() throws Exception {
246246
Translog.Operation operation;
247247
while ((operation = snapshot.next()) != null) {
248248
final long seqNo = operation.seqNo();
249-
if (startingSeqNo >= 0 &&
250-
(seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo)) {
249+
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) {
251250
totalSkippedOps.incrementAndGet();
252251
continue;
253252
}
253+
assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]";
254254
operations.add(operation);
255255
size += operation.estimateSize();
256256
totalSentOps.incrementAndGet();
@@ -260,7 +260,6 @@ protected void doRun() throws Exception {
260260
break;
261261
}
262262
}
263-
264263
final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
265264
// have to send sync request even in case of there are no operations to sync - have to sync trimmedAboveSeqNo at least
266265
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

+34
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,30 @@
4242
import org.elasticsearch.index.VersionType;
4343
import org.elasticsearch.index.mapper.SourceToParse;
4444
import org.elasticsearch.index.seqno.SequenceNumbers;
45+
import org.elasticsearch.index.translog.TestTranslog;
46+
import org.elasticsearch.index.translog.Translog;
4547
import org.elasticsearch.tasks.Task;
4648
import org.elasticsearch.tasks.TaskManager;
4749

4850
import java.io.IOException;
4951
import java.nio.ByteBuffer;
5052
import java.util.ArrayList;
53+
import java.util.Arrays;
5154
import java.util.Collections;
5255
import java.util.List;
5356
import java.util.concurrent.CountDownLatch;
5457
import java.util.concurrent.atomic.AtomicBoolean;
58+
import java.util.stream.Collectors;
5559

5660
import static org.hamcrest.Matchers.containsString;
5761
import static org.hamcrest.Matchers.equalTo;
5862
import static org.hamcrest.Matchers.is;
5963
import static org.hamcrest.core.IsInstanceOf.instanceOf;
64+
import static org.mockito.Matchers.anyLong;
65+
import static org.mockito.Matchers.anyString;
66+
import static org.mockito.Mockito.doReturn;
67+
import static org.mockito.Mockito.spy;
68+
import static org.mockito.Mockito.when;
6069

6170
public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
6271

@@ -186,6 +195,31 @@ public void onResponse(PrimaryReplicaSyncer.ResyncTask result) {
186195
}
187196
}
188197

198+
public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception {
199+
IndexShard shard = spy(newStartedShard(true));
200+
when(shard.getGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO);
201+
int numOps = between(0, 20);
202+
List<Translog.Operation> operations = new ArrayList<>();
203+
for (int i = 0; i < numOps; i++) {
204+
operations.add(new Translog.Index(
205+
"_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
206+
}
207+
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
208+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
209+
List<Translog.Operation> sentOperations = new ArrayList<>();
210+
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {
211+
sentOperations.addAll(Arrays.asList(request.getOperations()));
212+
listener.onResponse(new ResyncReplicationResponse());
213+
};
214+
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
215+
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
216+
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
217+
syncer.resync(shard, fut);
218+
fut.actionGet();
219+
assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList())));
220+
closeShards(shard);
221+
}
222+
189223
public void testStatusSerialization() throws IOException {
190224
PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
191225
randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));

server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java

+25
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.ArrayList;
3838
import java.util.Collection;
3939
import java.util.Comparator;
40+
import java.util.Iterator;
4041
import java.util.List;
4142
import java.util.Random;
4243
import java.util.Set;
@@ -142,4 +143,28 @@ public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot,
142143
}
143144
return ops;
144145
}
146+
147+
public static Translog.Snapshot newSnapshotFromOperations(List<Translog.Operation> operations) {
148+
final Iterator<Translog.Operation> iterator = operations.iterator();
149+
return new Translog.Snapshot() {
150+
@Override
151+
public int totalOperations() {
152+
return operations.size();
153+
}
154+
155+
@Override
156+
public Translog.Operation next() {
157+
if (iterator.hasNext()) {
158+
return iterator.next();
159+
} else {
160+
return null;
161+
}
162+
}
163+
164+
@Override
165+
public void close() {
166+
167+
}
168+
};
169+
}
145170
}

0 commit comments

Comments
 (0)