76
76
import org .elasticsearch .test .DummyShardLock ;
77
77
import org .elasticsearch .test .ESTestCase ;
78
78
import org .elasticsearch .test .IndexSettingsModule ;
79
- import org .mockito .ArgumentCaptor ;
80
79
81
80
import java .io .IOException ;
82
81
import java .io .OutputStream ;
108
107
import static org .mockito .Matchers .anyString ;
109
108
import static org .mockito .Mockito .doAnswer ;
110
109
import static org .mockito .Mockito .mock ;
111
- import static org .mockito .Mockito .verify ;
112
110
import static org .mockito .Mockito .when ;
113
111
114
112
public class RecoverySourceHandlerTests extends ESTestCase {
@@ -205,9 +203,6 @@ public void testSendSnapshotSendsOps() throws IOException {
205
203
final StartRecoveryRequest request = getStartRecoveryRequest ();
206
204
final IndexShard shard = mock (IndexShard .class );
207
205
when (shard .state ()).thenReturn (IndexShardState .STARTED );
208
- final RecoveryTargetHandler recoveryTarget = mock (RecoveryTargetHandler .class );
209
- final RecoverySourceHandler handler =
210
- new RecoverySourceHandler (shard , recoveryTarget , request , fileChunkSizeInBytes , between (1 , 10 ));
211
206
final List <Translog .Operation > operations = new ArrayList <>();
212
207
final int initialNumberOfDocs = randomIntBetween (16 , 64 );
213
208
for (int i = 0 ; i < initialNumberOfDocs ; i ++) {
@@ -219,38 +214,23 @@ public void testSendSnapshotSendsOps() throws IOException {
219
214
final Engine .Index index = getIndex (Integer .toString (i ));
220
215
operations .add (new Translog .Index (index , new Engine .IndexResult (1 , 1 , i - initialNumberOfDocs , true )));
221
216
}
222
- operations .add (null );
223
217
final long startingSeqNo = randomIntBetween (0 , numberOfDocsWithValidSequenceNumbers - 1 );
224
218
final long requiredStartingSeqNo = randomIntBetween ((int ) startingSeqNo , numberOfDocsWithValidSequenceNumbers - 1 );
225
219
final long endingSeqNo = randomIntBetween ((int ) requiredStartingSeqNo - 1 , numberOfDocsWithValidSequenceNumbers - 1 );
226
- RecoverySourceHandler .SendSnapshotResult result = handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
227
- endingSeqNo , new Translog .Snapshot () {
228
- @ Override
229
- public void close () {
230
-
231
- }
232
-
233
- private int counter = 0 ;
234
220
235
- @ Override
236
- public int totalOperations () {
237
- return operations .size () - 1 ;
238
- }
239
-
240
- @ Override
241
- public Translog .Operation next () throws IOException {
242
- return operations .get (counter ++);
243
- }
244
- }, randomNonNegativeLong (), randomNonNegativeLong ());
221
+ final List <Translog .Operation > shippedOps = new ArrayList <>();
222
+ RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler () {
223
+ @ Override
224
+ public long indexTranslogOperations (List <Translog .Operation > operations , int totalTranslogOps , long timestamp , long msu ) {
225
+ shippedOps .addAll (operations );
226
+ return SequenceNumbers .NO_OPS_PERFORMED ;
227
+ }
228
+ };
229
+ RecoverySourceHandler handler = new RecoverySourceHandler (shard , recoveryTarget , request , fileChunkSizeInBytes , between (1 , 10 ));
230
+ RecoverySourceHandler .SendSnapshotResult result = handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
231
+ endingSeqNo , newTranslogSnapshot (operations , Collections .emptyList ()), randomNonNegativeLong (), randomNonNegativeLong ());
245
232
final int expectedOps = (int ) (endingSeqNo - startingSeqNo + 1 );
246
233
assertThat (result .totalOperations , equalTo (expectedOps ));
247
- final ArgumentCaptor <List > shippedOpsCaptor = ArgumentCaptor .forClass (List .class );
248
- verify (recoveryTarget ).indexTranslogOperations (shippedOpsCaptor .capture (), ArgumentCaptor .forClass (Integer .class ).capture (),
249
- ArgumentCaptor .forClass (Long .class ).capture (), ArgumentCaptor .forClass (Long .class ).capture ());
250
- List <Translog .Operation > shippedOps = new ArrayList <>();
251
- for (List list : shippedOpsCaptor .getAllValues ()) {
252
- shippedOps .addAll (list );
253
- }
254
234
shippedOps .sort (Comparator .comparing (Translog .Operation ::seqNo ));
255
235
assertThat (shippedOps .size (), equalTo (expectedOps ));
256
236
for (int i = 0 ; i < shippedOps .size (); i ++) {
@@ -261,30 +241,8 @@ public Translog.Operation next() throws IOException {
261
241
List <Translog .Operation > requiredOps = operations .subList (0 , operations .size () - 1 ).stream () // remove last null marker
262
242
.filter (o -> o .seqNo () >= requiredStartingSeqNo && o .seqNo () <= endingSeqNo ).collect (Collectors .toList ());
263
243
List <Translog .Operation > opsToSkip = randomSubsetOf (randomIntBetween (1 , requiredOps .size ()), requiredOps );
264
- expectThrows (IllegalStateException .class , () ->
265
- handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
266
- endingSeqNo , new Translog .Snapshot () {
267
- @ Override
268
- public void close () {
269
-
270
- }
271
-
272
- private int counter = 0 ;
273
-
274
- @ Override
275
- public int totalOperations () {
276
- return operations .size () - 1 - opsToSkip .size ();
277
- }
278
-
279
- @ Override
280
- public Translog .Operation next () throws IOException {
281
- Translog .Operation op ;
282
- do {
283
- op = operations .get (counter ++);
284
- } while (op != null && opsToSkip .contains (op ));
285
- return op ;
286
- }
287
- }, randomNonNegativeLong (), randomNonNegativeLong ()));
244
+ expectThrows (IllegalStateException .class , () -> handler .phase2 (startingSeqNo , requiredStartingSeqNo ,
245
+ endingSeqNo , newTranslogSnapshot (operations , opsToSkip ), randomNonNegativeLong (), randomNonNegativeLong ()));
288
246
}
289
247
}
290
248
@@ -716,4 +674,39 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
716
674
int totalTranslogOps , ActionListener <Void > listener ) {
717
675
}
718
676
}
677
+
678
+ private Translog .Snapshot newTranslogSnapshot (List <Translog .Operation > operations , List <Translog .Operation > operationsToSkip ) {
679
+ return new Translog .Snapshot () {
680
+ int index = 0 ;
681
+ int skippedCount = 0 ;
682
+
683
+ @ Override
684
+ public int totalOperations () {
685
+ return operations .size ();
686
+ }
687
+
688
+ @ Override
689
+ public int skippedOperations () {
690
+ return skippedCount ;
691
+ }
692
+
693
+ @ Override
694
+ public Translog .Operation next () {
695
+ while (index < operations .size ()) {
696
+ Translog .Operation op = operations .get (index ++);
697
+ if (operationsToSkip .contains (op )) {
698
+ skippedCount ++;
699
+ } else {
700
+ return op ;
701
+ }
702
+ }
703
+ return null ;
704
+ }
705
+
706
+ @ Override
707
+ public void close () {
708
+
709
+ }
710
+ };
711
+ }
719
712
}
0 commit comments