23
23
import org .elasticsearch .action .OriginalIndices ;
24
24
import org .elasticsearch .action .support .ActionFilters ;
25
25
import org .elasticsearch .action .support .HandledTransportAction ;
26
+ import org .elasticsearch .client .Client ;
26
27
import org .elasticsearch .cluster .ClusterState ;
27
28
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
28
29
import org .elasticsearch .cluster .service .ClusterService ;
33
34
import org .elasticsearch .threadpool .ThreadPool ;
34
35
import org .elasticsearch .transport .RemoteClusterAware ;
35
36
import org .elasticsearch .transport .RemoteClusterService ;
36
- import org .elasticsearch .transport .Transport ;
37
- import org .elasticsearch .transport .TransportException ;
38
- import org .elasticsearch .transport .TransportRequestOptions ;
39
- import org .elasticsearch .transport .TransportResponseHandler ;
40
37
import org .elasticsearch .transport .TransportService ;
41
38
42
39
import java .util .ArrayList ;
@@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
49
46
private final ClusterService clusterService ;
50
47
private final TransportFieldCapabilitiesIndexAction shardAction ;
51
48
private final RemoteClusterService remoteClusterService ;
52
- private final TransportService transportService ;
53
49
54
50
@ Inject
55
51
public TransportFieldCapabilitiesAction (Settings settings , TransportService transportService ,
@@ -62,7 +58,6 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
62
58
actionFilters , indexNameExpressionResolver , FieldCapabilitiesRequest ::new );
63
59
this .clusterService = clusterService ;
64
60
this .remoteClusterService = transportService .getRemoteClusterService ();
65
- this .transportService = transportService ;
66
61
this .shardAction = shardAction ;
67
62
}
68
63
@@ -118,47 +113,20 @@ public void onFailure(Exception e) {
118
113
for (Map .Entry <String , OriginalIndices > remoteIndices : remoteClusterIndices .entrySet ()) {
119
114
String clusterAlias = remoteIndices .getKey ();
120
115
OriginalIndices originalIndices = remoteIndices .getValue ();
121
- // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
122
- remoteClusterService .ensureConnected (clusterAlias , ActionListener .wrap (v -> {
123
- Transport .Connection connection = remoteClusterService .getConnection (clusterAlias );
124
- FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest ();
125
- remoteRequest .setMergeResults (false ); // we need to merge on this node
126
- remoteRequest .indicesOptions (originalIndices .indicesOptions ());
127
- remoteRequest .indices (originalIndices .indices ());
128
- remoteRequest .fields (request .fields ());
129
- transportService .sendRequest (connection , FieldCapabilitiesAction .NAME , remoteRequest , TransportRequestOptions .EMPTY ,
130
- new TransportResponseHandler <FieldCapabilitiesResponse >() {
131
-
132
- @ Override
133
- public FieldCapabilitiesResponse newInstance () {
134
- return new FieldCapabilitiesResponse ();
135
- }
136
-
137
- @ Override
138
- public void handleResponse (FieldCapabilitiesResponse response ) {
139
- try {
140
- for (FieldCapabilitiesIndexResponse res : response .getIndexResponses ()) {
141
- indexResponses .add (new FieldCapabilitiesIndexResponse (RemoteClusterAware .
142
- buildRemoteIndexName (clusterAlias , res .getIndexName ()), res .get ()));
143
- }
144
- } finally {
145
- onResponse .run ();
146
- }
147
- }
148
-
149
- @ Override
150
- public void handleException (TransportException exp ) {
151
- onResponse .run ();
152
- }
153
-
154
- @ Override
155
- public String executor () {
156
- return ThreadPool .Names .SAME ;
157
- }
158
- });
159
- }, e -> onResponse .run ()));
116
+ Client remoteClusterClient = remoteClusterService .getRemoteClusterClient (threadPool , clusterAlias );
117
+ FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest ();
118
+ remoteRequest .setMergeResults (false ); // we need to merge on this node
119
+ remoteRequest .indicesOptions (originalIndices .indicesOptions ());
120
+ remoteRequest .indices (originalIndices .indices ());
121
+ remoteRequest .fields (request .fields ());
122
+ remoteClusterClient .fieldCaps (remoteRequest , ActionListener .wrap (response -> {
123
+ for (FieldCapabilitiesIndexResponse res : response .getIndexResponses ()) {
124
+ indexResponses .add (new FieldCapabilitiesIndexResponse (RemoteClusterAware .
125
+ buildRemoteIndexName (clusterAlias , res .getIndexName ()), res .get ()));
126
+ }
127
+ onResponse .run ();
128
+ }, failure -> onResponse .run ()));
160
129
}
161
-
162
130
}
163
131
}
164
132
0 commit comments