Skip to content

Commit 879aee4

Browse files
committed
HTTP/1 client connection unsolicited messages should be treated as invalid.
Motivation: The current implementation of HTTP/1 client connection fails the connection when it receives unsolicited messages and does not release them. Such messages should be properly released to avoid leaking referenced counted messages. Changes: Update the HTTP/1 client connection to handle unsolicited messages as invalid, since invalid messages are released by the invalid message handler.
1 parent 1ec6170 commit 879aee4

File tree

5 files changed

+69
-8
lines changed

5 files changed

+69
-8
lines changed

src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@ public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl>
7575

7676
private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);
7777

78-
private static final Handler<Object> INVALID_MSG_HANDLER = msg -> {
79-
ReferenceCountUtil.release(msg);
80-
throw new IllegalStateException("Invalid object " + msg);
81-
};
78+
private static final Handler<Object> INVALID_MSG_HANDLER = ReferenceCountUtil::release;
8279

8380
private final HttpClientBase client;
8481
private final HttpClientOptions options;
@@ -137,6 +134,12 @@ public HttpClientConnection evictionHandler(Handler<Void> handler) {
137134
return this;
138135
}
139136

137+
@Override
138+
public HttpClientConnection invalidMessageHandler(Handler<Object> handler) {
139+
invalidMessageHandler = handler;
140+
return this;
141+
}
142+
140143
@Override
141144
public HttpClientConnection concurrencyChangeHandler(Handler<Long> handler) {
142145
// Never changes
@@ -800,6 +803,7 @@ public void handleMessage(Object msg) {
800803
handleWsFrame((WebSocketFrame) msg);
801804
} else {
802805
invalidMessageHandler.handle(msg);
806+
fail(new VertxException("Received an invalid message: " + msg.getClass().getName()));
803807
}
804808
}
805809

@@ -809,7 +813,8 @@ private void handleHttpMessage(HttpObject obj) {
809813
stream = responses.peekFirst();
810814
}
811815
if (stream == null) {
812-
fail(new VertxException("Received HTTP message with no request in progress"));
816+
invalidMessageHandler.handle(obj);
817+
fail(new VertxException("Received an HTTP message with no request in progress: " + obj.getClass().getName()));
813818
} else if (obj instanceof io.netty.handler.codec.http.HttpResponse) {
814819
io.netty.handler.codec.http.HttpResponse response = (io.netty.handler.codec.http.HttpResponse) obj;
815820
HttpVersion version;
@@ -892,9 +897,7 @@ private void removeChannelHandlers() {
892897
// removing this codec might fire pending buffers in the HTTP decoder
893898
// this happens when the channel reads the HTTP response and the following data in a single buffer
894899
Handler<Object> prev = invalidMessageHandler;
895-
invalidMessageHandler = msg -> {
896-
ReferenceCountUtil.release(msg);
897-
};
900+
invalidMessageHandler = INVALID_MSG_HANDLER;
898901
try {
899902
pipeline.remove("codec");
900903
} finally {

src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java

+5
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ public Http2ClientConnection evictionHandler(Handler<Void> handler) {
6060
return this;
6161
}
6262

63+
@Override
64+
public HttpClientConnection invalidMessageHandler(Handler<Object> handler) {
65+
return this;
66+
}
67+
6368
@Override
6469
public Http2ClientConnection concurrencyChangeHandler(Handler<Long> handler) {
6570
concurrencyChangeHandler = handler;

src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java

+10
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class Http2UpgradeClientConnection implements HttpClientConnection {
6060
private Handler<Throwable> exceptionHandler;
6161
private Handler<Buffer> pingHandler;
6262
private Handler<Void> evictionHandler;
63+
private Handler<Object> invalidMessageHandler;
6364
private Handler<Long> concurrencyChangeHandler;
6465
private Handler<Http2Settings> remoteSettingsHandler;
6566

@@ -871,6 +872,15 @@ public HttpClientConnection evictionHandler(Handler<Void> handler) {
871872
return this;
872873
}
873874

875+
@Override
876+
public HttpClientConnection invalidMessageHandler(Handler<Object> handler) {
877+
if (current instanceof Http1xClientConnection) {
878+
invalidMessageHandler = handler;
879+
}
880+
current.invalidMessageHandler(handler);
881+
return this;
882+
}
883+
874884
@Override
875885
public HttpClientConnection concurrencyChangeHandler(Handler<Long> handler) {
876886
if (current instanceof Http1xClientConnection) {

src/main/java/io/vertx/core/http/impl/HttpClientConnection.java

+8
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public interface HttpClientConnection extends HttpConnection {
4343
*/
4444
HttpClientConnection evictionHandler(Handler<Void> handler);
4545

46+
/**
47+
* Set a {@code handler} called when the connection receives invalid messages.
48+
*
49+
* @param handler the handler
50+
* @return a reference to this, so the API can be used fluently
51+
*/
52+
HttpClientConnection invalidMessageHandler(Handler<Object> handler);
53+
4654
/**
4755
* Set a {@code handler} called when the connection concurrency changes.
4856
* The handler is called with the new concurrency.

src/test/java/io/vertx/core/http/Http1xTest.java

+35
Original file line numberDiff line numberDiff line change
@@ -5485,4 +5485,39 @@ public void testFailPendingRequestAllocationWhenConnectionIsClosed() throws Exce
54855485
}
54865486
await();
54875487
}
5488+
5489+
@Test
5490+
public void testUnsolicitedMessagesAreTreatedAsInvalid() throws Exception {
5491+
NetServer server = vertx
5492+
.createNetServer()
5493+
.connectHandler(so -> {
5494+
so.write("HTTP/1.1 200 OK\r\n" +
5495+
"Content-Type: text/plain\r\n" +
5496+
"Content-Length: 11\r\n" +
5497+
"\r\n" +
5498+
"Hello World");
5499+
});
5500+
server
5501+
.listen(testAddress)
5502+
.toCompletionStage()
5503+
.toCompletableFuture()
5504+
.get(20, TimeUnit.SECONDS);
5505+
5506+
HttpClient client = vertx.httpClientBuilder().withConnectHandler(conn -> {
5507+
List<Object> invalidMessages = new ArrayList<>();
5508+
((HttpClientConnection) conn).invalidMessageHandler(invalidMessages::add);
5509+
conn.exceptionHandler(err -> {
5510+
});
5511+
conn.closeHandler(v -> {
5512+
assertEquals(2, invalidMessages.size());
5513+
assertTrue(invalidMessages.get(0) instanceof io.netty.handler.codec.http.HttpResponse);
5514+
assertTrue(invalidMessages.get(1) instanceof io.netty.handler.codec.http.LastHttpContent);
5515+
testComplete();
5516+
});
5517+
}).build();
5518+
5519+
client.request(requestOptions);
5520+
5521+
await();
5522+
}
54885523
}

0 commit comments

Comments
 (0)