22
22
import org .elasticsearch .ElasticsearchTimeoutException ;
23
23
import org .elasticsearch .action .ActionListener ;
24
24
import org .elasticsearch .action .support .ActionFilters ;
25
+ import org .elasticsearch .action .support .HandledTransportAction ;
25
26
import org .elasticsearch .action .support .TransportAction ;
26
27
import org .elasticsearch .cluster .coordination .Coordinator ;
27
28
import org .elasticsearch .cluster .node .DiscoveryNode ;
28
29
import org .elasticsearch .common .Nullable ;
29
30
import org .elasticsearch .common .inject .Inject ;
31
+ import org .elasticsearch .common .io .stream .Writeable .Reader ;
30
32
import org .elasticsearch .common .lease .Releasable ;
31
33
import org .elasticsearch .common .settings .Settings ;
32
34
import org .elasticsearch .common .util .concurrent .ListenableFuture ;
40
42
import java .util .Set ;
41
43
import java .util .concurrent .CompletableFuture ;
42
44
import java .util .concurrent .ExecutionException ;
45
+ import java .util .concurrent .atomic .AtomicBoolean ;
43
46
import java .util .function .Consumer ;
44
47
45
- public class TransportGetDiscoveredNodesAction extends TransportAction <GetDiscoveredNodesRequest , GetDiscoveredNodesResponse > {
48
+ public class TransportGetDiscoveredNodesAction extends HandledTransportAction <GetDiscoveredNodesRequest , GetDiscoveredNodesResponse > {
46
49
47
50
@ Nullable // TODO make this not nullable
48
51
private final Coordinator coordinator ;
@@ -51,7 +54,9 @@ public class TransportGetDiscoveredNodesAction extends TransportAction<GetDiscov
51
54
@ Inject
52
55
public TransportGetDiscoveredNodesAction (Settings settings , ActionFilters actionFilters , TransportService transportService ,
53
56
Discovery discovery ) {
54
- super (settings , GetDiscoveredNodesAction .NAME , actionFilters , transportService .getTaskManager ());
57
+ super (settings , GetDiscoveredNodesAction .NAME , transportService , actionFilters ,
58
+ (Reader <GetDiscoveredNodesRequest >) GetDiscoveredNodesRequest ::new );
59
+
55
60
this .transportService = transportService ;
56
61
if (discovery instanceof Coordinator ) {
57
62
coordinator = (Coordinator ) discovery ;
@@ -71,6 +76,7 @@ protected void doExecute(Task task, GetDiscoveredNodesRequest request, ActionLis
71
76
if (localNode .isMasterNode () == false ) {
72
77
throw new ElasticsearchException ("this node is not master-eligible" );
73
78
}
79
+ final AtomicBoolean listenerNotified = new AtomicBoolean ();
74
80
final ListenableFuture <GetDiscoveredNodesResponse > listenableFuture = new ListenableFuture <>();
75
81
final ThreadPool threadPool = transportService .getThreadPool ();
76
82
listenableFuture .addListener (listener ,
@@ -84,7 +90,7 @@ public void accept(Iterable<DiscoveryNode> nodes) {
84
90
nodesSet .add (localNode );
85
91
nodes .forEach (nodesSet ::add );
86
92
logger .trace ("discovered {}" , nodesSet );
87
- if (nodesSet .size () >= request .getMinimumNodeCount ()) {
93
+ if (nodesSet .size () >= request .getMinimumNodeCount () && listenerNotified . compareAndSet ( false , true ) ) {
88
94
listenableFuture .onResponse (new GetDiscoveredNodesResponse (nodesSet ));
89
95
}
90
96
}
@@ -101,7 +107,9 @@ public String toString() {
101
107
threadPool .schedule (request .getTimeout (), Names .GENERIC , new Runnable () {
102
108
@ Override
103
109
public void run () {
104
- listenableFuture .onFailure (new ElasticsearchTimeoutException ("timed out while waiting for " + request ));
110
+ if (listenerNotified .compareAndSet (false , true )) {
111
+ listenableFuture .onFailure (new ElasticsearchTimeoutException ("timed out while waiting for " + request ));
112
+ }
105
113
}
106
114
107
115
@ Override
0 commit comments