Skip to content

Commit a375393

Browse files
authored
Relax compatibility constraints on sql cursor streams for all versions after 8.8.0 (#94597)
This is needed to separate TransportVersion from Version. No substantive changes are planned for SQL in the future, so it's ok to drop BwC checks for future versions for now. If BwC checks are ever needed again, they will need to be implemented in a way compatible with TransportVersion (see #94980)
1 parent 2fad04b commit a375393

File tree

10 files changed

+91
-63
lines changed

10 files changed

+91
-63
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,15 +262,15 @@ public boolean onOrBefore(TransportVersion version) {
262262
return version.id >= id;
263263
}
264264

265+
public static TransportVersion fromString(String str) {
266+
return TransportVersion.fromId(Integer.parseInt(str));
267+
}
268+
265269
@Override
266270
public int compareTo(TransportVersion other) {
267271
return Integer.compare(this.id, other.id);
268272
}
269273

270-
public static TransportVersion fromString(String str) {
271-
return TransportVersion.fromId(Integer.parseInt(str));
272-
}
273-
274274
@Override
275275
public String toString() {
276276
return Integer.toString(id);

x-pack/plugin/ql/test-fixtures/src/main/java/org/elasticsearch/xpack/ql/TestNodes.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77

88
package org.elasticsearch.xpack.ql;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.Version;
1112

13+
import java.util.Comparator;
1214
import java.util.HashMap;
1315
import java.util.List;
16+
import java.util.Objects;
1417
import java.util.stream.Collectors;
1518

1619
public final class TestNodes extends HashMap<String, TestNode> {
@@ -33,7 +36,16 @@ public Version getBWCVersion() {
3336
if (isEmpty()) {
3437
throw new IllegalStateException("no nodes available");
3538
}
36-
return Version.fromId(values().stream().mapToInt(node -> node.version().id).min().getAsInt());
39+
return values().stream().map(TestNode::version).min(Comparator.naturalOrder()).get();
40+
}
41+
42+
public TransportVersion getBWCTransportVersion() {
43+
if (isEmpty()) {
44+
throw new IllegalStateException("no nodes available");
45+
}
46+
// there will be either at least one node with version <8.8.0, and so a mapped TransportVersion will be set,
47+
// or all >=8.8.0,so TransportVersion will always be there
48+
return values().stream().map(TestNode::transportVersion).filter(Objects::nonNull).min(Comparator.naturalOrder()).get();
3749
}
3850

3951
@Override

x-pack/plugin/sql/qa/mixed-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/mixed_node/SqlCompatIT.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
package org.elasticsearch.xpack.sql.qa.mixed_node;
99

1010
import org.apache.http.HttpHost;
11-
import org.elasticsearch.Version;
11+
import org.elasticsearch.TransportVersion;
1212
import org.elasticsearch.client.Request;
1313
import org.elasticsearch.client.Response;
1414
import org.elasticsearch.client.ResponseException;
@@ -39,15 +39,15 @@ public class SqlCompatIT extends BaseRestSqlTestCase {
3939

4040
private static RestClient newNodesClient;
4141
private static RestClient oldNodesClient;
42-
private static Version bwcVersion;
42+
private static TransportVersion bwcVersion;
4343

4444
@Before
4545
public void initBwcClients() throws IOException {
4646
if (newNodesClient == null) {
4747
assertNull(oldNodesClient);
4848

4949
TestNodes nodes = buildNodeAndVersions(client());
50-
bwcVersion = nodes.getBWCVersion();
50+
bwcVersion = nodes.getBWCTransportVersion();
5151
newNodesClient = buildClient(
5252
restClientSettings(),
5353
nodes.getNewNodes().stream().map(TestNode::publishAddress).toArray(HttpHost[]::new)
@@ -107,28 +107,19 @@ private List<Integer> runOrderByNullsLastQuery(RestClient queryClient) throws IO
107107
indexDocs();
108108

109109
Request query = new Request("POST", "_sql");
110-
query.setJsonEntity(sqlQueryEntityWithOptionalMode("SELECT int FROM test GROUP BY 1 ORDER BY 1 NULLS LAST", bwcVersion));
110+
query.setJsonEntity(sqlQueryEntityWithOptionalMode("SELECT int FROM test GROUP BY 1 ORDER BY 1 NULLS LAST"));
111111
Map<String, Object> result = performRequestAndReadBodyAsJson(queryClient, query);
112112

113113
List<List<Object>> rows = (List<List<Object>>) result.get("rows");
114114
return rows.stream().map(row -> (Integer) row.get(0)).collect(Collectors.toList());
115115
}
116116

117-
public static String sqlQueryEntityWithOptionalMode(String query, Version bwcVersion) throws IOException {
118-
return sqlQueryEntityWithOptionalMode(Map.of("query", query), bwcVersion);
117+
public static String sqlQueryEntityWithOptionalMode(String query) throws IOException {
118+
return sqlQueryEntityWithOptionalMode(Map.of("query", query));
119119
}
120120

121-
public static String sqlQueryEntityWithOptionalMode(Map<String, Object> fields, Version bwcVersion) throws IOException {
121+
public static String sqlQueryEntityWithOptionalMode(Map<String, Object> fields) throws IOException {
122122
XContentBuilder json = XContentFactory.jsonBuilder().startObject();
123-
if (bwcVersion.before(Version.V_7_12_0)) {
124-
// a bug previous to 7.12 caused a NullPointerException when accessing displaySize in ColumnInfo. The bug has been addressed in
125-
// https://github.com/elastic/elasticsearch/pull/68802/files
126-
// #diff-2faa4e2df98a4636300a19d9d890a1bd7174e9b20dd3a8589d2c78a3d9e5cbc0L110
127-
// as a workaround, use JDBC (driver) mode in versions prior to 7.12
128-
json.field("mode", "jdbc");
129-
json.field("binary_format", false);
130-
json.field("version", bwcVersion.toString());
131-
}
132123
for (Map.Entry<String, Object> entry : fields.entrySet()) {
133124
json.field(entry.getKey(), entry.getValue());
134125
}
@@ -137,21 +128,26 @@ public static String sqlQueryEntityWithOptionalMode(Map<String, Object> fields,
137128
return Strings.toString(json);
138129
}
139130

140-
public void testCursorFromOldNodeFailsOnNewNode() throws IOException {
141-
assertCursorNotCompatibleAcrossVersions(bwcVersion, oldNodesClient, Version.CURRENT, newNodesClient);
131+
public void testHistoricCursorFromOldNodeFailsOnNewNode() throws IOException {
132+
assumeTrue("BwC checks only enabled for <=8.7.0", bwcVersion.before(TransportVersion.V_8_8_0));
133+
assertCursorNotCompatibleAcrossVersions(bwcVersion, oldNodesClient, TransportVersion.CURRENT, newNodesClient);
142134
}
143135

144136
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/83726")
145137
public void testCursorFromNewNodeFailsOnOldNode() throws IOException {
146-
assertCursorNotCompatibleAcrossVersions(Version.CURRENT, newNodesClient, bwcVersion, oldNodesClient);
138+
assertCursorNotCompatibleAcrossVersions(TransportVersion.CURRENT, newNodesClient, bwcVersion, oldNodesClient);
147139
}
148140

149-
private void assertCursorNotCompatibleAcrossVersions(Version version1, RestClient client1, Version version2, RestClient client2)
150-
throws IOException {
141+
private void assertCursorNotCompatibleAcrossVersions(
142+
TransportVersion version1,
143+
RestClient client1,
144+
TransportVersion version2,
145+
RestClient client2
146+
) throws IOException {
151147
indexDocs();
152148

153149
Request req = new Request("POST", "_sql");
154-
req.setJsonEntity(sqlQueryEntityWithOptionalMode(Map.of("query", "SELECT int FROM test", "fetch_size", 1), bwcVersion));
150+
req.setJsonEntity(sqlQueryEntityWithOptionalMode(Map.of("query", "SELECT int FROM test", "fetch_size", 1)));
155151
Map<String, Object> json = performRequestAndReadBodyAsJson(client1, req);
156152
String cursor = (String) json.get("cursor");
157153
assertThat(cursor, Matchers.not(Matchers.emptyOrNullString()));

x-pack/plugin/sql/qa/mixed-node/src/javaRestTest/java/org/elasticsearch/xpack/sql/qa/mixed_node/SqlSearchIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void assertAllTypesWithNodes(Map<String, Object> expectedResponse, List<
247247
String query = "SELECT " + intervalYearMonth + intervalDayTime + fieldsList + " FROM " + index + " ORDER BY id";
248248

249249
Request request = new Request("POST", "_sql");
250-
request.setJsonEntity(SqlCompatIT.sqlQueryEntityWithOptionalMode(query, bwcVersion));
250+
request.setJsonEntity(SqlCompatIT.sqlQueryEntityWithOptionalMode(query));
251251
assertBusy(() -> { assertResponse(expectedResponse, dropDisplaySizes(runSql(client, request))); });
252252
}
253253
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamInput.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.sql.common.io;
99

1010
import org.elasticsearch.TransportVersion;
11-
import org.elasticsearch.Version;
1211
import org.elasticsearch.common.compress.CompressorFactory;
1312
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
1413
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@@ -26,17 +25,27 @@
2625
*/
2726
public class SqlStreamInput extends NamedWriteableAwareStreamInput {
2827

29-
public static SqlStreamInput fromString(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version)
28+
public static SqlStreamInput fromString(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, TransportVersion version)
3029
throws IOException {
3130
byte[] bytes = Base64.getDecoder().decode(base64encoded);
3231
StreamInput in = StreamInput.wrap(bytes);
33-
Version inVersion = Version.readVersion(in);
34-
if (version.compareTo(inVersion) != 0) {
35-
throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", inVersion, version);
36-
}
37-
32+
TransportVersion inVersion = TransportVersion.readVersion(in);
33+
validateStreamVersion(version, inVersion);
3834
InputStreamStreamInput uncompressingIn = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(in));
39-
return new SqlStreamInput(uncompressingIn, namedWriteableRegistry, inVersion.transportVersion);
35+
return new SqlStreamInput(uncompressingIn, namedWriteableRegistry, inVersion);
36+
}
37+
38+
/**
39+
* Prior to 8.8.0, we only allow cursors to be deserialized with the same node version they were created.
40+
* <p>
41+
* In 8.8.0 and after, we are relaxing this constraint so we don't need to map between Version and TransportVersion.
42+
* If there is any future work that needs specific cursor compatibility checks, this needs to be implemented appropriately
43+
* using TransportVersion.
44+
*/
45+
private static void validateStreamVersion(TransportVersion version, TransportVersion cursorVersion) {
46+
if (cursorVersion.before(TransportVersion.V_8_8_0) && version.equals(cursorVersion) == false) {
47+
throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", cursorVersion, version);
48+
}
4049
}
4150

4251
private final ZoneId zoneId;

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamOutput.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.sql.common.io;
99

1010
import org.elasticsearch.TransportVersion;
11-
import org.elasticsearch.Version;
1211
import org.elasticsearch.common.compress.CompressorFactory;
1312
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -30,12 +29,12 @@ public class SqlStreamOutput extends OutputStreamStreamOutput {
3029

3130
private final ByteArrayOutputStream bytes;
3231

33-
public static SqlStreamOutput create(Version version, ZoneId zoneId) throws IOException {
32+
public static SqlStreamOutput create(TransportVersion version, ZoneId zoneId) throws IOException {
3433
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
3534
StreamOutput uncompressedOut = new OutputStreamStreamOutput(Base64.getEncoder().wrap(bytes));
36-
Version.writeVersion(version, uncompressedOut);
35+
TransportVersion.writeVersion(version, uncompressedOut);
3736
OutputStream out = CompressorFactory.COMPRESSOR.threadLocalOutputStream(uncompressedOut);
38-
return new SqlStreamOutput(bytes, out, version.transportVersion, zoneId);
37+
return new SqlStreamOutput(bytes, out, version, zoneId);
3938
}
4039

4140
private SqlStreamOutput(ByteArrayOutputStream bytes, OutputStream out, TransportVersion version, ZoneId zoneId) throws IOException {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.sql.session;
88

9-
import org.elasticsearch.Version;
9+
import org.elasticsearch.TransportVersion;
1010
import org.elasticsearch.common.Strings;
1111
import org.elasticsearch.common.io.stream.NamedWriteable;
1212
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -38,7 +38,7 @@
3838
public final class Cursors {
3939

4040
private static final NamedWriteableRegistry WRITEABLE_REGISTRY = new NamedWriteableRegistry(getNamedWriteables());
41-
private static final Version VERSION = Version.CURRENT;
41+
private static final TransportVersion VERSION = TransportVersion.CURRENT;
4242

4343
private Cursors() {}
4444

@@ -73,7 +73,7 @@ public static String encodeToString(Cursor info, ZoneId zoneId) {
7373
return encodeToString(info, VERSION, zoneId);
7474
}
7575

76-
public static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
76+
public static String encodeToString(Cursor info, TransportVersion version, ZoneId zoneId) {
7777
if (info == Cursor.EMPTY) {
7878
return StringUtils.EMPTY;
7979
}

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlWireSerializingTestCase.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.sql;
99

1010
import org.elasticsearch.TransportVersion;
11-
import org.elasticsearch.Version;
1211
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1312
import org.elasticsearch.common.io.stream.Writeable;
1413
import org.elasticsearch.test.AbstractWireTestCase;
@@ -23,13 +22,11 @@ public abstract class AbstractSqlWireSerializingTestCase<T extends Writeable> ex
2322

2423
@Override
2524
protected T copyInstance(T instance, TransportVersion version) throws IOException {
26-
Version nodeVersion = Version.fromId(version.id);
27-
2825
ZoneId zoneId = instanceZoneId(instance);
29-
SqlStreamOutput out = SqlStreamOutput.create(nodeVersion, zoneId);
26+
SqlStreamOutput out = SqlStreamOutput.create(version, zoneId);
3027
instance.writeTo(out);
3128
out.close();
32-
try (SqlStreamInput in = SqlStreamInput.fromString(out.streamAsString(), getNamedWriteableRegistry(), nodeVersion)) {
29+
try (SqlStreamInput in = SqlStreamInput.fromString(out.streamAsString(), getNamedWriteableRegistry(), version)) {
3330
return instanceReader().read(in);
3431
}
3532
}

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/common/io/SqlStreamTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
package org.elasticsearch.xpack.sql.common.io;
99

1010
import org.apache.lucene.util.BytesRef;
11-
import org.elasticsearch.Version;
11+
import org.elasticsearch.TransportVersion;
1212
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
1313
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1414
import org.elasticsearch.test.ESTestCase;
@@ -28,19 +28,19 @@ public class SqlStreamTests extends ESTestCase {
2828
public void testWriteAndRead() throws IOException {
2929
BytesRef payload = new BytesRef(randomByteArrayOfLength(randomIntBetween(10, 1000)));
3030

31-
SqlStreamOutput out = SqlStreamOutput.create(Version.CURRENT, randomZone());
31+
SqlStreamOutput out = SqlStreamOutput.create(TransportVersion.CURRENT, randomZone());
3232
out.writeBytesRef(payload);
3333
out.close();
3434
String encoded = out.streamAsString();
3535

36-
SqlStreamInput in = SqlStreamInput.fromString(encoded, new NamedWriteableRegistry(List.of()), Version.CURRENT);
36+
SqlStreamInput in = SqlStreamInput.fromString(encoded, new NamedWriteableRegistry(List.of()), TransportVersion.CURRENT);
3737
BytesRef read = in.readBytesRef();
3838

3939
assertArrayEquals(payload.bytes, read.bytes);
4040
}
4141

4242
public void testPayloadIsCompressed() throws IOException {
43-
SqlStreamOutput out = SqlStreamOutput.create(Version.CURRENT, randomZone());
43+
SqlStreamOutput out = SqlStreamOutput.create(TransportVersion.CURRENT, randomZone());
4444
byte[] payload = new byte[1000];
4545
Arrays.fill(payload, (byte) 0);
4646
out.write(payload);
@@ -60,15 +60,15 @@ public void testOldCursorProducesVersionMismatchError() {
6060
+ "AP////8PAAAAAAAAAAAAAAAAAVoDAAICAAAAAAAAAAAKAP////8PAgFtCDJkMTBjNGJhBXZhbHVlAAEE"
6161
+ "QllURQFrCGJkZWY4OGU1AAABAwA=",
6262
new NamedWriteableRegistry(List.of()),
63-
Version.CURRENT
63+
TransportVersion.CURRENT
6464
)
6565
);
6666

67-
assertThat(ex.getMessage(), containsString("Unsupported cursor version [7.15.1], expected [" + Version.CURRENT + "]"));
67+
assertThat(ex.getMessage(), containsString("Unsupported cursor version [7150199], expected [" + TransportVersion.CURRENT + "]"));
6868
}
6969

7070
public void testVersionCanBeReadByOldNodes() throws IOException {
71-
Version version = randomFrom(Version.V_7_0_0, Version.V_7_2_1, Version.V_8_1_0);
71+
TransportVersion version = randomFrom(TransportVersion.V_7_0_0, TransportVersion.V_7_2_1, TransportVersion.V_8_1_0);
7272
SqlStreamOutput out = SqlStreamOutput.create(version, randomZone());
7373
out.writeString("payload");
7474
out.close();
@@ -77,7 +77,7 @@ public void testVersionCanBeReadByOldNodes() throws IOException {
7777
byte[] bytes = Base64.getDecoder().decode(encoded);
7878
InputStreamStreamInput in = new InputStreamStreamInput(new ByteArrayInputStream(bytes));
7979

80-
assertEquals(version, Version.readVersion(in));
80+
assertEquals(version, TransportVersion.readVersion(in));
8181
}
8282

8383
}

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
*/
77
package org.elasticsearch.xpack.sql.plugin;
88

9-
import org.elasticsearch.Version;
9+
import org.elasticsearch.TransportVersion;
1010
import org.elasticsearch.action.support.PlainActionFuture;
1111
import org.elasticsearch.client.internal.Client;
1212
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1313
import org.elasticsearch.common.logging.LoggerMessageFormat;
1414
import org.elasticsearch.core.Tuple;
1515
import org.elasticsearch.test.ESTestCase;
16-
import org.elasticsearch.test.VersionUtils;
16+
import org.elasticsearch.test.TransportVersionUtils;
1717
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
1818
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
1919
import org.elasticsearch.xpack.sql.proto.StringUtils;
@@ -43,20 +43,30 @@ public void testEmptyCursorClearCursor() {
4343
verifyNoMoreInteractions(clientMock);
4444
}
4545

46-
public void testVersionHandling() {
46+
public void testHistoricVersionHandling() {
4747
Cursor cursor = randomSearchHitCursor();
4848
assertEquals(cursor, decodeFromString(encodeToString(cursor, randomZone())));
4949

50-
Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
50+
// encoded with a different but compatible version
51+
assertEquals(
52+
cursor,
53+
decodeFromString(encodeToString(cursor, TransportVersion.fromId(TransportVersion.CURRENT.id + 1), randomZone()))
54+
);
5155

52-
String encodedWithWrongVersion = encodeToString(cursor, nextMinorVersion, randomZone());
56+
TransportVersion otherVersion = TransportVersionUtils.randomVersionBetween(
57+
random(),
58+
TransportVersionUtils.getFirstVersion(),
59+
TransportVersion.V_8_7_0
60+
);
61+
62+
String encodedWithWrongVersion = encodeToString(cursor, otherVersion, randomZone());
5363
SqlIllegalArgumentException exception = expectThrows(
5464
SqlIllegalArgumentException.class,
5565
() -> decodeFromString(encodedWithWrongVersion)
5666
);
5767

5868
assertEquals(
59-
LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
69+
LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", otherVersion, TransportVersion.CURRENT),
6070
exception.getMessage()
6171
);
6272
}
@@ -113,7 +123,12 @@ public void testAttachingFormatterToEmptyCursor() {
113123
public void testAttachingFormatterToCursorFromOtherVersion() {
114124
Cursor cursor = randomSearchHitCursor();
115125
ZoneId zone = randomZone();
116-
String encoded = encodeToString(cursor, randomValueOtherThan(Version.CURRENT, () -> VersionUtils.randomVersion(random())), zone);
126+
TransportVersion version = TransportVersionUtils.randomVersionBetween(
127+
random(),
128+
TransportVersionUtils.getFirstVersion(),
129+
TransportVersion.V_8_7_0
130+
);
131+
String encoded = encodeToString(cursor, version, zone);
117132

118133
BasicFormatter formatter = randomFormatter();
119134
String withFormatter = attachFormatter(encoded, formatter);

0 commit comments

Comments
 (0)