@@ -256,9 +256,11 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
256
256
client .admin ().indices ().create (createIndexRequest , listener );
257
257
}
258
258
259
- private boolean setResponseFailureIfIndexMatches (AtomicArray <BulkItemResponse > responses , int idx , DocWriteRequest <?> request , String index , Exception e ) {
259
+ private boolean setResponseFailureIfIndexMatches (AtomicArray <BulkItemResponse > responses , int idx , DocWriteRequest <?> request ,
260
+ String index , Exception e ) {
260
261
if (index .equals (request .index ())) {
261
- responses .set (idx , new BulkItemResponse (idx , request .opType (), new BulkItemResponse .Failure (request .index (), request .type (), request .id (), e )));
262
+ responses .set (idx , new BulkItemResponse (idx , request .opType (), new BulkItemResponse .Failure (request .index (), request .type (),
263
+ request .id (), e )));
262
264
return true ;
263
265
}
264
266
return false ;
@@ -327,19 +329,22 @@ protected void doRun() throws Exception {
327
329
indexRequest .process (indexCreated , mappingMd , concreteIndex .getName ());
328
330
break ;
329
331
case UPDATE :
330
- TransportUpdateAction .resolveAndValidateRouting (metaData , concreteIndex .getName (), (UpdateRequest ) docWriteRequest );
332
+ TransportUpdateAction .resolveAndValidateRouting (metaData , concreteIndex .getName (),
333
+ (UpdateRequest ) docWriteRequest );
331
334
break ;
332
335
case DELETE :
333
336
docWriteRequest .routing (metaData .resolveWriteIndexRouting (docWriteRequest .routing (), docWriteRequest .index ()));
334
337
// check if routing is required, if so, throw error if routing wasn't specified
335
- if (docWriteRequest .routing () == null && metaData .routingRequired (concreteIndex .getName (), docWriteRequest .type ())) {
338
+ if (docWriteRequest .routing () == null && metaData .routingRequired (concreteIndex .getName (),
339
+ docWriteRequest .type ())) {
336
340
throw new RoutingMissingException (concreteIndex .getName (), docWriteRequest .type (), docWriteRequest .id ());
337
341
}
338
342
break ;
339
343
default : throw new AssertionError ("request type not supported: [" + docWriteRequest .opType () + "]" );
340
344
}
341
345
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e ) {
342
- BulkItemResponse .Failure failure = new BulkItemResponse .Failure (concreteIndex .getName (), docWriteRequest .type (), docWriteRequest .id (), e );
346
+ BulkItemResponse .Failure failure = new BulkItemResponse .Failure (concreteIndex .getName (), docWriteRequest .type (),
347
+ docWriteRequest .id (), e );
343
348
BulkItemResponse bulkItemResponse = new BulkItemResponse (i , docWriteRequest .opType (), failure );
344
349
responses .set (i , bulkItemResponse );
345
350
// make sure the request gets never processed again
@@ -355,13 +360,15 @@ protected void doRun() throws Exception {
355
360
continue ;
356
361
}
357
362
String concreteIndex = concreteIndices .getConcreteIndex (request .index ()).getName ();
358
- ShardId shardId = clusterService .operationRouting ().indexShards (clusterState , concreteIndex , request .id (), request .routing ()).shardId ();
363
+ ShardId shardId = clusterService .operationRouting ().indexShards (clusterState , concreteIndex , request .id (),
364
+ request .routing ()).shardId ();
359
365
List <BulkItemRequest > shardRequests = requestsByShard .computeIfAbsent (shardId , shard -> new ArrayList <>());
360
366
shardRequests .add (new BulkItemRequest (i , request ));
361
367
}
362
368
363
369
if (requestsByShard .isEmpty ()) {
364
- listener .onResponse (new BulkResponse (responses .toArray (new BulkItemResponse [responses .length ()]), buildTookInMillis (startTimeNanos )));
370
+ listener .onResponse (new BulkResponse (responses .toArray (new BulkItemResponse [responses .length ()]),
371
+ buildTookInMillis (startTimeNanos )));
365
372
return ;
366
373
}
367
374
@@ -407,7 +414,8 @@ public void onFailure(Exception e) {
407
414
}
408
415
409
416
private void finishHim () {
410
- listener .onResponse (new BulkResponse (responses .toArray (new BulkItemResponse [responses .length ()]), buildTookInMillis (startTimeNanos )));
417
+ listener .onResponse (new BulkResponse (responses .toArray (new BulkItemResponse [responses .length ()]),
418
+ buildTookInMillis (startTimeNanos )));
411
419
}
412
420
});
413
421
}
@@ -535,7 +543,8 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
535
543
} else {
536
544
long ingestTookInMillis = TimeUnit .NANOSECONDS .toMillis (System .nanoTime () - ingestStartTimeInNanos );
537
545
BulkRequest bulkRequest = bulkRequestModifier .getBulkRequest ();
538
- ActionListener <BulkResponse > actionListener = bulkRequestModifier .wrapActionListenerIfNeeded (ingestTookInMillis , listener );
546
+ ActionListener <BulkResponse > actionListener = bulkRequestModifier .wrapActionListenerIfNeeded (ingestTookInMillis ,
547
+ listener );
539
548
if (bulkRequest .requests ().isEmpty ()) {
540
549
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
541
550
// so we stop and send an empty response back to the client.
@@ -628,7 +637,8 @@ void markCurrentItemAsFailed(Exception e) {
628
637
// 2) Add a bulk item failure for this request
629
638
// 3) Continue with the next request in the bulk.
630
639
failedSlots .set (currentSlot );
631
- BulkItemResponse .Failure failure = new BulkItemResponse .Failure (indexRequest .index (), indexRequest .type (), indexRequest .id (), e );
640
+ BulkItemResponse .Failure failure = new BulkItemResponse .Failure (indexRequest .index (), indexRequest .type (),
641
+ indexRequest .id (), e );
632
642
itemResponses .add (new BulkItemResponse (currentSlot , indexRequest .opType (), failure ));
633
643
}
634
644
@@ -641,7 +651,8 @@ static final class IngestBulkResponseListener implements ActionListener<BulkResp
641
651
private final List <BulkItemResponse > itemResponses ;
642
652
private final ActionListener <BulkResponse > actionListener ;
643
653
644
- IngestBulkResponseListener (long ingestTookInMillis , int [] originalSlots , List <BulkItemResponse > itemResponses , ActionListener <BulkResponse > actionListener ) {
654
+ IngestBulkResponseListener (long ingestTookInMillis , int [] originalSlots , List <BulkItemResponse > itemResponses ,
655
+ ActionListener <BulkResponse > actionListener ) {
645
656
this .ingestTookInMillis = ingestTookInMillis ;
646
657
this .itemResponses = itemResponses ;
647
658
this .actionListener = actionListener ;
0 commit comments