Skip to content

Commit 8f7afbd

Browse files
authored
SQL: Fix issue with timezone when paginating (#52101)
Previously, when the specified (or default) fetchSize led to subsequent HTTP requests and the usage of cursors, those subsequent were no longer using the client timezone specified in the initial SQL query. As a consequence, Even though the query is executed once (with the correct timezone) the processing of the query results by the HitExtractors in the next pages was done using the default timezone Z. This could lead to incorrect results. Fix the issue by correctly using the initially specified timezone, which is found in the deserialisation of the cursor string. Fixes: #51258
1 parent 1a099fa commit 8f7afbd

File tree

8 files changed

+152
-26
lines changed

8 files changed

+152
-26
lines changed

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java

+51-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
import java.sql.ResultSet;
1717
import java.sql.SQLException;
1818
import java.sql.Statement;
19+
import java.time.Instant;
20+
import java.time.ZoneId;
21+
import java.time.ZonedDateTime;
22+
import java.util.Properties;
1923

24+
import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
2025
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;
2126

2227
/**
@@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception {
102107
assertNoSearchContexts();
103108
}
104109

110+
public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException {
111+
Request request = new Request("PUT", "/test_date_timezone");
112+
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
113+
createIndex.startObject("mappings");
114+
{
115+
createIndex.startObject("properties");
116+
{
117+
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
118+
createIndex.endObject();
119+
}
120+
createIndex.endObject();
121+
}
122+
createIndex.endObject().endObject();
123+
request.setJsonEntity(Strings.toString(createIndex));
124+
client().performRequest(request);
125+
126+
request = new Request("PUT", "/test_date_timezone/_bulk");
127+
request.addParameter("refresh", "true");
128+
StringBuilder bulk = new StringBuilder();
129+
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
130+
for (long datetime : datetimes) {
131+
bulk.append("{\"index\":{}}\n");
132+
bulk.append("{\"date\":").append(datetime).append("}\n");
133+
}
134+
request.setJsonEntity(bulk.toString());
135+
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
136+
137+
ZoneId zoneId = randomZone();
138+
Properties connectionProperties = connectionProperties();
139+
connectionProperties.put(JDBC_TIMEZONE, zoneId.toString());
140+
try (Connection c = esJdbc(connectionProperties);
141+
Statement s = c.createStatement()) {
142+
s.setFetchSize(2);
143+
try (ResultSet rs =
144+
s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) {
145+
for (int i = 0; i < datetimes.length; i++) {
146+
assertEquals(2, rs.getFetchSize());
147+
assertTrue("No more entries left at " + i, rs.next());
148+
assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset()
149+
.getTotalSeconds()/ 60, rs.getInt(1));
150+
}
151+
assertFalse(rs.next());
152+
}
153+
}
154+
}
105155

106156
/**
107157
* Test for {@code SELECT} that is implemented as an aggregation.
@@ -237,4 +287,4 @@ private void addPivotData() throws Exception {
237287
request.setJsonEntity(bulk.toString());
238288
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
239289
}
240-
}
290+
}

x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

+69-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.elasticsearch.xpack.sql.qa.rest;
77

88
import com.fasterxml.jackson.core.io.JsonStringEncoder;
9-
109
import org.apache.http.HttpEntity;
1110
import org.apache.http.entity.ContentType;
1211
import org.apache.http.entity.StringEntity;
@@ -15,9 +14,11 @@
1514
import org.elasticsearch.client.Response;
1615
import org.elasticsearch.client.ResponseException;
1716
import org.elasticsearch.common.CheckedSupplier;
17+
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.common.bytes.BytesArray;
1919
import org.elasticsearch.common.collect.Tuple;
2020
import org.elasticsearch.common.io.Streams;
21+
import org.elasticsearch.common.xcontent.XContentBuilder;
2122
import org.elasticsearch.common.xcontent.XContentHelper;
2223
import org.elasticsearch.common.xcontent.json.JsonXContent;
2324
import org.elasticsearch.test.NotEqualMessageBuilder;
@@ -31,6 +32,9 @@
3132
import java.io.InputStreamReader;
3233
import java.nio.charset.StandardCharsets;
3334
import java.sql.JDBCType;
35+
import java.time.Instant;
36+
import java.time.ZoneId;
37+
import java.time.ZonedDateTime;
3438
import java.util.ArrayList;
3539
import java.util.Arrays;
3640
import java.util.Collections;
@@ -151,6 +155,70 @@ public void testNextPage() throws IOException {
151155
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
152156
}
153157

158+
public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
159+
Request request = new Request("PUT", "/test_date_timezone");
160+
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
161+
createIndex.startObject("mappings");
162+
{
163+
createIndex.startObject("properties");
164+
{
165+
createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
166+
createIndex.endObject();
167+
}
168+
createIndex.endObject();
169+
}
170+
createIndex.endObject().endObject();
171+
request.setJsonEntity(Strings.toString(createIndex));
172+
client().performRequest(request);
173+
174+
request = new Request("PUT", "/test_date_timezone/_bulk");
175+
request.addParameter("refresh", "true");
176+
StringBuilder bulk = new StringBuilder();
177+
long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
178+
for (long datetime : datetimes) {
179+
bulk.append("{\"index\":{}}\n");
180+
bulk.append("{\"date\":").append(datetime).append("}\n");
181+
}
182+
request.setJsonEntity(bulk.toString());
183+
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
184+
185+
ZoneId zoneId = randomZone();
186+
String mode = randomMode();
187+
String sqlRequest =
188+
"{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\","
189+
+ "\"time_zone\":\"" + zoneId.getId() + "\", "
190+
+ "\"mode\":\"" + mode + "\", "
191+
+ "\"fetch_size\":2}";
192+
193+
String cursor = null;
194+
for (int i = 0; i <= datetimes.length; i += 2) {
195+
Map<String, Object> expected = new HashMap<>();
196+
Map<String, Object> response;
197+
198+
if (i == 0) {
199+
expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
200+
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
201+
} else {
202+
response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
203+
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
204+
}
205+
206+
List<Object> values = new ArrayList<>(2);
207+
for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) {
208+
values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId)
209+
.getOffset().getTotalSeconds() / 60));
210+
}
211+
expected.put("rows", values);
212+
cursor = (String) response.remove("cursor");
213+
assertResponse(expected, response);
214+
assertNotNull(cursor);
215+
}
216+
Map<String, Object> expected = new HashMap<>();
217+
expected.put("rows", emptyList());
218+
assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
219+
ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
220+
}
221+
154222
@AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
155223
public void testTimeZone() throws IOException {
156224
String mode = randomMode();

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene
4343

4444
public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request,
4545
ActionListener<SqlClearCursorResponse> listener) {
46-
Cursor cursor = Cursors.decodeFromString(request.getCursor());
46+
Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
4747
planExecutor.cleanCursor(
4848
new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
4949
request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY,

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.support.HandledTransportAction;
1111
import org.elasticsearch.cluster.service.ClusterService;
1212
import org.elasticsearch.common.Strings;
13+
import org.elasticsearch.common.collect.Tuple;
1314
import org.elasticsearch.common.inject.Inject;
1415
import org.elasticsearch.common.settings.Settings;
1516
import org.elasticsearch.tasks.Task;
@@ -26,12 +27,14 @@
2627
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
2728
import org.elasticsearch.xpack.sql.proto.Mode;
2829
import org.elasticsearch.xpack.sql.session.Configuration;
30+
import org.elasticsearch.xpack.sql.session.Cursor;
2931
import org.elasticsearch.xpack.sql.session.Cursor.Page;
3032
import org.elasticsearch.xpack.sql.session.Cursors;
3133
import org.elasticsearch.xpack.sql.session.RowSet;
3234
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
3335
import org.elasticsearch.xpack.sql.type.SqlDataTypes;
3436

37+
import java.time.ZoneId;
3538
import java.util.ArrayList;
3639
import java.util.List;
3740

@@ -68,7 +71,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener<SqlQ
6871
/**
6972
* Actual implementation of the action. Statically available to support embedded mode.
7073
*/
71-
public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
74+
static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
7275
String username, String clusterName) {
7376
// The configuration is always created however when dealing with the next page, only the timeouts are relevant
7477
// the rest having default values (since the query is already created)
@@ -80,13 +83,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request,
8083
planExecutor.sql(cfg, request.query(), request.params(),
8184
wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
8285
} else {
83-
planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
84-
wrap(p -> listener.onResponse(createResponse(request, null, p)),
86+
Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());
87+
planExecutor.nextPage(cfg, decoded.v1(),
88+
wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)),
8589
listener::onFailure));
8690
}
8791
}
8892

89-
static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
93+
private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
9094
RowSet rset = page.rowSet();
9195
if ((rset instanceof SchemaRowSet) == false) {
9296
throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
@@ -102,10 +106,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p
102106
}
103107
}
104108
columns = unmodifiableList(columns);
105-
return createResponse(request, columns, page);
109+
return createResponse(request, request.zoneId(), columns, page);
106110
}
107111

108-
static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
112+
private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List<ColumnInfo> header, Page page) {
109113
List<List<Object>> rows = new ArrayList<>();
110114
page.rowSet().forEachRow(rowView -> {
111115
List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,7 +118,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo>
114118
});
115119

116120
return new SqlQueryResponse(
117-
Cursors.encodeToString(page.next(), request.zoneId()),
121+
Cursors.encodeToString(page.next(), zoneId),
118122
request.mode(),
119123
request.columnar(),
120124
header,

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

-8
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
8383
}
8484
}
8585

86-
87-
/**
88-
* Read a {@linkplain Cursor} from a string.
89-
*/
90-
public static Cursor decodeFromString(String base64) {
91-
return decodeFromStringWithZone(base64).v1();
92-
}
93-
9486
/**
9587
* Read a {@linkplain Cursor} from a string.
9688
*/

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
1313
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
1414
import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
15+
import org.elasticsearch.xpack.sql.plugin.CursorTests;
1516
import org.elasticsearch.xpack.sql.session.Cursors;
1617

1718
import java.io.IOException;
@@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro
6869
if (randomBoolean()) {
6970
return super.copyInstance(instance, version);
7071
}
71-
return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
72+
return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
7273
}
7374
}

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,19 @@ static Cursor randomNonEmptyCursor() {
104104

105105
public void testVersionHandling() {
106106
Cursor cursor = randomNonEmptyCursor();
107-
assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
107+
assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone())));
108108

109109
Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
110110

111111
String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
112112
SqlIllegalArgumentException exception = expectThrows(SqlIllegalArgumentException.class,
113-
() -> Cursors.decodeFromString(encodedWithWrongVersion));
113+
() -> decodeFromString(encodedWithWrongVersion));
114114

115115
assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
116116
exception.getMessage());
117117
}
118-
}
118+
119+
public static Cursor decodeFromString(String base64) {
120+
return Cursors.decodeFromStringWithZone(base64).v1();
121+
}
122+
}

x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@
77

88
import org.elasticsearch.Version;
99
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
10-
import org.elasticsearch.test.AbstractWireTestCase;
10+
import org.elasticsearch.common.io.stream.Writeable;
11+
import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
12+
import org.elasticsearch.xpack.sql.plugin.CursorTests;
1113

1214
import java.io.IOException;
1315
import java.util.ArrayList;
1416
import java.util.Arrays;
1517
import java.util.List;
1618

17-
public class ListCursorTests extends AbstractWireTestCase<ListCursor> {
19+
public class ListCursorTests extends AbstractSqlWireSerializingTestCase<ListCursor> {
1820
public static ListCursor randomPagingListCursor() {
1921
int size = between(1, 20);
2022
int depth = between(1, 20);
@@ -44,13 +46,18 @@ protected ListCursor createTestInstance() {
4446
return randomPagingListCursor();
4547
}
4648

49+
@Override
50+
protected Writeable.Reader<ListCursor> instanceReader() {
51+
return ListCursor::new;
52+
}
53+
4754
@Override
4855
protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException {
4956
/* Randomly choose between internal protocol round trip and String based
5057
* round trips used to toXContent. */
5158
if (randomBoolean()) {
5259
return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version);
5360
}
54-
return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
61+
return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone()));
5562
}
56-
}
63+
}

0 commit comments

Comments
 (0)