@@ -148,8 +148,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
148
148
final Set <String > indices = bulkRequest .requests .stream ()
149
149
// delete requests should not attempt to create the index (if the index does not
150
150
// exists), unless an external versioning is used
151
- .filter (request -> request .opType () != DocWriteRequest .OpType .DELETE
152
- || request .versionType () == VersionType .EXTERNAL
151
+ .filter (request -> request .opType () != DocWriteRequest .OpType .DELETE
152
+ || request .versionType () == VersionType .EXTERNAL
153
153
|| request .versionType () == VersionType .EXTERNAL_GTE )
154
154
.map (DocWriteRequest ::index )
155
155
.collect (Collectors .toSet ());
@@ -189,7 +189,7 @@ public void onFailure(Exception e) {
189
189
if (!(ExceptionsHelper .unwrapCause (e ) instanceof ResourceAlreadyExistsException )) {
190
190
// fail all requests involving this index, if create didn't work
191
191
for (int i = 0 ; i < bulkRequest .requests .size (); i ++) {
192
- DocWriteRequest request = bulkRequest .requests .get (i );
192
+ DocWriteRequest <?> request = bulkRequest .requests .get (i );
193
193
if (request != null && setResponseFailureIfIndexMatches (responses , i , request , index , e )) {
194
194
bulkRequest .requests .set (i , null );
195
195
}
@@ -226,7 +226,7 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
226
226
client .admin ().indices ().create (createIndexRequest , listener );
227
227
}
228
228
229
- private boolean setResponseFailureIfIndexMatches (AtomicArray <BulkItemResponse > responses , int idx , DocWriteRequest request , String index , Exception e ) {
229
+ private boolean setResponseFailureIfIndexMatches (AtomicArray <BulkItemResponse > responses , int idx , DocWriteRequest <?> request , String index , Exception e ) {
230
230
if (index .equals (request .index ())) {
231
231
responses .set (idx , new BulkItemResponse (idx , request .opType (), new BulkItemResponse .Failure (request .index (), request .type (), request .id (), e )));
232
232
return true ;
@@ -276,7 +276,7 @@ protected void doRun() throws Exception {
276
276
final ConcreteIndices concreteIndices = new ConcreteIndices (clusterState , indexNameExpressionResolver );
277
277
MetaData metaData = clusterState .metaData ();
278
278
for (int i = 0 ; i < bulkRequest .requests .size (); i ++) {
279
- DocWriteRequest docWriteRequest = bulkRequest .requests .get (i );
279
+ DocWriteRequest <?> docWriteRequest = bulkRequest .requests .get (i );
280
280
//the request can only be null because we set it to null in the previous step, so it gets ignored
281
281
if (docWriteRequest == null ) {
282
282
continue ;
@@ -320,7 +320,7 @@ protected void doRun() throws Exception {
320
320
// first, go over all the requests and create a ShardId -> Operations mapping
321
321
Map <ShardId , List <BulkItemRequest >> requestsByShard = new HashMap <>();
322
322
for (int i = 0 ; i < bulkRequest .requests .size (); i ++) {
323
- DocWriteRequest request = bulkRequest .requests .get (i );
323
+ DocWriteRequest <?> request = bulkRequest .requests .get (i );
324
324
if (request == null ) {
325
325
continue ;
326
326
}
@@ -367,7 +367,7 @@ public void onFailure(Exception e) {
367
367
// create failures for all relevant requests
368
368
for (BulkItemRequest request : requests ) {
369
369
final String indexName = concreteIndices .getConcreteIndex (request .index ()).getName ();
370
- DocWriteRequest docWriteRequest = request .request ();
370
+ DocWriteRequest <?> docWriteRequest = request .request ();
371
371
responses .set (request .id (), new BulkItemResponse (request .id (), docWriteRequest .opType (),
372
372
new BulkItemResponse .Failure (indexName , docWriteRequest .type (), docWriteRequest .id (), e )));
373
373
}
@@ -423,7 +423,7 @@ public void onTimeout(TimeValue timeout) {
423
423
});
424
424
}
425
425
426
- private boolean addFailureIfIndexIsUnavailable (DocWriteRequest request , int idx , final ConcreteIndices concreteIndices ,
426
+ private boolean addFailureIfIndexIsUnavailable (DocWriteRequest <?> request , int idx , final ConcreteIndices concreteIndices ,
427
427
final MetaData metaData ) {
428
428
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated .get (request .index ());
429
429
if (cannotCreate != null ) {
@@ -447,7 +447,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx,
447
447
return false ;
448
448
}
449
449
450
- private void addFailure (DocWriteRequest request , int idx , Exception unavailableException ) {
450
+ private void addFailure (DocWriteRequest <?> request , int idx , Exception unavailableException ) {
451
451
BulkItemResponse .Failure failure = new BulkItemResponse .Failure (request .index (), request .type (), request .id (),
452
452
unavailableException );
453
453
BulkItemResponse bulkItemResponse = new BulkItemResponse (idx , request .opType (), failure );
@@ -476,7 +476,7 @@ Index getConcreteIndex(String indexOrAlias) {
476
476
return indices .get (indexOrAlias );
477
477
}
478
478
479
- Index resolveIfAbsent (DocWriteRequest request ) {
479
+ Index resolveIfAbsent (DocWriteRequest <?> request ) {
480
480
Index concreteIndex = indices .get (request .index ());
481
481
if (concreteIndex == null ) {
482
482
concreteIndex = indexNameExpressionResolver .concreteSingleIndex (state , request );
@@ -517,7 +517,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
517
517
});
518
518
}
519
519
520
- static final class BulkRequestModifier implements Iterator <DocWriteRequest > {
520
+ static final class BulkRequestModifier implements Iterator <DocWriteRequest <?> > {
521
521
522
522
final BulkRequest bulkRequest ;
523
523
final SparseFixedBitSet failedSlots ;
@@ -533,7 +533,7 @@ static final class BulkRequestModifier implements Iterator<DocWriteRequest> {
533
533
}
534
534
535
535
@ Override
536
- public DocWriteRequest next () {
536
+ public DocWriteRequest <?> next () {
537
537
return bulkRequest .requests ().get (++currentSlot );
538
538
}
539
539
@@ -552,10 +552,10 @@ BulkRequest getBulkRequest() {
552
552
modifiedBulkRequest .timeout (bulkRequest .timeout ());
553
553
554
554
int slot = 0 ;
555
- List <DocWriteRequest > requests = bulkRequest .requests ();
555
+ List <DocWriteRequest <?> > requests = bulkRequest .requests ();
556
556
originalSlots = new int [requests .size ()]; // oversize, but that's ok
557
557
for (int i = 0 ; i < requests .size (); i ++) {
558
- DocWriteRequest request = requests .get (i );
558
+ DocWriteRequest <?> request = requests .get (i );
559
559
if (failedSlots .get (i ) == false ) {
560
560
modifiedBulkRequest .add (request );
561
561
originalSlots [slot ++] = i ;
0 commit comments