diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index fae4082e81828..0f3185add0833 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -102,7 +103,7 @@ public void testScheduledPing() throws Exception { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/action/Action.java b/server/src/main/java/org/elasticsearch/action/Action.java index 771762ad15c30..f0df6202072a4 100644 --- a/server/src/main/java/org/elasticsearch/action/Action.java +++ b/server/src/main/java/org/elasticsearch/action/Action.java @@ -19,6 +19,7 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.transport.TransportRequestOptions; @@ -45,9 +46,23 @@ public String name() { /** * Creates a new response instance. + * @deprecated Implement {@link #getResponseReader()} instead and make this method throw an + * {@link UnsupportedOperationException} */ + @Deprecated public abstract Response newResponse(); + /** + * Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput} + */ + public Writeable.Reader getResponseReader() { + return in -> { + Response response = newResponse(); + response.readFrom(in); + return response; + }; + } + /** * Optional request options for the action. */ diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index f258be3a16137..432cef6ad3029 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -19,13 +19,15 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import java.io.IOException; import java.util.Objects; -import java.util.function.Supplier; /** * A simple base class for action response listeners, defaulting to using the SAME executor (as its @@ -34,11 +36,11 @@ public class ActionListenerResponseHandler implements TransportResponseHandler { private final ActionListener listener; - private final Supplier responseSupplier; + private final Writeable.Reader reader; - public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { + public ActionListenerResponseHandler(ActionListener listener, Writeable.Reader reader) { this.listener = Objects.requireNonNull(listener); - this.responseSupplier = Objects.requireNonNull(responseSupplier); + this.reader = Objects.requireNonNull(reader); } @Override @@ -52,12 +54,12 @@ public void handleException(TransportException e) { } @Override - public Response newInstance() { - return responseSupplier.get(); + public String executor() { + return ThreadPool.Names.SAME; } @Override - public String executor() { - return ThreadPool.Names.SAME; + public Response read(StreamInput in) throws IOException { + return reader.read(in); } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionResponse.java b/server/src/main/java/org/elasticsearch/action/ActionResponse.java index a1cd3068a269f..dd019ba3f5591 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ActionResponse.java @@ -30,6 +30,13 @@ */ public abstract class ActionResponse extends TransportResponse { + public ActionResponse() { + } + + public ActionResponse(StreamInput in) throws IOException { + super(in); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index c369deb0b10b3..7d8dbd1f975bd 100644 --- a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -48,6 +48,6 @@ public void execute(final DiscoveryNode node, final Request request, final Actio return; } transportService.sendRequest(node, action.name(), request, transportOptions, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 927d2e47680c5..69fc7ee376c0b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -119,8 +120,10 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public GetTaskResponse newInstance() { - return new GetTaskResponse(); + public GetTaskResponse read(StreamInput in) throws IOException { + GetTaskResponse response = new GetTaskResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java index ec936c623a24a..869aecf095431 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.shards; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; public class ClusterSearchShardsAction extends Action { @@ -32,6 +33,11 @@ private ClusterSearchShardsAction() { @Override public ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return ClusterSearchShardsResponse::new; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index f8d448d0fe11c..57407bd61fb82 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -38,36 +38,12 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], new DiscoveryNode[0], Collections.emptyMap()); - private ClusterSearchShardsGroup[] groups; - private DiscoveryNode[] nodes; - private Map indicesAndFilters; + private final ClusterSearchShardsGroup[] groups; + private final DiscoveryNode[] nodes; + private final Map indicesAndFilters; - public ClusterSearchShardsResponse() { - - } - - public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, - Map indicesAndFilters) { - this.groups = groups; - this.nodes = nodes; - this.indicesAndFilters = indicesAndFilters; - } - - public ClusterSearchShardsGroup[] getGroups() { - return groups; - } - - public DiscoveryNode[] getNodes() { - return nodes; - } - - public Map getIndicesAndFilters() { - return indicesAndFilters; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public ClusterSearchShardsResponse(StreamInput in) throws IOException { + super(in); groups = new ClusterSearchShardsGroup[in.readVInt()]; for (int i = 0; i < groups.length; i++) { groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in); @@ -85,6 +61,11 @@ public void readFrom(StreamInput in) throws IOException { } } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -103,6 +84,25 @@ public void writeTo(StreamOutput out) throws IOException { } } + public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, + Map indicesAndFilters) { + this.groups = groups; + this.nodes = nodes; + this.indicesAndFilters = indicesAndFilters; + } + + public ClusterSearchShardsGroup[] getGroups() { + return groups; + } + + public DiscoveryNode[] getNodes() { + return nodes; + } + + public Map getIndicesAndFilters() { + return indicesAndFilters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 9774ecdffba17..f4f36ca4d65e9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -39,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -72,7 +74,12 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C @Override protected ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java index 6f5147c38bdbb..ae5a736bde66d 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java @@ -49,7 +49,7 @@ public IngestActionForwarder(TransportService transportService) { public void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) { transportService.sendRequest(randomIngestNode(), action.name(), request, - new ActionListenerResponseHandler(listener, action::newResponse)); + new ActionListenerResponseHandler(listener, action.getResponseReader())); } private DiscoveryNode randomIngestNode() { diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 6d0c35345b1fa..50d75b20dc82b 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Consumer; import java.util.function.Supplier; @@ -151,8 +153,10 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima transportOptions, new TransportResponseHandler() { @Override - public ResyncReplicationResponse newInstance() { - return newResponseInstance(); + public ResyncReplicationResponse read(StreamInput in) throws IOException { + ResyncReplicationResponse response = newResponseInstance(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index cb30385ecc868..f2b1b0d5c6265 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -135,6 +135,10 @@ public Exception getFailure() { MultiSearchResponse() { } + MultiSearchResponse(StreamInput in) throws IOException { + readFrom(in); + } + public MultiSearchResponse(Item[] items, long tookInMillis) { this.items = items; this.tookInMillis = tookInMillis; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index fd43a948ee5fb..302ed4ccbfec9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; @@ -60,7 +61,6 @@ import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -119,7 +119,7 @@ public void sendCanMatch(Transport.Connection connection, final ShardSearchTrans public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -133,11 +133,11 @@ public void sendExecuteQuery(Transport.Connection connection, final ShardSearchT // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // this used to be the QUERY_AND_FETCH which doesn't exist anymore. final boolean fetchDocuments = request.numberOfShards() == 1; - Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, @@ -155,8 +155,8 @@ public void sendExecuteScrollQuery(Transport.Connection connection, final Intern public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, - clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, clientConnections, + connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -279,6 +279,10 @@ public static class SearchFreeContextResponse extends TransportResponse { SearchFreeContextResponse() { } + SearchFreeContextResponse(StreamInput in) throws IOException { + freed = in.readBoolean(); + } + SearchFreeContextResponse(boolean freed) { this.freed = freed; } @@ -306,22 +310,20 @@ public static void registerRequestHandler(TransportService transportService, Sea boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new, (request, channel, task) -> { boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME, (request, channel, task) -> { searchService.freeAllScrollContexts(); channel.sendResponse(TransportResponse.Empty.INSTANCE); }); TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { @@ -352,8 +354,8 @@ public void onFailure(Exception e) { searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>( channel, QUERY_ACTION_NAME, request)); }); - TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, - (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); + TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, + (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { @@ -395,8 +397,7 @@ public void onFailure(Exception e) { (request, channel, task) -> { searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request)); }); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, - (Supplier) SearchService.CanMatchResponse::new); + TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); } @@ -419,9 +420,9 @@ final class ConnectionCountingHandler extend private final Map clientConnections; private final String nodeId; - ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + ConnectionCountingHandler(final ActionListener listener, final Writeable.Reader responseReader, final Map clientConnections, final String nodeId) { - super(listener, responseSupplier); + super(listener, responseReader); this.clientConnections = clientConnections; this.nodeId = nodeId; // Increment the number of connections for this node by one diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 22c4a70b0ea55..27dcb11da3869 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; @@ -173,8 +174,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting } else { transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler() { @Override - public ShardResponse newInstance() { - return newShardResponse(); + public ShardResponse read(StreamInput in) throws IOException { + ShardResponse response = newShardResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 9079238b7b62e..f097539626458 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -313,8 +313,10 @@ private void sendNodeRequest(final DiscoveryNode node, List shards } transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return new NodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = new NodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index a9ed05ac0377f..10780a55c27bf 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -35,6 +35,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,6 +49,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Predicate; import java.util.function.Supplier; @@ -101,8 +104,21 @@ protected TransportMasterNodeAction(Settings settings, String actionName, boolea protected abstract String executor(); + /** + * @deprecated new implementors should override {@link #read(StreamInput)} and use the + * {@link Writeable.Reader} interface. + * @return a new response instance. Typically this is used for serialization using the + * {@link Streamable#readFrom(StreamInput)} method. + */ + @Deprecated protected abstract Response newResponse(); + protected Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; + } + protected abstract void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception; /** @@ -201,21 +217,21 @@ protected void doRun() throws Exception { } else { DiscoveryNode masterNode = nodes.getMasterNode(); final String actionName = getMasterActionName(masterNode); - transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, - TransportMasterNodeAction.this::newResponse) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request with action name [{}] to " + - "master node [{}], scheduling a retry. Error: [{}]", - actionName, nodes.getMasterNode(), exp.getDetailedMessage()); - retry(cause, masterChangePredicate); - } else { - listener.onFailure(exp); + transportService.sendRequest(masterNode, actionName, request, + new ActionListenerResponseHandler(listener, TransportMasterNodeAction.this::read) { + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to forward request with action name [{}] to " + + "master node [{}], scheduling a retry. Error: [{}]", + actionName, nodes.getMasterNode(), exp.getDetailedMessage()); + retry(cause, masterChangePredicate); + } else { + listener.onFailure(exp); + } } - } }); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 2be4e5bf053cc..317792c610479 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -39,6 +40,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -186,8 +188,10 @@ void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return newNodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = newNodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 695c9162633f6..820ab0300d69a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -317,12 +318,17 @@ public void onResponse(PrimaryShardReference primaryShardReference) { // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. final ShardRouting primary = primaryShardReference.routingEntry(); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; + final Writeable.Reader reader = in -> { + Response response = TransportReplicationAction.this.newResponseInstance(); + response.readFrom(in); + return response; + }; DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - TransportReplicationAction.this::newResponseInstance) { + reader) { @Override public void handleResponse(Response response) { @@ -577,7 +583,7 @@ public void onNewClusterState(ClusterState state) { String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), @@ -813,8 +819,10 @@ private void performAction(final DiscoveryNode node, final String action, final transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponseInstance(); + public Response read(StreamInput in) throws IOException { + Response response = newResponseInstance(); + response.readFrom(in); + return response; } @Override @@ -1186,7 +1194,11 @@ protected void sendReplicaRequest( final ConcreteReplicaRequest replicaRequest, final DiscoveryNode node, final ActionListener listener) { - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, ReplicaResponse::new); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { + ReplicaResponse replicaResponse = new ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + }); transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index e8e710aa81f2c..3a5d8d0e382e9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.NodeClosedException; @@ -47,6 +48,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Supplier; public abstract class TransportInstanceSingleOperationAction, Response extends ActionResponse> @@ -178,8 +180,10 @@ protected void doStart(ClusterState clusterState) { transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 436089ab3be73..0a50413e96964 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -182,8 +183,10 @@ public void start() { // just execute it on the local node transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override @@ -246,8 +249,10 @@ private void perform(@Nullable final Exception currentFailure) { transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 38a0d96600ce8..dad2bb8ad0896 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -270,8 +270,10 @@ private void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeTasksResponse newInstance() { - return new NodeTasksResponse(); + public NodeTasksResponse read(StreamInput in) throws IOException { + NodeTasksResponse response = new NodeTasksResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index aa0672d80ba1d..0cfc1f5004ce8 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -511,8 +511,10 @@ protected void doRun() throws Exception { new TransportResponseHandler() { @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + final ClusterStateResponse clusterStateResponse = new ClusterStateResponse(); + clusterStateResponse.readFrom(in); + return clusterStateResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 5acf2effad390..b48ea77e64c75 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -225,8 +225,8 @@ public void run() { transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new TransportResponseHandler() { @Override - public MasterPingResponseResponse newInstance() { - return new MasterPingResponseResponse(); + public MasterPingResponseResponse read(StreamInput in) throws IOException { + return new MasterPingResponseResponse(in); } @Override @@ -433,14 +433,8 @@ private static class MasterPingResponseResponse extends TransportResponse { private MasterPingResponseResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private MasterPingResponseResponse(StreamInput in) throws IOException { + super(in); } } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 57e5cab020be1..40bde9ee81d15 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -226,8 +226,8 @@ public void run() { .withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler() { @Override - public PingResponse newInstance() { - return new PingResponse(); + public PingResponse read(StreamInput in) throws IOException { + return new PingResponse(in); } @Override @@ -359,14 +359,8 @@ private static class PingResponse extends TransportResponse { private PingResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private PingResponse(StreamInput in) throws IOException { + super(in); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java index 7bc2e38dde024..5630ceea72945 100644 --- a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java +++ b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java @@ -84,8 +84,10 @@ public void allocateDangled(Collection indices, final Listener li AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()])); transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler() { @Override - public AllocateDangledResponse newInstance() { - return new AllocateDangledResponse(); + public AllocateDangledResponse read(StreamInput in) throws IOException { + final AllocateDangledResponse response = new AllocateDangledResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index fb7885a217e01..aeb88021f26e1 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -313,8 +313,10 @@ protected void getInflightOpsCount(final ShardId shardId, ClusterState state, In transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId), new TransportResponseHandler() { @Override - public InFlightOpsResponse newInstance() { - return new InFlightOpsResponse(); + public InFlightOpsResponse read(StreamInput in) throws IOException { + InFlightOpsResponse response = new InFlightOpsResponse(); + response.readFrom(in); + return response; } @Override @@ -383,8 +385,10 @@ void sendSyncRequests(final String syncId, final List shards, Clus transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId), new TransportResponseHandler() { @Override - public ShardSyncedFlushResponse newInstance() { - return new ShardSyncedFlushResponse(); + public ShardSyncedFlushResponse read(StreamInput in) throws IOException { + ShardSyncedFlushResponse response = new ShardSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override @@ -437,8 +441,10 @@ void sendPreSyncRequests(final List shards, final ClusterState sta } transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler() { @Override - public PreSyncedFlushResponse newInstance() { - return new PreSyncedFlushResponse(); + public PreSyncedFlushResponse read(StreamInput in) throws IOException { + PreSyncedFlushResponse response = new PreSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index f60994a4bced4..39709eb3ac2ff 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -195,8 +196,10 @@ private void doRecovery(final long recoveryId) { transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override - public RecoveryResponse newInstance() { - return new RecoveryResponse(); + public RecoveryResponse read(StreamInput in) throws IOException { + RecoveryResponse recoveryResponse = new RecoveryResponse(); + recoveryResponse.readFrom(in); + return recoveryResponse; } }).txGet())); final RecoveryResponse recoveryResponse = responseHolder.get(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java index 530b8b67415d3..8633380f3947a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java @@ -63,8 +63,10 @@ public void readFrom(final StreamInput in) throws IOException { static TransportResponseHandler HANDLER = new FutureTransportResponseHandler() { @Override - public RecoveryTranslogOperationsResponse newInstance() { - return new RecoveryTranslogOperationsResponse(); + public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException { + RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse(); + response.readFrom(in); + return response; } }; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 6512e25fc0b78..f1a43077a6215 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1101,6 +1101,10 @@ public static final class CanMatchResponse extends SearchPhaseResult { public CanMatchResponse() { } + public CanMatchResponse(StreamInput in) throws IOException { + this.canMatch = in.readBoolean(); + } + public CanMatchResponse(boolean canMatch) { this.canMatch = canMatch; } diff --git a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 8de89089c4f01..718b895217433 100644 --- a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -46,6 +46,10 @@ public class DfsSearchResult extends SearchPhaseResult { public DfsSearchResult() { } + public DfsSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public DfsSearchResult(long id, SearchShardTarget shardTarget) { this.setSearchShardTarget(shardTarget); this.requestId = id; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index a5f27733ad28a..12391151861d0 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -38,6 +38,10 @@ public final class FetchSearchResult extends SearchPhaseResult { public FetchSearchResult() { } + public FetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public FetchSearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 8d1e6276e65d9..0a5a7cec375db 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -38,6 +38,10 @@ public final class QueryFetchSearchResult extends SearchPhaseResult { public QueryFetchSearchResult() { } + public QueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { this.queryResult = queryResult; this.fetchResult = fetchResult; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index 55aa4a96d018c..6b0a8b619bff3 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -36,6 +36,10 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { public ScrollQueryFetchSearchResult() { } + public ScrollQueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 2aded57ece04c..43654823914b4 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -66,6 +66,10 @@ public final class QuerySearchResult extends SearchPhaseResult { public QuerySearchResult() { } + public QuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index 6401459489955..632d148ea901b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -35,6 +35,10 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult { public ScrollQuerySearchResult() { } + public ScrollQuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java index c5814cf0fefcc..7ff1ef8391fd6 100644 --- a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; public class EmptyTransportResponseHandler implements TransportResponseHandler { @@ -32,7 +33,7 @@ public EmptyTransportResponseHandler(String executor) { } @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index d93bbb57201e2..8e72e6d5768f1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -47,7 +47,7 @@ void doExecute(Action action, Request request, ActionListener { Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c9f3a2aa36540..48f086ad972bf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -218,8 +218,8 @@ private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, new TransportResponseHandler() { @Override - public ClusterSearchShardsResponse newInstance() { - return new ClusterSearchShardsResponse(); + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override @@ -591,8 +591,10 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl } @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + ClusterStateResponse response = new ClusterStateResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 27b4aa7293e18..ad41e8c2902a3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -205,7 +205,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric readBytesMetric = new MeanMetric(); private final MeanMetric transmittedBytesMetric = new MeanMetric(); - private volatile Map requestHandlers = Collections.emptyMap(); + private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final TransportLogger transportLogger; private final BytesReference pingMessage; @@ -284,8 +284,8 @@ private static class HandshakeResponseHandler implements TransportResponseHandle } @Override - public VersionHandshakeResponse newInstance() { - return new VersionHandshakeResponse(); + public VersionHandshakeResponse read(StreamInput in) throws IOException { + return new VersionHandshakeResponse(in); } @Override @@ -1273,7 +1273,8 @@ public final void messageReceived(BytesReference reference, TcpChannel channel) if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener); + TransportResponseHandler theHandler = + responseHandlers.onResponseReceived(requestId, messageListener); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { @@ -1319,8 +1320,9 @@ static void ensureVersionCompatibility(Version version, Version currentVersion, } } - private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) { - final TransportResponse response; + private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, + final TransportResponseHandler handler) { + final T response; try { response = handler.read(stream); response.remoteAddress(new TransportAddress(remoteAddress)); @@ -1469,17 +1471,13 @@ public void onFailure(Exception e) { } private static final class VersionHandshakeResponse extends TransportResponse { - private Version version; + private final Version version; private VersionHandshakeResponse(Version version) { this.version = version; } - private VersionHandshakeResponse() { - } - - @Override - public void readFrom(StreamInput in) throws IOException { + private VersionHandshakeResponse(StreamInput in) throws IOException { super.readFrom(in); version = Version.readVersion(in); } @@ -1736,7 +1734,7 @@ public final ResponseHandlers getResponseHandlers() { } @Override - public final RequestHandlerRegistry getRequestHandler(String action) { + public final RequestHandlerRegistry getRequestHandler(String action) { return requestHandlers.get(action); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index fc1f0c9e5ec0f..e13213dca066a 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -54,7 +54,7 @@ public interface Transport extends LifecycleComponent { * Returns the registered request handler registry for the given action or null if it's not registered * @param action the action to look up */ - RequestHandlerRegistry getRequestHandler(String action); + RequestHandlerRegistry getRequestHandler(String action); void addMessageListener(TransportMessageListener listener); @@ -184,7 +184,7 @@ public String action() { * This class is a registry that allows */ final class ResponseHandlers { - private final ConcurrentMapLong handlers = ConcurrentCollections + private final ConcurrentMapLong> handlers = ConcurrentCollections .newConcurrentMapLongWithAggressiveConcurrency(); private final AtomicLong requestIdGenerator = new AtomicLong(); @@ -208,7 +208,7 @@ public ResponseContext remove(long requestId) { * @return the new request ID * @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions) */ - public long add(ResponseContext holder) { + public long add(ResponseContext holder) { long requestId = newRequestId(); ResponseContext existing = handlers.put(requestId, holder); assert existing == null : "request ID already in use: " + requestId; @@ -226,10 +226,10 @@ long newRequestId() { /** * Removes and returns all {@link ResponseContext} instances that match the predicate */ - public List prune(Predicate predicate) { - final List holders = new ArrayList<>(); - for (Map.Entry entry : handlers.entrySet()) { - ResponseContext holder = entry.getValue(); + public List> prune(Predicate predicate) { + final List> holders = new ArrayList<>(); + for (Map.Entry> entry : handlers.entrySet()) { + ResponseContext holder = entry.getValue(); if (predicate.test(holder)) { ResponseContext remove = handlers.remove(entry.getKey()); if (remove != null) { @@ -245,8 +245,9 @@ public List prune(Predicate predicate) { * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ - public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) { - ResponseContext context = handlers.remove(requestId); + public TransportResponseHandler onResponseReceived(final long requestId, + final TransportMessageListener listener) { + ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { return null; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index a17509e826003..a5b926249f8e2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.function.Function; -import java.util.function.Supplier; /** * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second @@ -43,10 +42,10 @@ private static class ProxyRequestHandler implements Tran private final TransportService service; private final String action; - private final Function> responseFunction; + private final Function> responseFunction; ProxyRequestHandler(TransportService service, String action, Function> responseFunction) { + Writeable.Reader> responseFunction) { this.service = service; this.action = action; this.responseFunction = responseFunction; @@ -63,17 +62,17 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro private static class ProxyResponseHandler implements TransportResponseHandler { - private final Supplier responseFactory; + private final Writeable.Reader reader; private final TransportChannel channel; - ProxyResponseHandler(TransportChannel channel, Supplier responseFactory) { - this.responseFactory = responseFactory; + ProxyResponseHandler(TransportChannel channel, Writeable.Reader reader) { + this.reader = reader; this.channel = channel; - } + @Override - public T newInstance() { - return responseFactory.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override @@ -101,26 +100,25 @@ public String executor() { } static class ProxyRequest extends TransportRequest { - T wrapped; - Writeable.Reader reader; - DiscoveryNode targetNode; - - ProxyRequest(Writeable.Reader reader) { - this.reader = reader; - } + final T wrapped; + final DiscoveryNode targetNode; ProxyRequest(T wrapped, DiscoveryNode targetNode) { this.wrapped = wrapped; this.targetNode = targetNode; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -133,21 +131,23 @@ public void writeTo(StreamOutput out) throws IOException { * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type changes based on the upcoming request (quite rare) */ - public static void registerProxyAction(TransportService service, String action, - Function> responseFunction) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, responseFunction)); + public static void registerProxyActionWithDynamicResponseType(TransportService service, String action, + Function> responseFunction) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, responseFunction)); } /** * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type is always the same (most of the cases). */ - public static void registerProxyAction(TransportService service, String action, Supplier responseSupplier) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, request -> responseSupplier)); + public static void registerProxyAction(TransportService service, String action, + Writeable.Reader reader) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, request -> reader)); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 4ba2769edb4a2..6b45feec94859 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -21,10 +21,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel @@ -34,19 +35,19 @@ public class TransportChannelResponseHandler implem private final Logger logger; private final TransportChannel channel; private final String extraInfoOnError; - private final Supplier responseSupplier; + private final Writeable.Reader reader; public TransportChannelResponseHandler(Logger logger, TransportChannel channel, String extraInfoOnError, - Supplier responseSupplier) { + Writeable.Reader reader) { this.logger = logger; this.channel = channel; this.extraInfoOnError = extraInfoOnError; - this.responseSupplier = responseSupplier; + this.reader = reader; } @Override - public T newInstance() { - return responseSupplier.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index ecaca73b2db57..05deab8eafbf0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -39,6 +39,19 @@ public TransportAddress remoteAddress() { return remoteAddress; } + /** + * Constructs a new empty transport message + */ + public TransportMessage() { + } + + /** + * Constructs a new transport message with the data from the {@link StreamInput}. This is + * currently a no-op + */ + public TransportMessage(StreamInput in) throws IOException { + } + @Override public void readFrom(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java index 25ae72a479f7d..5ad9c9fee544e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -19,8 +19,27 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + public abstract class TransportResponse extends TransportMessage { + /** + * Constructs a new empty transport response + */ + public TransportResponse() { + } + + /** + * Constructs a new transport response with the data from the {@link StreamInput}. This is + * currently a no-op. However, this exists to allow extenders to call super(in) + * so that reading can mirror writing where we often call super.writeTo(out). + */ + public TransportResponse(StreamInput in) throws IOException { + super(in); + } + public static class Empty extends TransportResponse { public static final Empty INSTANCE = new Empty(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index fbe477ad04b1d..29720216cf400 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,34 +19,10 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; -import java.io.IOException; - public interface TransportResponseHandler extends Writeable.Reader { - /** - * @deprecated Implement {@link #read(StreamInput)} instead. - */ - @Deprecated - default T newInstance() { - throw new UnsupportedOperationException(); - } - - /** - * deserializes a new instance of the return type from the stream. - * called by the infra when de-serializing the response. - * - * @return the deserialized response. - */ - @Override - default T read(StreamInput in) throws IOException { - T instance = newInstance(); - instance.readFrom(in); - return instance; - } - void handleResponse(T response); void handleException(TransportException exp); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index db14fd015fd82..c2ae982b3dce1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -434,8 +434,8 @@ public HandshakeResponse handshake( PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @Override - public HandshakeResponse newInstance() { - return new HandshakeResponse(); + public HandshakeResponse read(StreamInput in) throws IOException { + return new HandshakeResponse(in); } }); sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, @@ -468,12 +468,9 @@ private HandshakeRequest() { } public static class HandshakeResponse extends TransportResponse { - private DiscoveryNode discoveryNode; - private ClusterName clusterName; - private Version version; - - HandshakeResponse() { - } + private final DiscoveryNode discoveryNode; + private final ClusterName clusterName; + private final Version version; public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { this.discoveryNode = discoveryNode; @@ -481,9 +478,8 @@ public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, V this.clusterName = clusterName; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public HandshakeResponse(StreamInput in) throws IOException { + super(in); discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); clusterName = new ClusterName(in); version = Version.readVersion(in); @@ -930,7 +926,7 @@ public void onRequestReceived(long requestId, String action) { } } - public RequestHandlerRegistry getRequestHandler(String action) { + public RequestHandlerRegistry getRequestHandler(String action) { return transport.getRequestHandler(action); } @@ -977,8 +973,8 @@ private void checkForTimeout(long requestId) { @Override public void onConnectionClosed(Transport.Connection connection) { try { - List pruned = responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection - .getCacheKey())); + List> pruned = + responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey())); // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows getExecutorService().execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java index f685be02141ad..fbfe0e497017f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java @@ -83,8 +83,7 @@ public void testSerialization() throws Exception { clusterSearchShardsResponse.writeTo(out); try(StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) { in.setVersion(version); - ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(); - deserialized.readFrom(in); + ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(in); assertArrayEquals(clusterSearchShardsResponse.getNodes(), deserialized.getNodes()); assertEquals(clusterSearchShardsResponse.getGroups().length, deserialized.getGroups().length); for (int i = 0; i < clusterSearchShardsResponse.getGroups().length; i++) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index c763709a04e40..e529af97c800d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -254,7 +254,7 @@ public void testBuildClusters() { remoteIndices.put(cluster, randomOriginalIndices()); if (onlySuccessful || randomBoolean()) { //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse()); + searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); successful++; } else { searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index d64cdf89ef7ea..41d691c95bd90 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -253,8 +253,8 @@ public void onFailure(Exception e) { iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) { + return new TestResponse(in); } @Override @@ -435,5 +435,7 @@ public static class TestRequest extends TransportRequest { private static class TestResponse extends TransportResponse { + private TestResponse() {} + private TestResponse(StreamInput in) {} } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 46364c19ee0ec..6c27680d74162 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -172,9 +172,7 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -215,9 +213,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -233,9 +229,7 @@ public ClusterSearchShardsResponse read(StreamInput in) throws IOException { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index 428d416ac0242..7d52c12e47364 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -86,8 +86,7 @@ public void testSendMessage() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -96,8 +95,7 @@ public void testSendMessage() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -105,8 +103,7 @@ public void testSendMessage() throws InterruptedException { serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_C"; + SimpleTestResponse response = new SimpleTestResponse("TS_C"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceC, "internal:test", SimpleTestResponse::new); @@ -115,8 +112,8 @@ public void testSendMessage() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -131,7 +128,7 @@ public void handleResponse(SimpleTestResponse response) { @Override public void handleException(TransportException exp) { try { - throw new AssertionError(exp); + throw new AssertionError(exp); } finally { latch.countDown(); } @@ -149,8 +146,7 @@ public void testException() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -159,8 +155,7 @@ public void testException() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -175,8 +170,8 @@ public void testException() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -228,11 +223,20 @@ public void writeTo(StreamOutput out) throws IOException { } public static class SimpleTestResponse extends TransportResponse { - String targetNode; + final String targetNode; + + SimpleTestResponse(String targetNode) { + this.targetNode = targetNode; + } + + SimpleTestResponse(StreamInput in) throws IOException { + super(in); + this.targetNode = in.readString(); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - targetNode = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -263,7 +267,7 @@ public void testIsProxyAction() { } public void testIsProxyRequest() { - assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null))); + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(TransportRequest.Empty.INSTANCE, null))); assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 132a07d5b7f48..1b8405a2d591a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; @@ -163,8 +164,10 @@ public void clear() { /** * simulate a response for the given requestId */ - public void handleResponse(final long requestId, final TransportResponse response) { - responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); + public void handleResponse(final long requestId, final Response response) { + TransportResponseHandler handler = + (TransportResponseHandler) responseHandlers.onResponseReceived(requestId, listener); + handler.handleResponse(response); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3b64f00084ec8..592de73cbfc74 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -219,8 +219,8 @@ public void testHelloWorld() { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -250,8 +250,8 @@ public void handleException(TransportException exp) { res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -298,8 +298,8 @@ public void testThreadContext() throws ExecutionException, InterruptedException final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0])); TransportResponseHandler responseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -353,8 +353,8 @@ public void testLocalNodeConnection() throws InterruptedException { serviceA.sendRequest(nodeA, "internal:localNode", new StringMessageRequest("test"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -502,7 +502,7 @@ public void testVoidMessageCompressed() { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -550,8 +550,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -592,8 +592,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHelloException", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -644,7 +644,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti serviceA.registerRequestHandler("internal:test", TestRequest::new, randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { logger.info("caught exception while responding", e); responseErrors.add(e); @@ -652,7 +652,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti }); final TransportRequestHandler ignoringRequestHandler = (request, channel, task) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { // we don't really care what's going on B, we're testing through A logger.trace("caught exception while responding from node B", e); @@ -808,8 +808,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -872,8 +872,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -910,8 +910,8 @@ public void handleException(TransportException exp) { new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -961,8 +961,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportResponseHandler noopResponseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1160,19 +1160,19 @@ public void writeTo(StreamOutput out) throws IOException { static class StringMessageResponse extends TransportResponse { - private String message; + private final String message; StringMessageResponse(String message) { this.message = message; } - StringMessageResponse() { + StringMessageResponse(StreamInput in) throws IOException { + this.message = in.readString(); } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - message = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1224,12 +1224,19 @@ public void writeTo(StreamOutput out) throws IOException { static class Version0Response extends TransportResponse { - int value1; + final int value1; + + Version0Response(int value1) { + this.value1 = value1; + } + + Version0Response(StreamInput in) throws IOException { + this.value1 = in.readInt(); + } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - value1 = in.readInt(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1241,16 +1248,27 @@ public void writeTo(StreamOutput out) throws IOException { static class Version1Response extends Version0Response { - int value2; + final int value2; - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + Version1Response(int value1, int value2) { + super(value1); + this.value2 = value2; + } + + Version1Response(StreamInput in) throws IOException { + super(in); if (in.getVersion().onOrAfter(version1)) { value2 = in.readInt(); + } else { + value2 = 0; } } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -1267,9 +1285,7 @@ public void testVersionFrom0to1() throws Exception { public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(0)); // not set, coming from service A - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1280,8 +1296,8 @@ public void messageReceived(Version1Request request, TransportChannel channel, T Version0Response version0Response = serviceA.submitRequest(nodeB, "internal:version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1310,8 +1326,7 @@ public void testVersionFrom1to0() throws Exception { @Override public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1323,8 +1338,8 @@ public void messageReceived(Version0Request request, TransportChannel channel, T Version1Response version1Response = serviceB.submitRequest(nodeA, "internal:version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1354,9 +1369,7 @@ public void testVersionFrom1to1() throws Exception { (request, channel, task) -> { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(2)); - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version1, channel.getVersion()); }); @@ -1367,8 +1380,8 @@ public void testVersionFrom1to1() throws Exception { Version1Response version1Response = serviceB.submitRequest(nodeB, "internal:version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1397,8 +1410,7 @@ public void testVersionFrom0to0() throws Exception { serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); }); @@ -1408,8 +1420,8 @@ public void testVersionFrom0to0() throws Exception { Version0Response version0Response = serviceA.submitRequest(nodeA, "internal:version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1444,8 +1456,8 @@ public void testMockFailToSendNoConnectRule() throws Exception { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1502,8 +1514,8 @@ public void testMockUnresponsiveRule() throws IOException { new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1547,13 +1559,13 @@ public void testHostOnMessages() throws InterruptedException { final AtomicReference addressB = new AtomicReference<>(); serviceB.registerRequestHandler("internal:action1", TestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { addressA.set(request.remoteAddress()); - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); latch.countDown(); }); serviceA.sendRequest(nodeB, "internal:action1", new TestRequest(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1600,8 +1612,8 @@ public void testBlockingIncomingRequests() throws Exception { serviceA.sendRequest(connection, "internal:action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1666,9 +1678,10 @@ public String toString() { private static class TestResponse extends TransportResponse { - String info; + final String info; - TestResponse() { + TestResponse(StreamInput in) throws IOException { + this.info = in.readOptionalString(); } TestResponse(String info) { @@ -1677,8 +1690,7 @@ private static class TestResponse extends TransportResponse { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - info = in.readOptionalString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1763,8 +1775,8 @@ public void messageReceived(TestRequest request, TransportChannel channel, Task TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1820,8 +1832,8 @@ class TestResponseHandler implements TransportResponseHandler { } @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -2086,7 +2098,7 @@ public void testResponseHeadersArePreserved() throws InterruptedException { TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2140,7 +2152,7 @@ public void testHandlerIsInvokedOnConnectionClose() throws IOException, Interrup CountDownLatch latch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2217,7 +2229,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2285,7 +2297,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2399,7 +2411,7 @@ protected void doRun() throws Exception { AtomicReference receivedException = new AtomicReference<>(null); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index a7351ccfe14d1..1b85049da235b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -335,7 +336,7 @@ public void testContextRestoreResponseHandler() throws Exception { threadContext.wrapRestorable(storedContext), new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } @@ -374,7 +375,7 @@ public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exc new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java index abd5768bebec9..6ff18cc77a1e2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -176,8 +177,12 @@ public void testThatConnectionToClientTypeConnectionIsRejected() throws IOExcept TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TransportResponse newInstance() { - fail("never get that far"); + public TransportResponse read(StreamInput in) { + try { + fail("never get that far"); + } finally { + latch.countDown(); + } return null; }