Skip to content

Commit 5230638

Browse files
author
Lukas Wegmann
authored
SQL: Fix issues with format=txt when paging through result sets and in mixed node environments (#83833)
Resolves #83581 Resolves #83788 SQL REST requests using `format=txt` stand out from the other formats because the cursor needs to carry the formatter state from the initial request to subsequent scroll requests. The state is needed to be able to format the subsequent pages with column widths that match the widths in the first page. Currently, this is solved by wrapping cursor objects together with the formatter state in `TextFormatterCursor`. Hence, a query might return a `ListCursor` and `TextFormat.PLAIN_TEXT` adds the necessary state when formatting the output. This approach is handy because a `TextFormatterCursor` is a `Cursor` and delegates calls to the wrapped cursor. Unfortunately, it also has some downsides that have been revealed when looking into #83581 and #83788: - MediaType formatting is a concern of the REST layer of the plugin that does not have access to the serialization logic usually accessible through the `NamedWriteablesRegistry`. Because the formatting required to (de)serialize the cursors for the wrapping, this meant that the `Cursor` deserialization could only use a subset of `NamedWriteable`s specific to Cursors (namely the ones returned by `Cursors.getNamedWriteables`). This subset of writeables is not enough to deserialize `CompositeAggCursor` which lead to the surprising design that `CompositeAggCursor` has a `nextQuery` member consisting of the serialized `SearchSourceBuilder` whose deserialization is suspended until the call to `Cursor.nextPage`. - `TextFormat.PLAIN_TEXT` had to deserialize the complete cursor to do it's work, not just the information relevant to the formatter. Because of the `min_compatible_version` redirects that occur during rolling upgrades, it can happen though that an upgraded node needs to deserialize a cursor from a redirected SQL query written by an older node. Because we do not offer any form of bwc for cursors, this causes an error and leads to #83581. In this PR, I propose to change the design such that the REST layer can add state to the cursor strings without having to read or write instances of `Cursor`. This allows to address the issues described above and also made fixing #83788 very straightforward. Attaching state to the cursor is achieved by extending the format of encoded cursors. Previously, an base64 encoded SQL cursor had the following wire format `<version><zoneId><cursor>`. Now, there are two different cursor types: * `<version><zoneId><CursorType.NO_STATE><cursor>` for most cases * `<version><ZoneOffset.UTC><CursorType.WITH_STATE><formatterState><base64encodedCursor>` for txt format cursors. Note, `<ZoneOffset.UTC>` could be any zone id because consumers will use the one encoded in `<base64encodedCursor>`. This serialization format ensures that every version of ES can read and check the cursor version and produce an according error in case of a mismatch.
1 parent 6155e14 commit 5230638

File tree

26 files changed

+413
-330
lines changed

26 files changed

+413
-330
lines changed

docs/changelog/83833.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 83833
2+
summary: Fix issues with format=txt when paging through result sets and in mixed node environments
3+
area: SQL
4+
type: bug
5+
issues:
6+
- 83581
7+
- 83788

x-pack/plugin/sql/qa/mixed-node/src/test/java/org/elasticsearch/xpack/sql/qa/mixed_node/SqlCompatIT.java

+29
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,33 @@ private Map<String, Object> performRequestAndReadBodyAsJson(RestClient client, R
197197
}
198198
}
199199

200+
public void testCreateCursorWithFormatTxtOnNewNode() throws IOException {
201+
testCreateCursorWithFormatTxt(newNodesClient);
202+
}
203+
204+
public void testCreateCursorWithFormatTxtOnOldNode() throws IOException {
205+
testCreateCursorWithFormatTxt(oldNodesClient);
206+
}
207+
208+
/**
209+
* Tests covering https://github.com/elastic/elasticsearch/issues/83581
210+
*/
211+
public void testCreateCursorWithFormatTxt(RestClient client) throws IOException {
212+
index("{\"foo\":1}", "{\"foo\":2}");
213+
214+
Request query = new Request("POST", "_sql");
215+
XContentBuilder json = XContentFactory.jsonBuilder()
216+
.startObject()
217+
.field("query", randomFrom("SELECT foo FROM test", "SELECT foo FROM test GROUP BY foo"))
218+
.field("fetch_size", 1)
219+
.endObject();
220+
221+
query.setJsonEntity(Strings.toString(json));
222+
query.addParameter("format", "txt");
223+
224+
Response response = client.performRequest(query);
225+
assertOK(response);
226+
assertFalse(Strings.isNullOrEmpty(response.getHeader("Cursor")));
227+
}
228+
200229
}

x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java

+128-3
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME;
7878
import static org.hamcrest.Matchers.containsString;
7979
import static org.hamcrest.Matchers.lessThan;
80+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8081

8182
/**
8283
* Integration test for the rest sql action. The one that speaks json directly to a
@@ -1175,11 +1176,14 @@ private void executeQueryWithNextPage(String format, String expectedHeader, Stri
11751176
}
11761177
index(docs);
11771178

1178-
String request = query("SELECT text, number, number + 5 AS sum FROM " + indexPattern("test") + " ORDER BY number").fetchSize(2)
1179+
String optionalGroupBy = randomFrom("", " GROUP BY text, number, sum");
1180+
1181+
String request = query("SELECT text, number, number + 5 AS sum FROM " + indexPattern("test") + optionalGroupBy + " ORDER BY number")
1182+
.fetchSize(2)
11791183
.toString();
11801184

11811185
String cursor = null;
1182-
for (int i = 0; i <= 20; i += 2) {
1186+
for (int i = 0; i <= size; i += 2) {
11831187
Tuple<String, String> response;
11841188
if (i == 0) {
11851189
response = runSqlAsText(StringUtils.EMPTY, new StringEntity(request, ContentType.APPLICATION_JSON), format);
@@ -1194,7 +1198,7 @@ private void executeQueryWithNextPage(String format, String expectedHeader, Stri
11941198
StringBuilder expected = new StringBuilder();
11951199
if (i == 0) {
11961200
expected.append(expectedHeader);
1197-
if (format == "text/plain") {
1201+
if (format.equals("text/plain")) {
11981202
expected.append("---------------+---------------+---------------\n");
11991203
}
12001204
}
@@ -1438,6 +1442,127 @@ public void testCompressCursor() throws IOException {
14381442
assertThat(resp.get("cursor").toString().length(), lessThan(5000));
14391443
}
14401444

1445+
public void testFetchAllPagesSearchHitCursorTxt() throws IOException {
1446+
testFetchAllPagesSearchHitCursor("text/plain");
1447+
}
1448+
1449+
public void testFetchAllPagesSearchHitCursorCsv() throws IOException {
1450+
testFetchAllPagesSearchHitCursor("text/csv");
1451+
}
1452+
1453+
public void testFetchAllPagesSearchHitCursorTsv() throws IOException {
1454+
testFetchAllPagesSearchHitCursor("text/tab-separated-values");
1455+
}
1456+
1457+
public void testFetchAllPagesSearchHitCursor(String format) throws IOException {
1458+
int size = randomIntBetween(4, 20);
1459+
int pageSize = randomIntBetween(1, size + 1);
1460+
1461+
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
1462+
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));
1463+
1464+
testFetchAllPages(format, "SELECT field FROM " + indexPattern("test") + " ORDER BY field", texts, pageSize, true);
1465+
}
1466+
1467+
/**
1468+
* Special case of `testFetchAllPagesSearchHitCursorTxt` covering https://github.com/elastic/elasticsearch/issues/83788.
1469+
*/
1470+
public void testFetchAllPagesSearchHitCursorWithNonFullLastPageTxt() throws IOException {
1471+
int size = randomIntBetween(4, 20);
1472+
int pageSize = size / 2 + 1;
1473+
1474+
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
1475+
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));
1476+
1477+
testFetchAllPages("text/plain", "SELECT field FROM " + indexPattern("test") + " ORDER BY field", texts, pageSize, false);
1478+
}
1479+
1480+
public void testFetchAllPagesCompositeAggCursorTxt() throws IOException {
1481+
testFetchAllPagesCompositeAggCursor("text/plain");
1482+
}
1483+
1484+
public void testFetchAllPagesCompositeAggCursorCsv() throws IOException {
1485+
testFetchAllPagesCompositeAggCursor("text/csv");
1486+
}
1487+
1488+
public void testFetchAllPagesCompositeAggCursorTsv() throws IOException {
1489+
testFetchAllPagesCompositeAggCursor("text/tab-separated-values");
1490+
}
1491+
1492+
public void testFetchAllPagesCompositeAggCursor(String format) throws IOException {
1493+
int size = randomIntBetween(4, 20);
1494+
int pageSize = randomIntBetween(1, size + 1);
1495+
1496+
List<String> texts = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
1497+
index(texts.stream().map(t -> "{\"field\": \"" + t + "\"}").toArray(String[]::new));
1498+
1499+
testFetchAllPages(format, "SELECT field FROM " + indexPattern("test") + " GROUP BY field ORDER BY field", texts, pageSize, true);
1500+
}
1501+
1502+
public void testFetchAllPagesListCursorTxt() throws IOException {
1503+
testFetchAllPagesListCursor("text/plain");
1504+
}
1505+
1506+
public void testFetchAllPagesListCursorCsv() throws IOException {
1507+
testFetchAllPagesListCursor("text/csv");
1508+
}
1509+
1510+
public void testFetchAllPagesListCursorTsv() throws IOException {
1511+
testFetchAllPagesListCursor("text/tab-separated-values");
1512+
}
1513+
1514+
public void testFetchAllPagesListCursor(String format) throws IOException {
1515+
int size = randomIntBetween(4, 20);
1516+
int pageSize = randomIntBetween(1, size + 1);
1517+
1518+
List<String> fields = IntStream.range(0, size).mapToObj(i -> String.format(Locale.ROOT, "text%02d", i)).toList();
1519+
index(fields.stream().map(f -> "{\"" + f + "\": 1}").toArray(String[]::new));
1520+
1521+
testFetchAllPages(format, "SHOW COLUMNS FROM " + indexPattern("test"), fields, pageSize, false);
1522+
}
1523+
1524+
/**
1525+
* Generic paging test that ensures that
1526+
* 1. All expected documents are fetched when scrolling through all pages
1527+
* 2. There are at most `expectedValues.size() / pageSize + 1` pages (the last one might or might not be empty)
1528+
* 3. Optionally: That the last page is not empty.
1529+
*/
1530+
private void testFetchAllPages(String format, String query, List<String> expectedValues, int pageSize, boolean allowEmptyLastPage)
1531+
throws IOException {
1532+
int remainingPages = expectedValues.size() / pageSize + 1;
1533+
1534+
Tuple<String, String> response = runSqlAsText(
1535+
StringUtils.EMPTY,
1536+
new StringEntity(query(query).fetchSize(pageSize).toString(), ContentType.APPLICATION_JSON),
1537+
format
1538+
);
1539+
StringBuilder allResults = new StringBuilder(response.v1());
1540+
1541+
while (response.v2() != null && remainingPages > 0) {
1542+
response = runSqlAsText(
1543+
StringUtils.EMPTY,
1544+
new StringEntity(cursor(response.v2()).toString(), ContentType.APPLICATION_JSON),
1545+
format
1546+
);
1547+
allResults.append("\n").append(response.v1());
1548+
1549+
assertThat(response.v1().split("\n").length, lessThanOrEqualTo(pageSize));
1550+
1551+
remainingPages--;
1552+
}
1553+
1554+
if (allowEmptyLastPage == false) {
1555+
assertFalse(Strings.isNullOrEmpty(response.v1()));
1556+
}
1557+
1558+
assertNull(response.v2());
1559+
1560+
String results = allResults.toString();
1561+
for (String v : expectedValues) {
1562+
assertThat(results, containsString(v));
1563+
}
1564+
}
1565+
14411566
static Map<String, Object> runSql(RequestObjectBuilder builder, String mode) throws IOException {
14421567
return toMap(runSql(builder.mode(mode)), mode);
14431568
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,11 @@ public void nextPage(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> l
120120
* `nextPage` for internal callers (not from the APIs) without metrics reporting.
121121
*/
122122
public void nextPageInternal(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> listener) {
123-
cursor.nextPage(cfg, client, writableRegistry, listener);
123+
cursor.nextPage(cfg, client, listener);
124124
}
125125

126126
public void cleanCursor(Cursor cursor, ActionListener<Boolean> listener) {
127-
cursor.clear(client, writableRegistry, listener);
127+
cursor.clear(client, listener);
128128
}
129129

130130
public Client client() {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggCursor.java

+22-29
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.client.internal.Client;
15-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1615
import org.elasticsearch.common.io.stream.StreamInput;
1716
import org.elasticsearch.common.io.stream.StreamOutput;
1817
import org.elasticsearch.search.aggregations.Aggregation;
@@ -38,10 +37,8 @@
3837
import java.util.function.BiFunction;
3938
import java.util.function.Supplier;
4039

41-
import static org.elasticsearch.xpack.sql.execution.search.Querier.deserializeQuery;
4240
import static org.elasticsearch.xpack.sql.execution.search.Querier.logSearchResponse;
4341
import static org.elasticsearch.xpack.sql.execution.search.Querier.prepareRequest;
44-
import static org.elasticsearch.xpack.sql.execution.search.Querier.serializeQuery;
4542

4643
/**
4744
* Cursor for composite aggregation (GROUP BY).
@@ -54,15 +51,22 @@ public class CompositeAggCursor implements Cursor {
5451
public static final String NAME = "c";
5552

5653
private final String[] indices;
57-
private final byte[] nextQuery;
54+
private final SearchSourceBuilder nextQuery;
5855
private final List<BucketExtractor> extractors;
5956
private final BitSet mask;
6057
private final int limit;
6158
private final boolean includeFrozen;
6259

63-
CompositeAggCursor(byte[] next, List<BucketExtractor> exts, BitSet mask, int remainingLimit, boolean includeFrozen, String... indices) {
60+
CompositeAggCursor(
61+
SearchSourceBuilder nextQuery,
62+
List<BucketExtractor> exts,
63+
BitSet mask,
64+
int remainingLimit,
65+
boolean includeFrozen,
66+
String... indices
67+
) {
6468
this.indices = indices;
65-
this.nextQuery = next;
69+
this.nextQuery = nextQuery;
6670
this.extractors = exts;
6771
this.mask = mask;
6872
this.limit = remainingLimit;
@@ -71,7 +75,7 @@ public class CompositeAggCursor implements Cursor {
7175

7276
public CompositeAggCursor(StreamInput in) throws IOException {
7377
indices = in.readStringArray();
74-
nextQuery = in.readByteArray();
78+
nextQuery = new SearchSourceBuilder(in);
7579
limit = in.readVInt();
7680

7781
extractors = in.readNamedWriteableList(BucketExtractor.class);
@@ -82,7 +86,7 @@ public CompositeAggCursor(StreamInput in) throws IOException {
8286
@Override
8387
public void writeTo(StreamOutput out) throws IOException {
8488
out.writeStringArray(indices);
85-
out.writeByteArray(nextQuery);
89+
nextQuery.writeTo(out);
8690
out.writeVInt(limit);
8791

8892
out.writeNamedWriteableList(extractors);
@@ -99,7 +103,7 @@ String[] indices() {
99103
return indices;
100104
}
101105

102-
byte[] next() {
106+
SearchSourceBuilder next() {
103107
return nextQuery;
104108
}
105109

@@ -120,21 +124,12 @@ boolean includeFrozen() {
120124
}
121125

122126
@Override
123-
public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry registry, ActionListener<Page> listener) {
124-
SearchSourceBuilder q;
125-
try {
126-
q = deserializeQuery(registry, nextQuery);
127-
} catch (Exception ex) {
128-
listener.onFailure(ex);
129-
return;
130-
}
131-
132-
SearchSourceBuilder query = q;
127+
public void nextPage(SqlConfiguration cfg, Client client, ActionListener<Page> listener) {
133128
if (log.isTraceEnabled()) {
134-
log.trace("About to execute composite query {} on {}", StringUtils.toString(query), indices);
129+
log.trace("About to execute composite query {} on {}", StringUtils.toString(nextQuery), indices);
135130
}
136131

137-
SearchRequest request = prepareRequest(query, cfg.requestTimeout(), includeFrozen, indices);
132+
SearchRequest request = prepareRequest(nextQuery, cfg.requestTimeout(), includeFrozen, indices);
138133

139134
client.search(request, new ActionListener.Delegating<>(listener) {
140135
@Override
@@ -156,15 +151,15 @@ protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
156151
return () -> new CompositeAggRowSet(extractors, mask, response, limit);
157152
}
158153

159-
protected BiFunction<byte[], CompositeAggRowSet, CompositeAggCursor> makeCursor() {
154+
protected BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor() {
160155
return (q, r) -> new CompositeAggCursor(q, r.extractors(), r.mask(), r.remainingData(), includeFrozen, indices);
161156
}
162157

163158
static void handle(
164159
SearchResponse response,
165160
SearchSourceBuilder source,
166161
Supplier<CompositeAggRowSet> makeRowSet,
167-
BiFunction<byte[], CompositeAggRowSet, CompositeAggCursor> makeCursor,
162+
BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor,
168163
Runnable retry,
169164
ActionListener<Page> listener,
170165
Schema schema
@@ -186,13 +181,11 @@ static void handle(
186181
CompositeAggRowSet rowSet = makeRowSet.get();
187182
Map<String, Object> afterKey = rowSet.afterKey();
188183

189-
byte[] queryAsBytes = null;
190184
if (afterKey != null) {
191185
updateSourceAfterKey(afterKey, source);
192-
queryAsBytes = serializeQuery(source);
193186
}
194187

195-
Cursor next = rowSet.remainingData() == 0 ? Cursor.EMPTY : makeCursor.apply(queryAsBytes, rowSet);
188+
Cursor next = rowSet.remainingData() == 0 ? Cursor.EMPTY : makeCursor.apply(source, rowSet);
196189
listener.onResponse(new Page(rowSet, next));
197190
} catch (Exception ex) {
198191
listener.onFailure(ex);
@@ -247,13 +240,13 @@ private static void updateSourceAfterKey(Map<String, Object> afterKey, SearchSou
247240
}
248241

249242
@Override
250-
public void clear(Client client, NamedWriteableRegistry registry, ActionListener<Boolean> listener) {
243+
public void clear(Client client, ActionListener<Boolean> listener) {
251244
listener.onResponse(true);
252245
}
253246

254247
@Override
255248
public int hashCode() {
256-
return Objects.hash(Arrays.hashCode(indices), Arrays.hashCode(nextQuery), extractors, limit, mask, includeFrozen);
249+
return Objects.hash(Arrays.hashCode(indices), nextQuery, extractors, limit, mask, includeFrozen);
257250
}
258251

259252
@Override
@@ -263,7 +256,7 @@ public boolean equals(Object obj) {
263256
}
264257
CompositeAggCursor other = (CompositeAggCursor) obj;
265258
return Arrays.equals(indices, other.indices)
266-
&& Arrays.equals(nextQuery, other.nextQuery)
259+
&& Objects.equals(nextQuery, other.nextQuery)
267260
&& Objects.equals(extractors, other.extractors)
268261
&& Objects.equals(limit, other.limit)
269262
&& Objects.equals(includeFrozen, other.includeFrozen);

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/PivotCursor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.search.SearchResponse;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.search.builder.SearchSourceBuilder;
1314
import org.elasticsearch.xpack.ql.execution.search.extractor.BucketExtractor;
1415
import org.elasticsearch.xpack.ql.type.Schema;
1516

@@ -29,7 +30,7 @@ public class PivotCursor extends CompositeAggCursor {
2930

3031
PivotCursor(
3132
Map<String, Object> previousKey,
32-
byte[] next,
33+
SearchSourceBuilder next,
3334
List<BucketExtractor> exts,
3435
BitSet mask,
3536
int remainingLimit,
@@ -67,7 +68,7 @@ protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
6768
}
6869

6970
@Override
70-
protected BiFunction<byte[], CompositeAggRowSet, CompositeAggCursor> makeCursor() {
71+
protected BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor() {
7172
return (q, r) -> {
7273
Map<String, Object> lastAfterKey = r instanceof PivotRowSet ? ((PivotRowSet) r).lastAfterKey() : null;
7374
return new PivotCursor(lastAfterKey, q, r.extractors(), r.mask(), r.remainingData(), includeFrozen(), indices());

0 commit comments

Comments
 (0)