8
8
9
9
package org .elasticsearch .tasks ;
10
10
11
- import com .carrotsearch .hppc .ObjectIntHashMap ;
12
- import com .carrotsearch .hppc .ObjectIntMap ;
13
-
14
11
import org .apache .logging .log4j .LogManager ;
15
12
import org .apache .logging .log4j .Logger ;
16
13
import org .apache .logging .log4j .message .ParameterizedMessage ;
60
57
import java .util .concurrent .atomic .AtomicBoolean ;
61
58
import java .util .concurrent .atomic .AtomicLong ;
62
59
import java .util .function .Consumer ;
63
- import java .util .stream .Collectors ;
64
- import java .util .stream .StreamSupport ;
65
60
66
61
import static org .elasticsearch .core .TimeValue .timeValueMillis ;
67
62
import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_MAX_HEADER_SIZE ;
@@ -506,7 +501,7 @@ private static class CancellableTaskHolder {
506
501
private final CancellableTask task ;
507
502
private boolean finished = false ;
508
503
private List <Runnable > cancellationListeners = null ;
509
- private ObjectIntMap <Transport .Connection > childTasksPerConnection = null ;
504
+ private Map <Transport .Connection , Integer > childTasksPerConnection = null ;
510
505
private String banChildrenReason ;
511
506
private List <Runnable > childTaskCompletedListeners = null ;
512
507
@@ -587,15 +582,15 @@ synchronized void registerChildConnection(Transport.Connection connection) {
587
582
throw new TaskCancelledException ("parent task was cancelled [" + banChildrenReason + ']' );
588
583
}
589
584
if (childTasksPerConnection == null ) {
590
- childTasksPerConnection = new ObjectIntHashMap <>();
585
+ childTasksPerConnection = new HashMap <>();
591
586
}
592
- childTasksPerConnection .addTo (connection , 1 );
587
+ childTasksPerConnection .merge (connection , 1 , Integer :: sum );
593
588
}
594
589
595
590
void unregisterChildConnection (Transport .Connection node ) {
596
591
final List <Runnable > listeners ;
597
592
synchronized (this ) {
598
- if (childTasksPerConnection .addTo (node , -1 ) == 0 ) {
593
+ if (childTasksPerConnection .merge (node , -1 , Integer :: sum ) == 0 ) {
599
594
childTasksPerConnection .remove (node );
600
595
}
601
596
if (childTasksPerConnection .isEmpty () == false || this .childTaskCompletedListeners == null ) {
@@ -617,9 +612,7 @@ Set<Transport.Connection> startBan(String reason, Runnable onChildTasksCompleted
617
612
if (childTasksPerConnection == null ) {
618
613
pendingChildConnections = Collections .emptySet ();
619
614
} else {
620
- pendingChildConnections = StreamSupport .stream (childTasksPerConnection .spliterator (), false )
621
- .map (e -> e .key )
622
- .collect (Collectors .toUnmodifiableSet ());
615
+ pendingChildConnections = Set .copyOf (childTasksPerConnection .keySet ());
623
616
}
624
617
if (pendingChildConnections .isEmpty ()) {
625
618
assert childTaskCompletedListeners == null ;
0 commit comments