133
133
import org .elasticsearch .transport .Transport ;
134
134
import org .elasticsearch .transport .TransportInterceptor ;
135
135
import org .elasticsearch .transport .TransportService ;
136
- import org .elasticsearch .tribe .TribeService ;
137
136
import org .elasticsearch .usage .UsageService ;
138
137
import org .elasticsearch .watcher .ResourceWatcherService ;
139
138
153
152
import java .util .Collections ;
154
153
import java .util .List ;
155
154
import java .util .Map ;
155
+ import java .util .concurrent .CopyOnWriteArrayList ;
156
156
import java .util .concurrent .CountDownLatch ;
157
157
import java .util .concurrent .TimeUnit ;
158
158
import java .util .function .Consumer ;
@@ -229,6 +229,7 @@ public static final Settings addNodeNameIfNeeded(Settings settings, final String
229
229
private final Collection <LifecycleComponent > pluginLifecycleComponents ;
230
230
private final LocalNodeFactory localNodeFactory ;
231
231
private final NodeService nodeService ;
232
+ private final List <Runnable > onStartedListeners = new CopyOnWriteArrayList <>();
232
233
233
234
/**
234
235
* Constructs a node with the given settings.
@@ -256,8 +257,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
256
257
Settings tmpSettings = Settings .builder ().put (environment .settings ())
257
258
.put (Client .CLIENT_TYPE_SETTING_S .getKey (), CLIENT_TYPE ).build ();
258
259
259
- tmpSettings = TribeService .processSettings (tmpSettings );
260
-
261
260
// create the node environment as soon as possible, to recover the node id and enable logging
262
261
try {
263
262
nodeEnvironment = new NodeEnvironment (tmpSettings , environment );
@@ -385,15 +384,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
385
384
.flatMap (p -> p .getNamedXContent ().stream ()),
386
385
ClusterModule .getNamedXWriteables ().stream ())
387
386
.flatMap (Function .identity ()).collect (toList ()));
388
- final TribeService tribeService =
389
- new TribeService (
390
- settings ,
391
- environment .configFile (),
392
- clusterService ,
393
- nodeId ,
394
- namedWriteableRegistry ,
395
- (s , p ) -> newTribeClientNode (s , classpathPlugins , p ));
396
- resourcesToClose .add (tribeService );
397
387
modules .add (new RepositoriesModule (this .environment , pluginsService .filterPlugins (RepositoryPlugin .class ), xContentRegistry ));
398
388
final MetaStateService metaStateService = new MetaStateService (settings , nodeEnvironment , xContentRegistry );
399
389
final IndicesService indicesService = new IndicesService (settings , pluginsService , nodeEnvironment , xContentRegistry ,
@@ -449,6 +439,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
449
439
transportService , indicesService , pluginsService , circuitBreakerService , scriptModule .getScriptService (),
450
440
httpServerTransport , ingestService , clusterService , settingsModule .getSettingsFilter ());
451
441
modules .add (b -> {
442
+ b .bind (NodeBuilder .class ).toInstance (new NodeBuilder (this , classpathPlugins ));
452
443
b .bind (Node .class ).toInstance (this );
453
444
b .bind (NodeService .class ).toInstance (nodeService );
454
445
b .bind (NamedXContentRegistry .class ).toInstance (xContentRegistry );
@@ -458,7 +449,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
458
449
b .bind (Environment .class ).toInstance (this .environment );
459
450
b .bind (ThreadPool .class ).toInstance (threadPool );
460
451
b .bind (NodeEnvironment .class ).toInstance (nodeEnvironment );
461
- b .bind (TribeService .class ).toInstance (tribeService );
462
452
b .bind (ResourceWatcherService .class ).toInstance (resourceWatcherService );
463
453
b .bind (CircuitBreakerService .class ).toInstance (circuitBreakerService );
464
454
b .bind (BigArrays .class ).toInstance (bigArrays );
@@ -527,6 +517,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
527
517
}
528
518
}
529
519
520
+ public void addOnStartedListener (Runnable runnable ) {
521
+ onStartedListeners .add (runnable );
522
+ }
523
+
530
524
// visible for testing
531
525
static void warnIfPreRelease (final Version version , final boolean isSnapshot , final Logger logger ) {
532
526
if (!version .isRelease () || isSnapshot ) {
@@ -612,10 +606,6 @@ public Node start() throws NodeValidationException {
612
606
Discovery discovery = injector .getInstance (Discovery .class );
613
607
clusterService .getMasterService ().setClusterStatePublisher (discovery ::publish );
614
608
615
- // start before the cluster service since it adds/removes initial Cluster state blocks
616
- final TribeService tribeService = injector .getInstance (TribeService .class );
617
- tribeService .start ();
618
-
619
609
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
620
610
TransportService transportService = injector .getInstance (TransportService .class );
621
611
transportService .getTaskManager ().setTaskResultsService (injector .getInstance (TaskResultsService .class ));
@@ -682,10 +672,10 @@ public void onTimeout(TimeValue timeout) {
682
672
writePortsFile ("transport" , transport .boundAddress ());
683
673
}
684
674
685
- // start nodes now, after the http server, because it may take some time
686
- tribeService .startNodes ();
687
675
logger .info ("started" );
688
676
677
+ onStartedListeners .forEach (Runnable ::run );
678
+
689
679
return this ;
690
680
}
691
681
@@ -696,7 +686,6 @@ private Node stop() {
696
686
Logger logger = Loggers .getLogger (Node .class , NODE_NAME_SETTING .get (settings ));
697
687
logger .info ("stopping ..." );
698
688
699
- injector .getInstance (TribeService .class ).stop ();
700
689
injector .getInstance (ResourceWatcherService .class ).stop ();
701
690
if (NetworkModule .HTTP_ENABLED .get (settings )) {
702
691
injector .getInstance (HttpServerTransport .class ).stop ();
@@ -744,7 +733,6 @@ public synchronized void close() throws IOException {
744
733
List <Closeable > toClose = new ArrayList <>();
745
734
StopWatch stopWatch = new StopWatch ("node_close" );
746
735
toClose .add (() -> stopWatch .start ("tribe" ));
747
- toClose .add (injector .getInstance (TribeService .class ));
748
736
toClose .add (() -> stopWatch .stop ().start ("node_service" ));
749
737
toClose .add (nodeService );
750
738
toClose .add (() -> stopWatch .stop ().start ("http" ));
@@ -920,8 +908,23 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
920
908
return customNameResolvers ;
921
909
}
922
910
923
- /** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
924
- protected Node newTribeClientNode (Settings settings , Collection <Class <? extends Plugin >> classpathPlugins , Path configPath ) {
911
+ public static class NodeBuilder {
912
+
913
+ private final Node node ;
914
+ private final Collection <Class <? extends Plugin >> classpathPlugins ;
915
+
916
+ public NodeBuilder (Node node , Collection <Class <? extends Plugin >> classpathPlugins ) {
917
+ this .node = node ;
918
+ this .classpathPlugins = classpathPlugins ;
919
+ }
920
+
921
+ public Node newNode (Settings settings , Path configPath ) {
922
+ return node .newNode (settings , classpathPlugins , configPath );
923
+ }
924
+ }
925
+
926
+ /** Constructs a new node based on the following settings. Overridden by tests */
927
+ protected Node newNode (Settings settings , Collection <Class <? extends Plugin >> classpathPlugins , Path configPath ) {
925
928
return new Node (new Environment (settings , configPath ), classpathPlugins );
926
929
}
927
930
0 commit comments