Skip to content

Commit 0f51dd6

Browse files
committed
SQL: Improve serialization of SQL processors (#45678)
Encapsulate the serialization/deserialization of SQL client classes. Make configuration specific parameters (such as ZoneId) generic just like the version and remove the need for consumer classes to manage them individually. This is not only consistent but also provides significant savings in the cursor. Fix #40216 (cherry picked from commit 5c84479)
1 parent c6b30b8 commit 0f51dd6

32 files changed

+341
-128
lines changed

x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/AbstractSqlQueryRequest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public AbstractSqlQueryRequest(StreamInput in) throws IOException {
194194
super(in);
195195
query = in.readString();
196196
params = in.readList(AbstractSqlQueryRequest::readSqlTypedParamValue);
197-
zoneId = ZoneId.of(in.readString());
197+
zoneId = in.readZoneId();
198198
fetchSize = in.readVInt();
199199
requestTimeout = in.readTimeValue();
200200
pageTimeout = in.readTimeValue();
@@ -218,7 +218,7 @@ public void writeTo(StreamOutput out) throws IOException {
218218
for (SqlTypedParamValue param: params) {
219219
writeSqlTypedParamValue(out, param);
220220
}
221-
out.writeString(zoneId.getId());
221+
out.writeZoneId(zoneId);
222222
out.writeVInt(fetchSize);
223223
out.writeTimeValue(requestTimeout);
224224
out.writeTimeValue(pageTimeout);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.sql.common.io;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
11+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
14+
15+
import java.io.IOException;
16+
import java.time.ZoneId;
17+
import java.util.Base64;
18+
19+
/**
20+
* SQL-specific stream extension for {@link StreamInput} used for deserializing
21+
* SQL components, especially on the client-side.
22+
*/
23+
public class SqlStreamInput extends NamedWriteableAwareStreamInput {
24+
25+
private final ZoneId zoneId;
26+
27+
public SqlStreamInput(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
28+
this(Base64.getDecoder().decode(base64encoded), namedWriteableRegistry, version);
29+
}
30+
31+
public SqlStreamInput(byte[] input, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
32+
super(StreamInput.wrap(input), namedWriteableRegistry);
33+
34+
// version check first
35+
Version ver = Version.readVersion(delegate);
36+
if (version.compareTo(ver) != 0) {
37+
throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", ver, version);
38+
}
39+
delegate.setVersion(version);
40+
// configuration settings
41+
zoneId = delegate.readZoneId();
42+
}
43+
44+
public ZoneId zoneId() {
45+
return zoneId;
46+
}
47+
48+
public static SqlStreamInput asSqlStream(StreamInput in) {
49+
if (in instanceof SqlStreamInput) {
50+
return (SqlStreamInput) in;
51+
}
52+
throw new SqlIllegalArgumentException("Expected SQL cursor stream, received [{}]", in.getClass());
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.sql.common.io;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
11+
12+
import java.io.ByteArrayOutputStream;
13+
import java.io.IOException;
14+
import java.nio.charset.StandardCharsets;
15+
import java.time.ZoneId;
16+
import java.util.Base64;
17+
18+
public class SqlStreamOutput extends OutputStreamStreamOutput {
19+
20+
private final ByteArrayOutputStream bytes;
21+
22+
public SqlStreamOutput(Version version, ZoneId zoneId) throws IOException {
23+
this(new ByteArrayOutputStream(), version, zoneId);
24+
}
25+
26+
private SqlStreamOutput(ByteArrayOutputStream bytes, Version version, ZoneId zoneId) throws IOException {
27+
super(Base64.getEncoder().wrap(new OutputStreamStreamOutput(bytes)));
28+
this.bytes = bytes;
29+
30+
Version.writeVersion(version, this);
31+
writeZoneId(zoneId);
32+
}
33+
34+
/**
35+
* Should be called _after_ closing the stream - there are no guarantees otherwise.
36+
*/
37+
public String streamAsString() {
38+
// Base64 uses this encoding instead of UTF-8
39+
return new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
40+
}
41+
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/CompositeKeyExtractor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.common.io.stream.StreamOutput;
1010
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
1111
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
12+
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
1213
import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
1314
import org.elasticsearch.xpack.sql.util.DateUtils;
1415

@@ -42,15 +43,15 @@ public CompositeKeyExtractor(String key, Property property, ZoneId zoneId, boole
4243
CompositeKeyExtractor(StreamInput in) throws IOException {
4344
key = in.readString();
4445
property = in.readEnum(Property.class);
45-
zoneId = ZoneId.of(in.readString());
4646
isDateTimeBased = in.readBoolean();
47+
48+
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
4749
}
4850

4951
@Override
5052
public void writeTo(StreamOutput out) throws IOException {
5153
out.writeString(key);
5254
out.writeEnum(property);
53-
out.writeString(zoneId.getId());
5455
out.writeBoolean(isDateTimeBased);
5556
}
5657

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/FieldHitExtractor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
1818
import org.elasticsearch.search.SearchHit;
1919
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
20+
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
2021
import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
2122
import org.elasticsearch.xpack.sql.type.DataType;
2223
import org.elasticsearch.xpack.sql.util.DateUtils;
@@ -95,11 +96,12 @@ public FieldHitExtractor(String name, String fullFieldName, DataType dataType, Z
9596
}
9697
String esType = in.readOptionalString();
9798
dataType = esType != null ? DataType.fromTypeName(esType) : null;
98-
zoneId = ZoneId.of(in.readString());
9999
useDocValue = in.readBoolean();
100100
hitName = in.readOptionalString();
101101
arrayLeniency = in.readBoolean();
102102
path = sourcePath(fieldName, useDocValue, hitName);
103+
104+
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
103105
}
104106

105107
@Override
@@ -114,7 +116,6 @@ public void writeTo(StreamOutput out) throws IOException {
114116
out.writeOptionalString(fullFieldName);
115117
}
116118
out.writeOptionalString(dataType == null ? null : dataType.typeName);
117-
out.writeString(zoneId.getId());
118119
out.writeBoolean(useDocValue);
119120
out.writeOptionalString(hitName);
120121
out.writeBoolean(arrayLeniency);

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/MetricAggExtractor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
2222
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
2323
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
24+
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
2425
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
2526
import org.elasticsearch.xpack.sql.util.DateUtils;
2627

@@ -55,7 +56,8 @@ public MetricAggExtractor(String name, String property, String innerKey, ZoneId
5556
property = in.readString();
5657
innerKey = in.readOptionalString();
5758
isDateTimeBased = in.readBoolean();
58-
zoneId = ZoneId.of(in.readString());
59+
60+
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
5961
}
6062

6163
@Override
@@ -64,7 +66,6 @@ public void writeTo(StreamOutput out) throws IOException {
6466
out.writeString(property);
6567
out.writeOptionalString(innerKey);
6668
out.writeBoolean(isDateTimeBased);
67-
out.writeString(zoneId.getId());
6869
}
6970

7071
String name() {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
1111
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
1212
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
13+
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
1314
import org.elasticsearch.xpack.sql.type.DataType;
1415
import org.elasticsearch.xpack.sql.util.DateUtils;
1516

@@ -34,14 +35,13 @@ public TopHitsAggExtractor(String name, DataType fieldDataType, ZoneId zoneId) {
3435
TopHitsAggExtractor(StreamInput in) throws IOException {
3536
name = in.readString();
3637
fieldDataType = in.readEnum(DataType.class);
37-
zoneId = ZoneId.of(in.readString());
38+
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
3839
}
3940

4041
@Override
4142
public void writeTo(StreamOutput out) throws IOException {
4243
out.writeString(name);
4344
out.writeEnum(fieldDataType);
44-
out.writeString(zoneId.getId());
4545
}
4646

4747
String name() {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/BaseDateTimeProcessor.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
88

99
import org.elasticsearch.common.io.stream.StreamInput;
10-
import org.elasticsearch.common.io.stream.StreamOutput;
1110
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
11+
import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
1212
import org.elasticsearch.xpack.sql.expression.gen.processor.Processor;
1313

1414
import java.io.IOException;
@@ -24,12 +24,7 @@ public abstract class BaseDateTimeProcessor implements Processor {
2424
}
2525

2626
BaseDateTimeProcessor(StreamInput in) throws IOException {
27-
zoneId = ZoneId.of(in.readString());
28-
}
29-
30-
@Override
31-
public void writeTo(StreamOutput out) throws IOException {
32-
out.writeString(zoneId.getId());
27+
zoneId = SqlStreamInput.asSqlStream(in).zoneId();
3328
}
3429

3530
ZoneId zoneId() {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/DateTimeProcessor.java

-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ public DateTimeProcessor(StreamInput in) throws IOException {
6363

6464
@Override
6565
public void writeTo(StreamOutput out) throws IOException {
66-
super.writeTo(out);
6766
out.writeEnum(extractor);
6867
}
6968

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NamedDateTimeProcessor.java

-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public NamedDateTimeProcessor(StreamInput in) throws IOException {
5757

5858
@Override
5959
public void writeTo(StreamOutput out) throws IOException {
60-
super.writeTo(out);
6160
out.writeEnum(extractor);
6261
}
6362

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NonIsoDateTimeProcessor.java

-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public NonIsoDateTimeProcessor(StreamInput in) throws IOException {
7272

7373
@Override
7474
public void writeTo(StreamOutput out) throws IOException {
75-
super.writeTo(out);
7675
out.writeEnum(extractor);
7776
}
7877

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/QuarterProcessor.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
88

99
import org.elasticsearch.common.io.stream.StreamInput;
10+
import org.elasticsearch.common.io.stream.StreamOutput;
1011

1112
import java.io.IOException;
1213
import java.time.ZoneId;
@@ -16,6 +17,10 @@
1617
import java.util.Objects;
1718

1819
public class QuarterProcessor extends BaseDateTimeProcessor {
20+
21+
public static final String NAME = "q";
22+
private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
23+
1924

2025
public QuarterProcessor(ZoneId zoneId) {
2126
super(zoneId);
@@ -25,8 +30,8 @@ public QuarterProcessor(StreamInput in) throws IOException {
2530
super(in);
2631
}
2732

28-
public static final String NAME = "q";
29-
private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
33+
@Override
34+
public void writeTo(StreamOutput out) throws IOException {}
3035

3136
@Override
3237
public String getWriteableName() {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public RestResponse buildResponse(SqlQueryResponse response) throws Exception {
110110
final String data = textFormat.format(request, response);
111111

112112
restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
113-
data.getBytes(StandardCharsets.UTF_8));
113+
data.getBytes(StandardCharsets.UTF_8));
114114

115115
if (response.hasCursor()) {
116116
restResponse.addHeader("Cursor", response.cursor());

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.sql.plugin;
77

8-
import org.elasticsearch.Version;
98
import org.elasticsearch.common.Strings;
9+
import org.elasticsearch.common.collect.Tuple;
1010
import org.elasticsearch.rest.RestRequest;
1111
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
1212
import org.elasticsearch.xpack.sql.action.BasicFormatter;
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.xpack.sql.util.DateUtils;
1818
import org.elasticsearch.xpack.sql.util.StringUtils;
1919

20+
import java.time.ZoneId;
2021
import java.time.ZonedDateTime;
2122
import java.util.List;
2223
import java.util.Locale;
@@ -43,10 +44,13 @@ enum TextFormat {
4344
String format(RestRequest request, SqlQueryResponse response) {
4445
BasicFormatter formatter = null;
4546
Cursor cursor = null;
47+
ZoneId zoneId = null;
4648

4749
// check if the cursor is already wrapped first
4850
if (response.hasCursor()) {
49-
cursor = Cursors.decodeFromString(response.cursor());
51+
Tuple<Cursor, ZoneId> tuple = Cursors.decodeFromStringWithZone(response.cursor());
52+
cursor = tuple.v1();
53+
zoneId = tuple.v2();
5054
if (cursor instanceof TextFormatterCursor) {
5155
formatter = ((TextFormatterCursor) cursor).getFormatter();
5256
}
@@ -58,7 +62,7 @@ String format(RestRequest request, SqlQueryResponse response) {
5862
formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
5963
// if there's a cursor, wrap the formatter in it
6064
if (cursor != null) {
61-
response.cursor(Cursors.encodeToString(Version.CURRENT, new TextFormatterCursor(cursor, formatter)));
65+
response.cursor(Cursors.encodeToString(new TextFormatterCursor(cursor, formatter), zoneId));
6266
}
6367
// format with header
6468
return formatter.formatWithHeader(response.columns(), response.rows());

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.sql.plugin;
77

8-
import org.elasticsearch.Version;
98
import org.elasticsearch.action.ActionListener;
109
import org.elasticsearch.action.support.ActionFilters;
1110
import org.elasticsearch.action.support.HandledTransportAction;
@@ -81,7 +80,7 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request,
8180
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
8281
} else {
8382
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
84-
wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)),
83+
wrap(p -> listener.onResponse(createResponse(request, null, p)),
8584
listener::onFailure));
8685
}
8786
}
@@ -102,10 +101,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p
102101
}
103102
}
104103
columns = unmodifiableList(columns);
105-
return createResponse(request.mode(), request.columnar(), columns, page);
104+
return createResponse(request, columns, page);
106105
}
107106

108-
static SqlQueryResponse createResponse(Mode mode, boolean columnar, List<ColumnInfo> header, Page page) {
107+
static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
109108
List<List<Object>> rows = new ArrayList<>();
110109
page.rowSet().forEachRow(rowView -> {
111110
List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,9 +113,9 @@ static SqlQueryResponse createResponse(Mode mode, boolean columnar, List<ColumnI
114113
});
115114

116115
return new SqlQueryResponse(
117-
Cursors.encodeToString(Version.CURRENT, page.next()),
118-
mode,
119-
columnar,
116+
Cursors.encodeToString(page.next(), request.zoneId()),
117+
request.mode(),
118+
request.columnar(),
120119
header,
121120
rows);
122121
}

0 commit comments

Comments
 (0)