Skip to content

Commit 5f6c43a

Browse files
committed
Web socket support.
1 parent a99235f commit 5f6c43a

File tree

7 files changed

+352
-6
lines changed

7 files changed

+352
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.kubernetes.client.examples;
2+
3+
import io.kubernetes.client.ApiClient;
4+
import io.kubernetes.client.ApiException;
5+
import io.kubernetes.client.util.Config;
6+
import io.kubernetes.client.util.WebSockets;
7+
8+
import java.io.BufferedReader;
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.Reader;
12+
13+
public class WebSocketsExample {
14+
public static void main(String... args) throws ApiException, IOException {
15+
final ApiClient client = Config.defaultClient();
16+
WebSockets.stream(args[0], "POST", client, new WebSockets.SocketListener() {
17+
public void open() {}
18+
public void close() {
19+
// Trigger shutdown of the dispatcher's executor so this process can exit cleanly.
20+
client.getHttpClient().getDispatcher().getExecutorService().shutdown();
21+
}
22+
public void bytesMessage(InputStream is) {}
23+
public void textMessage(Reader in) {
24+
try {
25+
BufferedReader reader = new BufferedReader(in);
26+
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
27+
System.out.println(line);
28+
}
29+
} catch (IOException ex) {
30+
ex.printStackTrace();
31+
}
32+
}
33+
});
34+
}
35+
}

kubernetes/.swagger-codegen-ignore

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
.gitignore
22
git_push.sh
33
# Remove this once swagger-codegen 2.2.3 is released and we update.
4-
# We want https://github.com/swagger-api/swagger-codegen/pull/5629
5-
# in the release.
4+
# Verify the following PRs are in the release:
5+
# * https://github.com/swagger-api/swagger-codegen/pull/5629
6+
# * https://github.com/swagger-api/swagger-codegen/pull/5648
67
src/main/java/io/kubernetes/client/ApiClient.java
78

kubernetes/src/main/java/io/kubernetes/client/ApiClient.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -1069,9 +1069,16 @@ public <T> T handleResponse(Response response, Type returnType) throws ApiExcept
10691069
* @throws ApiException If fail to serialize the request body object
10701070
*/
10711071
public Call buildCall(String path, String method, List<Pair> queryParams, Object body, Map<String, String> headerParams, Map<String, Object> formParams, String[] authNames, ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
1072+
Request request = buildRequest(path, method, queryParams, body, headerParams, formParams, authNames, progressRequestListener);
1073+
1074+
return httpClient.newCall(request);
1075+
}
1076+
1077+
public Request buildRequest(String path, String method, List<Pair> queryParams, Object body, Map<String, String> headerParams, Map<String, Object> formParams, String[] authNames, ProgressRequestBody.ProgressRequestListener progressRequestListener) throws ApiException {
10721078
updateParamsForAuth(authNames, queryParams, headerParams);
10731079

10741080
final String url = buildUrl(path, queryParams);
1081+
System.out.println(url);
10751082
final Request.Builder reqBuilder = new Request.Builder().url(url);
10761083
processHeaderParams(headerParams, reqBuilder);
10771084

@@ -1108,8 +1115,7 @@ public Call buildCall(String path, String method, List<Pair> queryParams, Object
11081115
} else {
11091116
request = reqBuilder.method(method, reqBody).build();
11101117
}
1111-
1112-
return httpClient.newCall(request);
1118+
return request;
11131119
}
11141120

11151121
/**

util/pom.xml

+10-1
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,18 @@
2424
<artifactId>commons-codec</artifactId>
2525
<version>1.10</version>
2626
</dependency>
27+
<dependency>
28+
<groupId>com.squareup.okhttp</groupId>
29+
<artifactId>okhttp-ws</artifactId>
30+
<version>2.7.5</version>
31+
</dependency>
2732
<!-- test dependencies -->
2833
<dependency>
2934
<groupId>junit</groupId>
3035
<artifactId>junit</artifactId>
3136
<version>4.12</version>
3237
<scope>test</scope>
3338
</dependency>
34-
<!-- https://mvnrepository.com/artifact/com.github.stefanbirkner/system-rules -->
3539
<dependency>
3640
<groupId>com.github.stefanbirkner</groupId>
3741
<artifactId>system-rules</artifactId>
@@ -51,4 +55,9 @@
5155
</plugin>
5256
</plugins>
5357
</build>
58+
<properties>
59+
<java.version>1.7</java.version>
60+
<maven.compiler.source>${java.version}</maven.compiler.source>
61+
<maven.compiler.target>${java.version}</maven.compiler.target>
62+
</properties>
5463
</project>

util/src/main/java/io/kubernetes/client/util/Config.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,4 +170,4 @@ public static ApiClient defaultClient() throws IOException {
170170
client.setBasePath("http://localhost:8080");
171171
return client;
172172
}
173-
}
173+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright (C) 2014 Square, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
// This has been cloned from the okhttp-ws package so that I could remove
17+
// the requirement that websockets use the "GET" method
18+
package io.kubernetes.client.util;
19+
20+
import com.squareup.okhttp.Call;
21+
import com.squareup.okhttp.Callback;
22+
import com.squareup.okhttp.OkHttpClient;
23+
import com.squareup.okhttp.Request;
24+
import com.squareup.okhttp.Response;
25+
import com.squareup.okhttp.internal.Internal;
26+
import com.squareup.okhttp.internal.Util;
27+
import com.squareup.okhttp.internal.http.StreamAllocation;
28+
import com.squareup.okhttp.internal.ws.RealWebSocket;
29+
import com.squareup.okhttp.internal.ws.WebSocketProtocol;
30+
import com.squareup.okhttp.ws.WebSocketListener;
31+
import java.io.IOException;
32+
import java.net.ProtocolException;
33+
import java.security.SecureRandom;
34+
import java.util.Collections;
35+
import java.util.Random;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.LinkedBlockingDeque;
38+
import java.util.concurrent.ThreadPoolExecutor;
39+
import okio.ByteString;
40+
41+
import static java.util.concurrent.TimeUnit.SECONDS;
42+
43+
public final class WebSocketCall {
44+
/**
45+
* Prepares the {@code request} to create a web socket at some point in the future.
46+
*/
47+
public static WebSocketCall create(OkHttpClient client, Request request) {
48+
return new WebSocketCall(client, request);
49+
}
50+
51+
private final Call call;
52+
private final Random random;
53+
private final String key;
54+
55+
WebSocketCall(OkHttpClient client, Request request) {
56+
this(client, request, new SecureRandom());
57+
}
58+
59+
WebSocketCall(OkHttpClient client, Request request, Random random) {
60+
this.random = random;
61+
62+
byte[] nonce = new byte[16];
63+
random.nextBytes(nonce);
64+
key = ByteString.of(nonce).base64();
65+
66+
// Copy the client. Otherwise changes (socket factory, redirect policy,
67+
// etc.) may incorrectly be reflected in the request when it is executed.
68+
client = client.clone();
69+
// Force HTTP/1.1 until the WebSocket over HTTP/2 version is finalized.
70+
client.setProtocols(Collections.singletonList(com.squareup.okhttp.Protocol.HTTP_1_1));
71+
72+
request = request.newBuilder()
73+
.header("Upgrade", "websocket")
74+
.header("Connection", "Upgrade")
75+
.header("Sec-WebSocket-Key", key)
76+
.header("Sec-WebSocket-Version", "13")
77+
.build();
78+
79+
call = client.newCall(request);
80+
}
81+
82+
/**
83+
* Schedules the request to be executed at some point in the future.
84+
*
85+
* <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the request will run:
86+
* usually immediately unless there are several other requests currently being executed.
87+
*
88+
* <p>This client will later call back {@code responseCallback} with either an HTTP response or a
89+
* failure exception. If you {@link #cancel} a request before it completes the callback will not
90+
* be invoked.
91+
*
92+
* @throws IllegalStateException when the call has already been executed.
93+
*/
94+
public void enqueue(final WebSocketListener listener) {
95+
Callback responseCallback = new Callback() {
96+
@Override public void onResponse(Response response) throws IOException {
97+
System.out.println(response);
98+
try {
99+
createWebSocket(response, listener);
100+
} catch (IOException e) {
101+
listener.onFailure(e, response);
102+
}
103+
}
104+
105+
@Override public void onFailure(Request request, IOException e) {
106+
listener.onFailure(e, null);
107+
}
108+
};
109+
// TODO call.enqueue(responseCallback, true);
110+
Internal.instance.callEnqueue(call, responseCallback, true);
111+
}
112+
113+
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
114+
public void cancel() {
115+
call.cancel();
116+
}
117+
118+
private void createWebSocket(Response response, WebSocketListener listener) throws IOException {
119+
if (response.code() != 101) {
120+
Util.closeQuietly(response.body());
121+
throw new ProtocolException("Expected HTTP 101 response but was '"
122+
+ response.code()
123+
+ " "
124+
+ response.message()
125+
+ "'");
126+
}
127+
128+
String headerConnection = response.header("Connection");
129+
if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
130+
throw new ProtocolException(
131+
"Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'");
132+
}
133+
String headerUpgrade = response.header("Upgrade");
134+
if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
135+
throw new ProtocolException(
136+
"Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
137+
}
138+
String headerAccept = response.header("Sec-WebSocket-Accept");
139+
String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC);
140+
if (!acceptExpected.equals(headerAccept)) {
141+
throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
142+
+ acceptExpected
143+
+ "' but was '"
144+
+ headerAccept
145+
+ "'");
146+
}
147+
148+
StreamAllocation streamAllocation = Internal.instance.callEngineGetStreamAllocation(call);
149+
RealWebSocket webSocket = StreamWebSocket.create(
150+
streamAllocation, response, random, listener);
151+
152+
listener.onOpen(webSocket, response);
153+
154+
while (webSocket.readMessage()) {
155+
}
156+
}
157+
158+
// Keep static so that the WebSocketCall instance can be garbage collected.
159+
private static class StreamWebSocket extends RealWebSocket {
160+
static RealWebSocket create(StreamAllocation streamAllocation, Response response,
161+
Random random, WebSocketListener listener) {
162+
String url = response.request().urlString();
163+
ThreadPoolExecutor replyExecutor =
164+
new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(),
165+
Util.threadFactory(String.format("OkHttp %s WebSocket", url), true));
166+
replyExecutor.allowCoreThreadTimeOut(true);
167+
168+
return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, url);
169+
}
170+
171+
private final StreamAllocation streamAllocation;
172+
private final ExecutorService replyExecutor;
173+
174+
private StreamWebSocket(StreamAllocation streamAllocation,
175+
Random random, ExecutorService replyExecutor, WebSocketListener listener, String url) {
176+
super(true /* is client */, streamAllocation.connection().source,
177+
streamAllocation.connection().sink, random, replyExecutor, listener, url);
178+
this.streamAllocation = streamAllocation;
179+
this.replyExecutor = replyExecutor;
180+
}
181+
182+
@Override protected void close() throws IOException {
183+
replyExecutor.shutdown();
184+
streamAllocation.noNewStreams();
185+
streamAllocation.streamFinished(streamAllocation.stream());
186+
}
187+
}
188+
}

0 commit comments

Comments
 (0)