-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathWebSocketHandler.java
executable file
·173 lines (150 loc) · 7.79 KB
/
WebSocketHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
* Copyright (c) 2014-2024 AsyncHttpClient Project. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.asynchttpclient.netty.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.asynchttpclient.AsyncHandler.State;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.NettyResponseStatus;
import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.netty.ws.NettyWebSocket;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import java.io.IOException;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.SEC_WEBSOCKET_ACCEPT;
import static io.netty.handler.codec.http.HttpHeaderNames.SEC_WEBSOCKET_KEY;
import static io.netty.handler.codec.http.HttpHeaderNames.UPGRADE;
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static org.asynchttpclient.ws.WebSocketUtils.getAcceptKey;
@Sharable
public final class WebSocketHandler extends AsyncHttpClientHandler {
public WebSocketHandler(AsyncHttpClientConfig config, ChannelManager channelManager, NettyRequestSender requestSender) {
super(config, channelManager, requestSender);
}
private static WebSocketUpgradeHandler getWebSocketUpgradeHandler(NettyResponseFuture<?> future) {
return (WebSocketUpgradeHandler) future.getAsyncHandler();
}
private static NettyWebSocket getNettyWebSocket(NettyResponseFuture<?> future) throws Exception {
return getWebSocketUpgradeHandler(future).onCompleted();
}
private void upgrade(Channel channel, NettyResponseFuture<?> future, WebSocketUpgradeHandler handler, HttpResponse response, HttpHeaders responseHeaders) throws Exception {
boolean validStatus = response.status().equals(SWITCHING_PROTOCOLS);
boolean validUpgrade = response.headers().get(UPGRADE) != null;
String connection = response.headers().get(CONNECTION);
boolean validConnection = HttpHeaderValues.UPGRADE.contentEqualsIgnoreCase(connection);
final boolean headerOK = handler.onHeadersReceived(responseHeaders) == State.CONTINUE;
if (!headerOK || !validStatus || !validUpgrade || !validConnection) {
requestSender.abort(channel, future, new IOException("Invalid handshake response"));
return;
}
String accept = response.headers().get(SEC_WEBSOCKET_ACCEPT);
String key = getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(SEC_WEBSOCKET_KEY));
if (accept == null || !accept.equals(key)) {
requestSender.abort(channel, future, new IOException("Invalid challenge. Actual: " + accept + ". Expected: " + key));
}
// set back the future so the protocol gets notified of frames
// removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
// if it comes in the same frame as the HTTP Upgrade response
Channels.setAttribute(channel, future);
final NettyWebSocket webSocket = new NettyWebSocket(channel, responseHeaders);
handler.setWebSocket(webSocket);
channelManager.upgradePipelineForWebSockets(channel.pipeline());
// We don't need to synchronize as replacing the "ws-decoder" will
// process using the same thread.
try {
handler.onOpen(webSocket);
} catch (Exception ex) {
logger.warn("onSuccess unexpected exception", ex);
}
future.done();
}
private void abort(Channel channel, NettyResponseFuture<?> future, WebSocketUpgradeHandler handler, HttpResponseStatus status) {
try {
handler.onThrowable(new IOException("Invalid Status code=" + status.getStatusCode() + " text=" + status.getStatusText()));
} finally {
finishUpdate(future, channel, true);
}
}
@Override
public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e) throws Exception {
if (e instanceof HttpResponse) {
HttpResponse response = (HttpResponse) e;
if (logger.isDebugEnabled()) {
HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response);
}
WebSocketUpgradeHandler handler = getWebSocketUpgradeHandler(future);
HttpResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel);
HttpHeaders responseHeaders = response.headers();
if (!interceptors.exitAfterIntercept(channel, future, handler, response, status, responseHeaders)) {
if (handler.onStatusReceived(status) == State.CONTINUE) {
upgrade(channel, future, handler, response, responseHeaders);
} else {
abort(channel, future, handler, status);
}
}
} else if (e instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame) e;
NettyWebSocket webSocket = getNettyWebSocket(future);
// retain because we might buffer the frame
if (webSocket.isReady()) {
webSocket.handleFrame(frame);
} else {
// WebSocket hasn't been opened yet, but upgrading the pipeline triggered a read and a frame was sent along the HTTP upgrade response
// as we want to keep sequential order (but can't notify user of open before upgrading, so he doesn't try to send immediately), we have to buffer
webSocket.bufferFrame(frame);
}
} else if (!(e instanceof LastHttpContent)) {
// ignore, end of handshake response
logger.error("Invalid message {}", e);
}
}
@Override
public void handleException(NettyResponseFuture<?> future, Throwable e) {
logger.warn("onError", e);
try {
NettyWebSocket webSocket = getNettyWebSocket(future);
if (webSocket != null) {
webSocket.onError(e);
webSocket.sendCloseFrame();
}
} catch (Throwable t) {
logger.error("onError", t);
}
}
@Override
public void handleChannelInactive(NettyResponseFuture<?> future) {
logger.trace("Connection was closed abnormally (that is, with no close frame being received).");
try {
NettyWebSocket webSocket = getNettyWebSocket(future);
if (webSocket != null) {
webSocket.onClose(1006, "Connection was closed abnormally (that is, with no close frame being received).");
}
} catch (Throwable t) {
logger.error("onError", t);
}
}
}