Skip to content

Commit 4d8a17b

Browse files
author
Lukas Wegmann
authored
[8.1] SQL: forward warning headers to JDBC driver (#84499) (#84999)
* SQL: forward warning headers to JDBC driver (#84499) Follow up on #83943 to ensure that users of the JDBC drivers also can act on the warnings. The change ensures that warnings in the response headers from the `_sql` API are available in `JdbcResultSet.getWarnings()`. * fix version
1 parent 42c8300 commit 4d8a17b

File tree

9 files changed

+202
-35
lines changed

9 files changed

+202
-35
lines changed

docs/changelog/84499.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 84499
2+
summary: Forward warning headers to JDBC driver
3+
area: SQL
4+
type: enhancement
5+
issues: []

x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/Cursor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ default int columnSize() {
2828
int batchSize();
2929

3030
void close() throws SQLException;
31+
32+
List<String> warnings();
33+
34+
void clearWarnings();
3135
}

x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/DefaultCursor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,24 @@ class DefaultCursor implements Cursor {
1818

1919
private final List<JdbcColumnInfo> columnInfos;
2020
private List<List<Object>> rows;
21+
private final List<String> warnings;
2122
private int row = -1;
2223
private String cursor;
2324

24-
DefaultCursor(JdbcHttpClient client, String cursor, List<JdbcColumnInfo> columnInfos, List<List<Object>> rows, RequestMeta meta) {
25+
DefaultCursor(
26+
JdbcHttpClient client,
27+
String cursor,
28+
List<JdbcColumnInfo> columnInfos,
29+
List<List<Object>> rows,
30+
RequestMeta meta,
31+
List<String> warnings
32+
) {
2533
this.client = client;
2634
this.meta = meta;
2735
this.cursor = cursor;
2836
this.columnInfos = columnInfos;
2937
this.rows = rows;
38+
this.warnings = warnings;
3039
}
3140

3241
@Override
@@ -67,4 +76,13 @@ public void close() throws SQLException {
6776
client.queryClose(cursor);
6877
}
6978
}
79+
80+
public List<String> warnings() {
81+
return warnings;
82+
}
83+
84+
@Override
85+
public void clearWarnings() {
86+
warnings.clear();
87+
}
7088
}

x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcDatabaseMetaData.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.sql.RowIdLifetime;
1919
import java.sql.SQLException;
2020
import java.util.ArrayList;
21+
import java.util.Collections;
2122
import java.util.List;
2223

2324
import static java.sql.JDBCType.BIGINT;
@@ -1420,5 +1421,13 @@ public int batchSize() {
14201421
public void close() throws SQLException {
14211422
// this cursor doesn't hold any resource - no need to clean up
14221423
}
1424+
1425+
@Override
1426+
public List<String> warnings() {
1427+
return Collections.emptyList();
1428+
}
1429+
1430+
@Override
1431+
public void clearWarnings() {}
14231432
}
14241433
}

x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.xpack.sql.client.ClientException;
1010
import org.elasticsearch.xpack.sql.client.ClientVersion;
1111
import org.elasticsearch.xpack.sql.client.HttpClient;
12+
import org.elasticsearch.xpack.sql.client.HttpClient.ResponseWithWarnings;
1213
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
1314
import org.elasticsearch.xpack.sql.proto.MainResponse;
1415
import org.elasticsearch.xpack.sql.proto.Mode;
@@ -75,8 +76,15 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
7576
conCfg.indexIncludeFrozen(),
7677
conCfg.binaryCommunication()
7778
);
78-
SqlQueryResponse response = httpClient.query(sqlRequest);
79-
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
79+
ResponseWithWarnings<SqlQueryResponse> response = httpClient.query(sqlRequest);
80+
return new DefaultCursor(
81+
this,
82+
response.response().cursor(),
83+
toJdbcColumnInfo(response.response().columns()),
84+
response.response().rows(),
85+
meta,
86+
response.warnings()
87+
);
8088
}
8189

8290
/**
@@ -91,7 +99,7 @@ Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) thro
9199
new RequestInfo(Mode.JDBC),
92100
conCfg.binaryCommunication()
93101
);
94-
SqlQueryResponse response = httpClient.query(sqlRequest);
102+
SqlQueryResponse response = httpClient.query(sqlRequest).response();
95103
return new Tuple<>(response.cursor(), response.rows());
96104
}
97105

x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcResultSet.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class JdbcResultSet implements ResultSet, JdbcWrapper {
5454
private final Calendar defaultCalendar;
5555

5656
private final JdbcStatement statement;
57-
private final Cursor cursor;
57+
private Cursor cursor;
5858
private final Map<String, Integer> nameToIndex = new LinkedHashMap<>();
5959

6060
private boolean closed = false;
@@ -585,12 +585,21 @@ public InputStream getBinaryStream(String columnLabel) throws SQLException {
585585
@Override
586586
public SQLWarning getWarnings() throws SQLException {
587587
checkOpen();
588-
return null;
588+
SQLWarning sqlWarning = null;
589+
for (String warning : cursor.warnings()) {
590+
if (sqlWarning == null) {
591+
sqlWarning = new SQLWarning(warning);
592+
} else {
593+
sqlWarning.setNextWarning(new SQLWarning(warning));
594+
}
595+
}
596+
return sqlWarning;
589597
}
590598

591599
@Override
592600
public void clearWarnings() throws SQLException {
593601
checkOpen();
602+
cursor.clearWarnings();
594603
}
595604

596605
@Override

x-pack/plugin/sql/qa/jdbc/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/JdbcWarningsTestCase.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,89 @@
77

88
package org.elasticsearch.xpack.sql.qa.jdbc;
99

10+
import org.elasticsearch.Version;
11+
import org.junit.Before;
12+
13+
import java.io.IOException;
1014
import java.sql.Connection;
1115
import java.sql.ResultSet;
16+
import java.sql.SQLException;
17+
import java.sql.SQLWarning;
1218
import java.sql.Statement;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import java.util.Properties;
22+
23+
import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_DRIVER_VERSION;
24+
import static org.hamcrest.Matchers.containsInAnyOrder;
25+
import static org.hamcrest.Matchers.containsString;
1326

1427
public abstract class JdbcWarningsTestCase extends JdbcIntegrationTestCase {
1528

16-
public void testDeprecationWarningsDoNotReachJdbcDriver() throws Exception {
29+
private static final Version WARNING_HANDLING_ADDED_VERSION = Version.V_8_1_1;
30+
31+
@Before
32+
public void setupData() throws IOException {
1733
index("test_data", b -> b.field("foo", 1));
34+
}
35+
36+
public void testNoWarnings() throws SQLException {
37+
try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
38+
ResultSet rs = statement.executeQuery("SELECT * FROM test_data");
39+
assertNull(rs.getWarnings());
40+
}
41+
}
42+
43+
public void testSingleDeprecationWarning() throws SQLException {
44+
assumeWarningHandlingDriverVersion();
45+
46+
try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
47+
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
48+
SQLWarning warning = rs.getWarnings();
49+
assertThat(warning.getMessage(), containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."));
50+
assertNull(warning.getNextWarning());
51+
}
52+
}
53+
54+
public void testMultipleDeprecationWarnings() throws SQLException {
55+
assumeWarningHandlingDriverVersion();
56+
57+
Properties props = connectionProperties();
58+
props.setProperty("index.include.frozen", "true");
59+
60+
try (Connection connection = esJdbc(props); Statement statement = connection.createStatement()) {
61+
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
62+
List<String> warnings = new LinkedList<>();
63+
SQLWarning warning = rs.getWarnings();
64+
while (warning != null) {
65+
warnings.add(warning.getMessage());
66+
warning = warning.getNextWarning();
67+
}
68+
69+
assertThat(
70+
warnings,
71+
containsInAnyOrder(
72+
containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."),
73+
containsString("[index_include_frozen] parameter is deprecated because frozen indices have been deprecated.")
74+
)
75+
);
76+
}
77+
}
78+
79+
public void testClearWarnings() throws SQLException {
80+
assumeWarningHandlingDriverVersion();
1881

1982
try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
20-
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN \"test_*\"");
83+
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
84+
assertNotNull(rs.getWarnings());
85+
86+
rs.clearWarnings();
2187
assertNull(rs.getWarnings());
2288
}
2389
}
2490

91+
private void assumeWarningHandlingDriverVersion() {
92+
assumeTrue("Driver does not yet handle deprecation warnings", JDBC_DRIVER_VERSION.onOrAfter(WARNING_HANDLING_ADDED_VERSION));
93+
}
94+
2595
}

x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.io.InputStream;
3434
import java.security.PrivilegedAction;
3535
import java.sql.SQLException;
36+
import java.util.Collections;
37+
import java.util.List;
3638
import java.util.function.Function;
3739

3840
import static java.util.Collections.emptyList;
@@ -44,6 +46,24 @@
4446
*/
4547
public class HttpClient {
4648

49+
public static class ResponseWithWarnings<R> {
50+
private final R response;
51+
private final List<String> warnings;
52+
53+
ResponseWithWarnings(R response, List<String> warnings) {
54+
this.response = response;
55+
this.warnings = warnings;
56+
}
57+
58+
public R response() {
59+
return response;
60+
}
61+
62+
public List<String> warnings() {
63+
return warnings;
64+
}
65+
}
66+
4767
private final ConnectionConfiguration cfg;
4868
private final ContentType requestBodyContentType;
4969

@@ -78,10 +98,10 @@ public SqlQueryResponse basicQuery(String query, int fetchSize) throws SQLExcept
7898
false,
7999
cfg.binaryCommunication()
80100
);
81-
return query(sqlRequest);
101+
return query(sqlRequest).response();
82102
}
83103

84-
public SqlQueryResponse query(SqlQueryRequest sqlRequest) throws SQLException {
104+
public ResponseWithWarnings<SqlQueryResponse> query(SqlQueryRequest sqlRequest) throws SQLException {
85105
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
86106
}
87107

@@ -94,28 +114,28 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
94114
new RequestInfo(Mode.CLI),
95115
cfg.binaryCommunication()
96116
);
97-
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
117+
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse).response();
98118
}
99119

100120
public boolean queryClose(String cursor, Mode mode) throws SQLException {
101-
SqlClearCursorResponse response = post(
121+
ResponseWithWarnings<SqlClearCursorResponse> response = post(
102122
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
103123
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
104124
Payloads::parseClearCursorResponse
105125
);
106-
return response.isSucceeded();
126+
return response.response().isSucceeded();
107127
}
108128

109129
@SuppressWarnings({ "removal" })
110-
private <Request extends AbstractSqlRequest, Response> Response post(
130+
private <Request extends AbstractSqlRequest, Response> ResponseWithWarnings<Response> post(
111131
String path,
112132
Request request,
113133
CheckedFunction<JsonParser, Response, IOException> responseParser
114134
) throws SQLException {
115135
byte[] requestBytes = toContent(request);
116136
String query = "error_trace";
117-
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
118-
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
137+
Tuple<Function<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
138+
(PrivilegedAction<ResponseOrException<Tuple<Function<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
119139
path,
120140
query,
121141
cfg,
@@ -127,7 +147,11 @@ private <Request extends AbstractSqlRequest, Response> Response post(
127147
)
128148
)
129149
).getResponseOrThrowException();
130-
return fromContent(response.v1(), response.v2(), responseParser);
150+
List<String> warnings = response.v1().apply("Warning");
151+
return new ResponseWithWarnings<>(
152+
fromContent(contentType(response.v1()), response.v2(), responseParser),
153+
warnings == null ? Collections.emptyList() : warnings
154+
);
131155
}
132156

133157
@SuppressWarnings({ "removal" })
@@ -158,15 +182,15 @@ private boolean head(String path, long timeoutInMs) throws SQLException {
158182

159183
@SuppressWarnings({ "removal" })
160184
private <Response> Response get(String path, CheckedFunction<JsonParser, Response, IOException> responseParser) throws SQLException {
161-
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
162-
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
185+
Tuple<Function<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
186+
(PrivilegedAction<ResponseOrException<Tuple<Function<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
163187
path,
164188
"error_trace",
165189
cfg,
166190
con -> con.request(null, this::readFrom, "GET")
167191
)
168192
).getResponseOrThrowException();
169-
return fromContent(response.v1(), response.v2(), responseParser);
193+
return fromContent(contentType(response.v1()), response.v2(), responseParser);
170194
}
171195

172196
private <Request extends AbstractSqlRequest> byte[] toContent(Request request) {
@@ -180,31 +204,35 @@ private <Request extends AbstractSqlRequest> byte[] toContent(Request request) {
180204
}
181205
}
182206

183-
private Tuple<ContentType, byte[]> readFrom(InputStream inputStream, Function<String, String> headers) {
184-
String contentType = headers.apply("Content-Type");
185-
ContentType type = ContentFactory.parseMediaType(contentType);
186-
if (type == null) {
187-
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
188-
}
207+
private Tuple<Function<String, List<String>>, byte[]> readFrom(InputStream inputStream, Function<String, List<String>> headers) {
189208
ByteArrayOutputStream out = new ByteArrayOutputStream();
190209
try {
191210
Streams.copy(inputStream, out);
192211
} catch (Exception ex) {
193212
throw new ClientException("Cannot deserialize response", ex);
194213
}
195-
return new Tuple<>(type, out.toByteArray());
214+
return new Tuple<>(headers, out.toByteArray());
215+
216+
}
196217

218+
private ContentType contentType(Function<String, List<String>> headers) {
219+
List<String> contentTypeHeaders = headers.apply("Content-Type");
220+
221+
String contentType = contentTypeHeaders == null || contentTypeHeaders.isEmpty() ? null : contentTypeHeaders.get(0);
222+
ContentType type = ContentFactory.parseMediaType(contentType);
223+
if (type == null) {
224+
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
225+
} else {
226+
return type;
227+
}
197228
}
198229

199230
private <Response> Response fromContent(
200-
ContentType contentType,
231+
ContentType type,
201232
byte[] bytesReference,
202233
CheckedFunction<JsonParser, Response, IOException> responseParser
203234
) {
204-
try (
205-
InputStream stream = new ByteArrayInputStream(bytesReference);
206-
JsonParser parser = ContentFactory.parser(contentType, stream)
207-
) {
235+
try (InputStream stream = new ByteArrayInputStream(bytesReference); JsonParser parser = ContentFactory.parser(type, stream)) {
208236
return responseParser.apply(parser);
209237
} catch (Exception ex) {
210238
throw new ClientException("Cannot parse response", ex);

0 commit comments

Comments
 (0)