Skip to content

Commit 39f62de

Browse files
committed
Revamp TransportRequest handlers to support Writeable (#26315)
Backport of PR to 6.x, excluding the changes to SimulatePipelineRequest
1 parent 79d2d74 commit 39f62de

File tree

11 files changed

+91
-20
lines changed

11 files changed

+91
-20
lines changed

server/src/main/java/org/elasticsearch/action/ActionRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public ActionRequest() {
3434
// this.listenerThreaded = request.listenerThreaded();
3535
}
3636

37+
public ActionRequest(StreamInput in) throws IOException {
38+
super(in);
39+
}
40+
3741
public abstract ActionRequestValidationException validate();
3842

3943
/**

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionRequest;
2424
import org.elasticsearch.action.ActionResponse;
2525
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
26+
import org.elasticsearch.common.io.stream.Writeable;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.tasks.Task;
2829
import org.elasticsearch.threadpool.ThreadPool;
@@ -43,6 +44,12 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
4344
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
4445
}
4546

47+
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
48+
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
49+
IndexNameExpressionResolver indexNameExpressionResolver) {
50+
this(settings, actionName, true, threadPool, transportService, actionFilters, requestReader, indexNameExpressionResolver);
51+
}
52+
4653
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
4754
TransportService transportService, ActionFilters actionFilters,
4855
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
@@ -51,6 +58,14 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c
5158
new TransportHandler());
5259
}
5360

61+
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
62+
TransportService transportService, ActionFilters actionFilters,
63+
Writeable.Reader<Request> requestReader, IndexNameExpressionResolver indexNameExpressionResolver) {
64+
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
65+
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
66+
new TransportHandler());
67+
}
68+
5469
class TransportHandler implements TransportRequestHandler<Request> {
5570

5671
@Override

server/src/main/java/org/elasticsearch/common/io/stream/Streamable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.io.stream;
2121

2222
import java.io.IOException;
23+
import java.util.function.Supplier;
2324

2425
/**
2526
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
@@ -43,4 +44,12 @@ public interface Streamable {
4344
* Write this object's fields to a {@linkplain StreamOutput}.
4445
*/
4546
void writeTo(StreamOutput out) throws IOException;
47+
48+
static <T extends Streamable> Writeable.Reader<T> newWriteableReader(Supplier<T> supplier) {
49+
return (StreamInput in) -> {
50+
T request = supplier.get();
51+
request.readFrom(in);
52+
return request;
53+
};
54+
}
4655
}

server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.Writeable;
2224
import org.elasticsearch.tasks.Task;
2325
import org.elasticsearch.tasks.TaskManager;
2426

2527
import java.io.IOException;
26-
import java.util.function.Supplier;
2728

2829
public class RequestHandlerRegistry<Request extends TransportRequest> {
2930

@@ -32,15 +33,14 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
3233
private final boolean forceExecution;
3334
private final boolean canTripCircuitBreaker;
3435
private final String executor;
35-
private final Supplier<Request> requestFactory;
3636
private final TaskManager taskManager;
37+
private final Writeable.Reader<Request> requestReader;
3738

38-
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
39+
public RequestHandlerRegistry(String action, Writeable.Reader<Request> requestReader, TaskManager taskManager,
3940
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
4041
boolean canTripCircuitBreaker) {
4142
this.action = action;
42-
this.requestFactory = requestFactory;
43-
assert newRequest() != null;
43+
this.requestReader = requestReader;
4444
this.handler = handler;
4545
this.forceExecution = forceExecution;
4646
this.canTripCircuitBreaker = canTripCircuitBreaker;
@@ -52,8 +52,8 @@ public String getAction() {
5252
return action;
5353
}
5454

55-
public Request newRequest() {
56-
return requestFactory.get();
55+
public Request newRequest(StreamInput in) throws IOException {
56+
return requestReader.read(in);
5757
}
5858

5959
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1505,9 +1505,8 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
15051505
}
15061506
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName,
15071507
messageLengthBytes);
1508-
final TransportRequest request = reg.newRequest();
1508+
final TransportRequest request = reg.newRequest(stream);
15091509
request.remoteAddress(new TransportAddress(remoteAddress));
1510-
request.readFrom(stream);
15111510
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
15121511
validateRequest(stream, requestId, action);
15131512
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526

2627
import java.io.IOException;
@@ -100,11 +101,11 @@ public String executor() {
100101

101102
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
102103
T wrapped;
103-
Supplier<T> supplier;
104+
Writeable.Reader<T> reader;
104105
DiscoveryNode targetNode;
105106

106-
ProxyRequest(Supplier<T> supplier) {
107-
this.supplier = supplier;
107+
ProxyRequest(Writeable.Reader<T> reader) {
108+
this.reader = reader;
108109
}
109110

110111
ProxyRequest(T wrapped, DiscoveryNode targetNode) {
@@ -116,8 +117,7 @@ static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
116117
public void readFrom(StreamInput in) throws IOException {
117118
super.readFrom(in);
118119
targetNode = new DiscoveryNode(in);
119-
wrapped = supplier.get();
120-
wrapped.readFrom(in);
120+
wrapped = reader.read(in);
121121
}
122122

123123
@Override

server/src/main/java/org/elasticsearch/transport/TransportMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
2424
import org.elasticsearch.common.io.stream.Streamable;
25+
import org.elasticsearch.common.io.stream.Writeable;
2526
import org.elasticsearch.common.transport.TransportAddress;
2627

2728
import java.io.IOException;
2829

29-
public abstract class TransportMessage implements Streamable {
30+
public abstract class TransportMessage implements Streamable, Writeable {
3031

3132
private TransportAddress remoteAddress;
3233

server/src/main/java/org/elasticsearch/transport/TransportRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public static class Empty extends TransportRequest {
3939
public TransportRequest() {
4040
}
4141

42+
public TransportRequest(StreamInput in) throws IOException {
43+
parentTaskId = TaskId.readFromStream(in);
44+
}
45+
4246
/**
4347
* Set a reference to task that created this request.
4448
*/

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.common.component.AbstractLifecycleComponent;
3333
import org.elasticsearch.common.io.stream.StreamInput;
3434
import org.elasticsearch.common.io.stream.StreamOutput;
35+
import org.elasticsearch.common.io.stream.Streamable;
36+
import org.elasticsearch.common.io.stream.Writeable;
3537
import org.elasticsearch.common.logging.Loggers;
3638
import org.elasticsearch.common.regex.Regex;
3739
import org.elasticsearch.common.settings.ClusterSettings;
@@ -713,7 +715,24 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
713715
String executor, TransportRequestHandler<Request> handler) {
714716
handler = interceptor.interceptHandler(action, executor, false, handler);
715717
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
716-
action, requestFactory, taskManager, handler, executor, false, true);
718+
action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true);
719+
registerRequestHandler(reg);
720+
}
721+
722+
/**
723+
* Registers a new request handler
724+
*
725+
* @param action The action the request handler is associated with
726+
* @param requestReader a callable to be used construct new instances for streaming
727+
* @param executor The executor the request handling will be executed on
728+
* @param handler The handler itself that implements the request handling
729+
*/
730+
public <Request extends TransportRequest> void registerRequestHandler(String action, String executor,
731+
Writeable.Reader<Request> requestReader,
732+
TransportRequestHandler<Request> handler) {
733+
handler = interceptor.interceptHandler(action, executor, false, handler);
734+
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
735+
action, requestReader, taskManager, handler, executor, false, true);
717736
registerRequestHandler(reg);
718737
}
719738

@@ -733,7 +752,28 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
733752
TransportRequestHandler<Request> handler) {
734753
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
735754
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
736-
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
755+
action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
756+
registerRequestHandler(reg);
757+
}
758+
759+
/**
760+
* Registers a new request handler
761+
*
762+
* @param action The action the request handler is associated with
763+
* @param requestReader The request class that will be used to construct new instances for streaming
764+
* @param executor The executor the request handling will be executed on
765+
* @param forceExecution Force execution on the executor queue and never reject it
766+
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
767+
* @param handler The handler itself that implements the request handling
768+
*/
769+
public <Request extends TransportRequest> void registerRequestHandler(String action,
770+
String executor, boolean forceExecution,
771+
boolean canTripCircuitBreaker,
772+
Writeable.Reader<Request> requestReader,
773+
TransportRequestHandler<Request> handler) {
774+
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
775+
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
776+
action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
737777
registerRequestHandler(reg);
738778
}
739779

server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public void testIsProxyAction() {
267267
}
268268

269269
public void testIsProxyRequest() {
270-
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null)));
270+
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null)));
271271
assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE));
272272
}
273273
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,8 +431,7 @@ protected void sendRequest(Connection connection, long requestId, String action,
431431
RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action);
432432
BytesStreamOutput bStream = new BytesStreamOutput();
433433
request.writeTo(bStream);
434-
final TransportRequest clonedRequest = reg.newRequest();
435-
clonedRequest.readFrom(bStream.bytes().streamInput());
434+
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
436435

437436
Runnable runnable = new AbstractRunnable() {
438437
AtomicBoolean requestSent = new AtomicBoolean();

0 commit comments

Comments
 (0)