Skip to content

Commit 6ab4b6b

Browse files
authored
revamp TransportRequest handlers to support Writeable (#26315)
This PR begins the long journey to deprecating Streamable. The idea here is to add additional method signatures that support Writeable.Reader, so that the work to migrate objects TransportMessage to implement Writeable and not Streamable. One example conversion is done in this PR: SimulatePipelineRequest.
1 parent 4756c9a commit 6ab4b6b

File tree

14 files changed

+109
-35
lines changed

14 files changed

+109
-35
lines changed

core/src/main/java/org/elasticsearch/action/ActionRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public ActionRequest() {
3434
// this.listenerThreaded = request.listenerThreaded();
3535
}
3636

37+
public ActionRequest(StreamInput in) throws IOException {
38+
super(in);
39+
}
40+
3741
public abstract ActionRequestValidationException validate();
3842

3943
/**

core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.bytes.BytesReference;
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.io.stream.Writeable;
2829
import org.elasticsearch.common.xcontent.XContentFactory;
2930
import org.elasticsearch.common.xcontent.XContentType;
3031
import org.elasticsearch.ingest.ConfigurationUtils;
@@ -68,6 +69,18 @@ public SimulatePipelineRequest(BytesReference source, XContentType xContentType)
6869
SimulatePipelineRequest() {
6970
}
7071

72+
SimulatePipelineRequest(StreamInput in) throws IOException {
73+
super(in);
74+
id = in.readOptionalString();
75+
verbose = in.readBoolean();
76+
source = in.readBytesReference();
77+
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
78+
xContentType = XContentType.readFrom(in);
79+
} else {
80+
xContentType = XContentFactory.xContentType(source);
81+
}
82+
}
83+
7184
@Override
7285
public ActionRequestValidationException validate() {
7386
return null;
@@ -99,15 +112,7 @@ public XContentType getXContentType() {
99112

100113
@Override
101114
public void readFrom(StreamInput in) throws IOException {
102-
super.readFrom(in);
103-
id = in.readOptionalString();
104-
verbose = in.readBoolean();
105-
source = in.readBytesReference();
106-
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
107-
xContentType = XContentType.readFrom(in);
108-
} else {
109-
xContentType = XContentFactory.xContentType(source);
110-
}
115+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
111116
}
112117

113118
@Override

core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
4040

4141
@Inject
4242
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
43-
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
43+
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, SimulatePipelineRequest::new, indexNameExpressionResolver);
4444
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
4545
this.executionService = new SimulateExecutionService(threadPool);
4646
}

core/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionRequest;
2424
import org.elasticsearch.action.ActionResponse;
2525
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
26+
import org.elasticsearch.common.io.stream.Writeable;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.tasks.Task;
2829
import org.elasticsearch.threadpool.ThreadPool;
@@ -43,6 +44,12 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
4344
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
4445
}
4546

47+
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
48+
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
49+
IndexNameExpressionResolver indexNameExpressionResolver) {
50+
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestReader);
51+
}
52+
4653
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
4754
TransportService transportService, ActionFilters actionFilters,
4855
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
@@ -51,6 +58,14 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c
5158
new TransportHandler());
5259
}
5360

61+
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
62+
TransportService transportService, ActionFilters actionFilters,
63+
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader) {
64+
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
65+
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
66+
new TransportHandler());
67+
}
68+
5469
class TransportHandler implements TransportRequestHandler<Request> {
5570

5671
@Override

core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.io.stream;
2121

2222
import java.io.IOException;
23+
import java.util.function.Supplier;
2324

2425
/**
2526
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
@@ -43,4 +44,12 @@ public interface Streamable {
4344
* Write this object's fields to a {@linkplain StreamOutput}.
4445
*/
4546
void writeTo(StreamOutput out) throws IOException;
47+
48+
static <T extends Streamable> Writeable.Reader<T> newWriteableReader(Supplier<T> supplier) {
49+
return (StreamInput in) -> {
50+
T request = supplier.get();
51+
request.readFrom(in);
52+
return request;
53+
};
54+
}
4655
}

core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.Writeable;
2224
import org.elasticsearch.tasks.Task;
2325
import org.elasticsearch.tasks.TaskManager;
2426

@@ -32,15 +34,14 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
3234
private final boolean forceExecution;
3335
private final boolean canTripCircuitBreaker;
3436
private final String executor;
35-
private final Supplier<Request> requestFactory;
3637
private final TaskManager taskManager;
38+
private final Writeable.Reader<Request> requestReader;
3739

38-
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
40+
public RequestHandlerRegistry(String action, Writeable.Reader<Request> requestReader, TaskManager taskManager,
3941
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
4042
boolean canTripCircuitBreaker) {
4143
this.action = action;
42-
this.requestFactory = requestFactory;
43-
assert newRequest() != null;
44+
this.requestReader = requestReader;
4445
this.handler = handler;
4546
this.forceExecution = forceExecution;
4647
this.canTripCircuitBreaker = canTripCircuitBreaker;
@@ -52,8 +53,8 @@ public String getAction() {
5253
return action;
5354
}
5455

55-
public Request newRequest() {
56-
return requestFactory.get();
56+
public Request newRequest(StreamInput in) throws IOException {
57+
return requestReader.read(in);
5758
}
5859

5960
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,8 @@ protected String handleRequest(Channel channel, String profileName, final Stream
14751475
}
14761476
transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName,
14771477
messageLengthBytes);
1478-
final TransportRequest request = reg.newRequest();
1478+
final TransportRequest request = reg.newRequest(stream);
14791479
request.remoteAddress(new TransportAddress(remoteAddress));
1480-
request.readFrom(stream);
14811480
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
14821481
validateRequest(stream, requestId, action);
14831482
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.common.io.stream.Writeable;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627

2728
import java.io.IOException;
@@ -97,11 +98,11 @@ public String executor() {
9798

9899
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
99100
T wrapped;
100-
Supplier<T> supplier;
101+
Writeable.Reader<T> reader;
101102
DiscoveryNode targetNode;
102103

103-
ProxyRequest(Supplier<T> supplier) {
104-
this.supplier = supplier;
104+
ProxyRequest(Writeable.Reader<T> reader) {
105+
this.reader = reader;
105106
}
106107

107108
ProxyRequest(T wrapped, DiscoveryNode targetNode) {
@@ -113,8 +114,7 @@ static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
113114
public void readFrom(StreamInput in) throws IOException {
114115
super.readFrom(in);
115116
targetNode = new DiscoveryNode(in);
116-
wrapped = supplier.get();
117-
wrapped.readFrom(in);
117+
wrapped = reader.read(in);
118118
}
119119

120120
@Override

core/src/main/java/org/elasticsearch/transport/TransportMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
2424
import org.elasticsearch.common.io.stream.Streamable;
25+
import org.elasticsearch.common.io.stream.Writeable;
2526
import org.elasticsearch.common.transport.TransportAddress;
2627

2728
import java.io.IOException;
2829

29-
public abstract class TransportMessage implements Streamable {
30+
public abstract class TransportMessage implements Streamable, Writeable {
3031

3132
private TransportAddress remoteAddress;
3233

core/src/main/java/org/elasticsearch/transport/TransportRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public static class Empty extends TransportRequest {
3939
public TransportRequest() {
4040
}
4141

42+
public TransportRequest(StreamInput in) throws IOException {
43+
parentTaskId = TaskId.readFromStream(in);
44+
}
45+
4246
/**
4347
* Set a reference to task that created this request.
4448
*/

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3333
import org.elasticsearch.common.io.stream.StreamInput;
3434
import org.elasticsearch.common.io.stream.StreamOutput;
35+
import org.elasticsearch.common.io.stream.Streamable;
36+
import org.elasticsearch.common.io.stream.Writeable;
3537
import org.elasticsearch.common.logging.Loggers;
3638
import org.elasticsearch.common.metrics.MeanMetric;
3739
import org.elasticsearch.common.regex.Regex;
@@ -709,7 +711,24 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
709711
String executor, TransportRequestHandler<Request> handler) {
710712
handler = interceptor.interceptHandler(action, executor, false, handler);
711713
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
712-
action, requestFactory, taskManager, handler, executor, false, true);
714+
action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true);
715+
registerRequestHandler(reg);
716+
}
717+
718+
/**
719+
* Registers a new request handler
720+
*
721+
* @param action The action the request handler is associated with
722+
* @param requestReader a callable to be used construct new instances for streaming
723+
* @param executor The executor the request handling will be executed on
724+
* @param handler The handler itself that implements the request handling
725+
*/
726+
public <Request extends TransportRequest> void registerRequestHandler(String action, String executor,
727+
Writeable.Reader<Request> requestReader,
728+
TransportRequestHandler<Request> handler) {
729+
handler = interceptor.interceptHandler(action, executor, false, handler);
730+
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
731+
action, requestReader, taskManager, handler, executor, false, true);
713732
registerRequestHandler(reg);
714733
}
715734

@@ -729,7 +748,28 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
729748
TransportRequestHandler<Request> handler) {
730749
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
731750
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
732-
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
751+
action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
752+
registerRequestHandler(reg);
753+
}
754+
755+
/**
756+
* Registers a new request handler
757+
*
758+
* @param action The action the request handler is associated with
759+
* @param requestReader The request class that will be used to construct new instances for streaming
760+
* @param executor The executor the request handling will be executed on
761+
* @param forceExecution Force execution on the executor queue and never reject it
762+
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
763+
* @param handler The handler itself that implements the request handling
764+
*/
765+
public <Request extends TransportRequest> void registerRequestHandler(String action,
766+
String executor, boolean forceExecution,
767+
boolean canTripCircuitBreaker,
768+
Writeable.Reader<Request> requestReader,
769+
TransportRequestHandler<Request> handler) {
770+
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
771+
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
772+
action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
733773
registerRequestHandler(reg);
734774
}
735775

core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ public void testSerialization() throws IOException {
4949
BytesStreamOutput out = new BytesStreamOutput();
5050
request.writeTo(out);
5151
StreamInput streamInput = out.bytes().streamInput();
52-
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest();
53-
otherRequest.readFrom(streamInput);
52+
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(streamInput);
5453

5554
assertThat(otherRequest.getId(), equalTo(request.getId()));
5655
assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose()));
@@ -65,8 +64,7 @@ public void testSerializationWithXContent() throws IOException {
6564
request.writeTo(output);
6665
StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes);
6766

68-
SimulatePipelineRequest serialized = new SimulatePipelineRequest();
69-
serialized.readFrom(in);
67+
SimulatePipelineRequest serialized = new SimulatePipelineRequest(in);
7068
assertEquals(XContentType.JSON, serialized.getXContentType());
7169
assertEquals("{}", serialized.getSource().utf8ToString());
7270
}
@@ -77,8 +75,7 @@ public void testSerializationWithXContentBwc() throws IOException {
7775
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
7876
try (StreamInput in = StreamInput.wrap(data)) {
7977
in.setVersion(version);
80-
SimulatePipelineRequest request = new SimulatePipelineRequest();
81-
request.readFrom(in);
78+
SimulatePipelineRequest request = new SimulatePipelineRequest(in);
8279
assertEquals(XContentType.JSON, request.getXContentType());
8380
assertEquals("{}", request.getSource().utf8ToString());
8481

core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public void testIsProxyAction() {
267267
}
268268

269269
public void testIsProxyRequest() {
270-
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null)));
270+
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null)));
271271
assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE));
272272
}
273273
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,7 @@ protected void sendRequest(Connection connection, long requestId, String action,
419419
RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action);
420420
BytesStreamOutput bStream = new BytesStreamOutput();
421421
request.writeTo(bStream);
422-
final TransportRequest clonedRequest = reg.newRequest();
423-
clonedRequest.readFrom(bStream.bytes().streamInput());
422+
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
424423

425424
Runnable runnable = new AbstractRunnable() {
426425
AtomicBoolean requestSent = new AtomicBoolean();

0 commit comments

Comments
 (0)