Skip to content

Commit a079b9f

Browse files
committed
SQL: ConstantProcessor can now handle NamedWriteable (#39876)
Enhance ConstantProcessor to properly serialize complex objects (Intervals) that have their own custom serialization/deserialization mechanism Fix #39875 (cherry picked from commit ed8a1f9)
1 parent 471aa6a commit a079b9f

File tree

4 files changed

+51
-5
lines changed

4 files changed

+51
-5
lines changed

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected ResultSet executeJdbcQuery(Connection con, String query) throws SQLExc
107107
}
108108

109109
protected int fetchSize() {
110-
return between(1, 500);
110+
return between(1, 150);
111111
}
112112

113113
// TODO: use UTC for now until deciding on a strategy for handling date extraction

x-pack/plugin/sql/qa/src/main/resources/datetime.csv-spec

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,22 @@ SELECT YEAR(NOW() - INTERVAL 2 YEARS) / 1000 AS result;
346346
2
347347
;
348348

349+
dateAndIntervalPaginated
350+
SELECT YEAR(birth_date - INTERVAL 2 YEARS) / 1000 AS result FROM test_emp ORDER BY birth_date LIMIT 10;
351+
352+
result
353+
---------------
354+
1
355+
1
356+
1
357+
1
358+
1
359+
1
360+
1
361+
1
362+
1
363+
1
364+
;
349365

350366
currentTimestampFilter
351367
SELECT first_name FROM test_emp WHERE hire_date > NOW() - INTERVAL 100 YEARS ORDER BY first_name ASC LIMIT 10;

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/gen/processor/ConstantProcessor.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.sql.expression.gen.processor;
77

8+
import org.elasticsearch.common.io.stream.NamedWriteable;
89
import org.elasticsearch.common.io.stream.StreamInput;
910
import org.elasticsearch.common.io.stream.StreamOutput;
1011

@@ -16,17 +17,40 @@ public class ConstantProcessor implements Processor {
1617
public static String NAME = "c";
1718

1819
private final Object constant;
20+
private final boolean namedWriteable;
21+
private final Class<?> clazz;
1922

2023
public ConstantProcessor(Object value) {
2124
this.constant = value;
25+
this.namedWriteable = value instanceof NamedWriteable;
26+
this.clazz = namedWriteable ? value.getClass() : null;
2227
}
2328

29+
@SuppressWarnings("unchecked")
2430
public ConstantProcessor(StreamInput in) throws IOException {
25-
constant = in.readGenericValue();
31+
namedWriteable = in.readBoolean();
32+
if (namedWriteable) {
33+
try {
34+
clazz = ConstantProcessor.class.getClassLoader().loadClass(in.readString());
35+
} catch (ClassNotFoundException e) {
36+
throw new IOException(e);
37+
}
38+
constant = in.readNamedWriteable((Class<NamedWriteable>) clazz);
39+
} else {
40+
clazz = null;
41+
constant = in.readGenericValue();
42+
}
2643
}
2744

45+
@Override
2846
public void writeTo(StreamOutput out) throws IOException {
29-
out.writeGenericValue(constant);
47+
out.writeBoolean(namedWriteable);
48+
if (namedWriteable) {
49+
out.writeString(constant.getClass().getName());
50+
out.writeNamedWriteable((NamedWriteable) constant);
51+
} else {
52+
out.writeGenericValue(constant);
53+
}
3054
}
3155

3256
@Override

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/gen/processor/ConstantProcessorTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
import org.elasticsearch.common.io.stream.Writeable.Reader;
99
import org.elasticsearch.test.AbstractWireSerializingTestCase;
10-
import org.elasticsearch.xpack.sql.expression.gen.processor.ConstantProcessor;
10+
import org.elasticsearch.xpack.sql.expression.literal.IntervalDayTime;
11+
import org.elasticsearch.xpack.sql.type.DataType;
1112

1213
import java.io.IOException;
14+
import java.time.Duration;
15+
import java.util.concurrent.TimeUnit;
1316

1417
public class ConstantProcessorTests extends AbstractWireSerializingTestCase<ConstantProcessor> {
1518
public static ConstantProcessor randomConstantProcessor() {
@@ -28,7 +31,10 @@ protected Reader<ConstantProcessor> instanceReader() {
2831

2932
@Override
3033
protected ConstantProcessor mutateInstance(ConstantProcessor instance) throws IOException {
31-
return new ConstantProcessor(randomValueOtherThan(instance.process(null), () -> randomAlphaOfLength(5)));
34+
return new ConstantProcessor(randomValueOtherThan(instance.process(null),
35+
() -> new IntervalDayTime(Duration.ofSeconds(
36+
randomLongBetween(TimeUnit.SECONDS.convert(3, TimeUnit.HOURS), TimeUnit.SECONDS.convert(23, TimeUnit.HOURS))),
37+
DataType.INTERVAL_DAY_TO_SECOND)));
3238
}
3339

3440
public void testApply() {

0 commit comments

Comments
 (0)