Skip to content

Only accept transport requests after node is fully initialized #16746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
Expand Down Expand Up @@ -155,7 +154,10 @@ protected void configure() {
pluginsService.processModules(modules);

Injector injector = modules.createInjector();
injector.getInstance(TransportService.class).start();
final TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
transportService.acceptIncomingRequests();

TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/elasticsearch/discovery/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public FailedToCommitClusterStateException(String msg, Throwable cause, Object..
DiscoveryStats stats();


/**
* Triggers the first join cycle
*/
void startInitialJoin();


/***
* @return the current value of minimum master nodes, or -1 for not set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ protected void doStart() {
logger.info(discovery.nodeDescription());
}

public void waitForInitialState() {
public void joinClusterAndWaitForInitialState() {
try {
discovery.startInitialJoin();
if (!initialStateListener.waitForInitialState(initialStateTimeout)) {
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public void setRoutingService(RoutingService routingService) {

@Override
protected void doStart() {

}

@Override
public void startInitialJoin() {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ protected void doStart() {
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
}

@Override
public void startInitialJoin() {
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
clusterService.submitStateUpdateTask("initial_join", new ClusterStateUpdateTask() {

Expand Down
26 changes: 2 additions & 24 deletions core/src/main/java/org/elasticsearch/gateway/GatewayService.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,6 @@ public GatewayService(Settings settings, AllocationService allocationService, Cl
@Override
protected void doStart() {
clusterService.addLast(this);
// check we didn't miss any cluster state that came in until now / during the addition
clusterService.submitStateUpdateTask("gateway_initial_state_recovery", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
checkStateMeetsSettingsAndMaybeRecover(currentState);
return currentState;
}

@Override
public boolean runOnlyOnMaster() {
// It's OK to run on non masters as checkStateMeetsSettingsAndMaybeRecover checks for this
// we return false to avoid unneeded failure logs
return false;
}

@Override
public void onFailure(String source, Throwable t) {
logger.warn("unexpected failure while checking if state can be recovered. another attempt will be made with the next cluster state change", t);
}
});
}

@Override
Expand All @@ -170,10 +149,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
if (lifecycle.stoppedOrClosed()) {
return;
}
checkStateMeetsSettingsAndMaybeRecover(event.state());
}

protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state) {
final ClusterState state = event.state();

if (state.nodes().localNodeMaster() == false) {
// not our job to recover
return;
Expand Down
20 changes: 12 additions & 8 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
Expand All @@ -50,7 +49,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -279,8 +277,6 @@ public Node start() {
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(MonitorService.class).start();
Expand All @@ -289,16 +285,24 @@ public Node start() {
// TODO hack around circular dependencies problems
injector.getInstance(GatewayAllocator.class).setReallocation(injector.getInstance(ClusterService.class), injector.getInstance(RoutingService.class));

injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();

// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
injector.getInstance(ClusterService.class).start();

// start after cluster service so the local disco is known
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();

// gateway should start after disco, so it can try and recovery from gateway on "start"
injector.getInstance(GatewayService.class).start();

transportService.acceptIncomingRequests();
discoService.joinClusterAndWaitForInitialState();

if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).start();
}
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(TribeService.class).start();

if (WRITE_PORTS_FIELD_SETTING.get(settings)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -71,7 +71,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic

public static final String DIRECT_RESPONSE_PROFILE = ".direct";

private final AtomicBoolean started = new AtomicBoolean(false);
private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a simple CountDownLatch that way we know it's only going in one direction (never move back to blocking), we get the wait for free and don't need to suppress forbidden API?

protected final Transport transport;
protected final ThreadPool threadPool;
protected final TaskManager taskManager;
Expand Down Expand Up @@ -167,6 +167,7 @@ void setTracerLogInclude(List<String> tracerLogInclude) {
void setTracerLogExclude(List<String> tracelLogExclude) {
this.tracelLogExclude = tracelLogExclude.toArray(Strings.EMPTY_ARRAY);
}

@Override
protected void doStart() {
adapter.rxMetric.clear();
Expand All @@ -179,14 +180,10 @@ protected void doStart() {
logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
}
}
boolean setStarted = started.compareAndSet(false, true);
assert setStarted : "service was already started";
}

@Override
protected void doStop() {
final boolean setStopped = started.compareAndSet(true, false);
assert setStopped : "service has already been stopped";
try {
transport.stop();
} finally {
Expand All @@ -213,6 +210,15 @@ protected void doClose() {
transport.close();
}

/**
* start accepting incoming requests.
* when the transport layer starts up it will block any incoming requests until
* this method is called
*/
public void acceptIncomingRequests() {
blockIncomingRequestsLatch.countDown();
}

public boolean addressSupported(Class<? extends TransportAddress> address) {
return transport.addressSupported(address);
}
Expand Down Expand Up @@ -302,7 +308,7 @@ public <T extends TransportResponse> void sendRequest(final DiscoveryNode node,
timeoutHandler = new TimeoutHandler(requestId);
}
clientHandlers.put(requestId, new RequestHolder<>(new ContextRestoreResponseHandler<T>(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler));
if (started.get() == false) {
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
// it will only notify if the toStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
Expand Down Expand Up @@ -405,10 +411,11 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi

/**
* Registers a new request handler
* @param action The action the request handler is associated with
*
* @param action The action the request handler is associated with
* @param requestFactory a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
Expand All @@ -417,11 +424,12 @@ public <Request extends TransportRequest> void registerRequestHandler(String act

/**
* Registers a new request handler
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param handler The handler itself that implements the request handling
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
Expand Down Expand Up @@ -494,6 +502,11 @@ protected void traceResponseSent(long requestId, String action, Throwable t) {

@Override
public void onRequestReceived(long requestId, String action) {
try {
blockIncomingRequestsLatch.await();
} catch (InterruptedException e) {
logger.trace("interrupted while waiting for incoming requests block to be removed");
}
if (traceEnabled() && shouldTraceAction(action)) {
traceReceivedRequest(requestId, action);
}
Expand Down Expand Up @@ -729,6 +742,7 @@ public void cancelTimeout() {
private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final TransportResponseHandler<T> delegate;
private final ThreadContext.StoredContext threadContext;

private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) {
this.delegate = delegate;
this.threadContext = threadContext;
Expand Down Expand Up @@ -766,7 +780,7 @@ static class DirectResponseChannel implements TransportChannel {
final ThreadPool threadPool;

public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId,
TransportServiceAdapter adapter, ThreadPool threadPool) {
TransportServiceAdapter adapter, ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ protected TaskManager createTaskManager() {
actionFilters, indexNameExpressionResolver);
transportCancelTasksAction = new TransportCancelTasksAction(settings, clusterName, threadPool, clusterService, transportService,
actionFilters, indexNameExpressionResolver);
transportService.acceptIncomingRequests();
}

public final TestClusterService clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
setClusterState(clusterService, TEST_INDEX);
action = new TestTransportBroadcastByNodeAction(
Settings.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
localNode = new DiscoveryNode("local_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
remoteNode = new DiscoveryNode("remote_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
allNodes = new DiscoveryNode[] { localNode, remoteNode };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.elasticsearch.action.support.replication;

import com.carrotsearch.randomizedtesting.annotations.Repeat;

import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ReplicationResponse;
Expand Down Expand Up @@ -126,6 +124,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool);
count.set(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(
Settings.EMPTY,
"indices:admin/test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected TestResponse newResponse() {
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);

nodesCount = randomIntBetween(1, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void setUp() throws Exception {
clusterService = new TestClusterService(THREAD_POOL);
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {});
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protected MockTransportService build(Settings settings, Version version) {
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ public void testSimplePings() throws InterruptedException {

NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
transportServiceA.acceptIncomingRequests();
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);

InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();

NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
transportServiceB.acceptIncomingRequests();
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT);

InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();
Expand Down
Loading