Skip to content

Commit 7d1c434

Browse files
author
Lukas Wegmann
authored
SQL: Fix metrics reporting when sorting on aggregated values (elastic#81510)
Resolves elastic#81502 The bug has been caused by Querier using PlanExecutor.nextPage for internal calls. Because PlanExecutor.nextPage also reports the API usage metrics, the <clientType>.paging, <clientType>.total, _all.paging and _all.total counters have been increased incorrectly for queries performing a sort on aggregated values.
1 parent 9a3422e commit 7d1c434

File tree

3 files changed

+85
-29
lines changed

3 files changed

+85
-29
lines changed

x-pack/plugin/sql/qa/server/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlUsageTestCase.java

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_QUERY_REST_ENDPOINT;
3131
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_STATS_REST_ENDPOINT;
3232
import static org.elasticsearch.xpack.sql.proto.Protocol.SQL_TRANSLATE_REST_ENDPOINT;
33+
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
3334
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.query;
3435

3536
public abstract class RestSqlUsageTestCase extends ESRestTestCase {
@@ -59,11 +60,14 @@ public String toString() {
5960

6061
private Map<String, Integer> baseMetrics = new HashMap<String, Integer>();
6162
private Integer baseClientTypeTotalQueries = 0;
63+
private Integer baseClientTypePagingQueries = 0;
6264
private Integer baseClientTypeFailedQueries = 0;
6365
private Integer baseAllTotalQueries = 0;
66+
private Integer baseAllPagingQueries = 0;
6467
private Integer baseAllFailedQueries = 0;
6568
private Integer baseTranslateRequests = 0;
6669
private String clientType;
70+
private String mode;
6771
private boolean ignoreClientType;
6872

6973
/**
@@ -87,6 +91,16 @@ private void getBaseMetrics() throws UnsupportedOperationException, IOException
8791
clientType = ClientType.REST.toString();
8892
}
8993

94+
if (clientType.equals(ClientType.JDBC.toString())) {
95+
mode = Mode.JDBC.toString();
96+
} else if (clientType.startsWith(ClientType.ODBC.toString())) {
97+
mode = Mode.ODBC.toString();
98+
} else if (clientType.equals(ClientType.CLI.toString())) {
99+
mode = Mode.CLI.toString();
100+
} else {
101+
mode = Mode.PLAIN.toString();
102+
}
103+
90104
for (Map perNodeStats : nodesListStats) {
91105
Map featuresMetrics = (Map) ((Map) perNodeStats.get("stats")).get("features");
92106
Map queriesMetrics = (Map) ((Map) perNodeStats.get("stats")).get("queries");
@@ -96,8 +110,10 @@ private void getBaseMetrics() throws UnsupportedOperationException, IOException
96110

97111
// initialize the "base" metric values with whatever values are already recorded on ES
98112
baseClientTypeTotalQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("total");
113+
baseClientTypePagingQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("paging");
99114
baseClientTypeFailedQueries = ((Map<String, Integer>) queriesMetrics.get(clientType)).get("failed");
100115
baseAllTotalQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("total");
116+
baseAllPagingQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("paging");
101117
baseAllFailedQueries = ((Map<String, Integer>) queriesMetrics.get("_all")).get("failed");
102118
baseTranslateRequests = ((Map<String, Integer>) queriesMetrics.get("translate")).get("count");
103119
}
@@ -224,6 +240,47 @@ public void testSqlRestUsage() throws IOException {
224240
assertClientTypeAndAllQueryMetrics(clientTypeTotalQueries, allTotalQueries, responseAsMap);
225241
}
226242

243+
public void testScrollUsage() throws IOException {
244+
index(testData);
245+
246+
String cursor = runSql("SELECT page_count, name FROM library ORDER BY page_count", randomIntBetween(1, testData.size()));
247+
int scrollRequests = 0;
248+
249+
while (cursor != null) {
250+
cursor = scroll(cursor);
251+
scrollRequests++;
252+
}
253+
254+
Map<String, Object> responseAsMap = getStats();
255+
assertClientTypeQueryMetric(baseClientTypeTotalQueries + scrollRequests + 1, responseAsMap, "total");
256+
assertClientTypeQueryMetric(baseClientTypePagingQueries + scrollRequests, responseAsMap, "paging");
257+
258+
assertAllQueryMetric(baseAllTotalQueries + scrollRequests + 1, responseAsMap, "total");
259+
assertAllQueryMetric(baseAllPagingQueries + scrollRequests, responseAsMap, "paging");
260+
261+
assertFeatureMetric(baseMetrics.get("orderby") + 1, responseAsMap, "orderby");
262+
}
263+
264+
// test for bug https://github.com/elastic/elasticsearch/issues/81502
265+
public void testUsageOfQuerySortedByAggregationResult() throws IOException {
266+
index(testData);
267+
268+
String cursor = runSql("SELECT SUM(page_count), name FROM library GROUP BY 2 ORDER BY 1", 1);
269+
270+
Map<String, Object> responseAsMap = getStats();
271+
assertClientTypeQueryMetric(baseClientTypeTotalQueries + 1, responseAsMap, "total");
272+
assertClientTypeQueryMetric(baseClientTypePagingQueries, responseAsMap, "paging");
273+
assertAllQueryMetric(baseAllTotalQueries + 1, responseAsMap, "total");
274+
assertAllQueryMetric(baseAllPagingQueries, responseAsMap, "paging");
275+
276+
scroll(cursor);
277+
responseAsMap = getStats();
278+
assertClientTypeQueryMetric(baseClientTypeTotalQueries + 2, responseAsMap, "total");
279+
assertClientTypeQueryMetric(baseClientTypePagingQueries + 1, responseAsMap, "paging");
280+
assertAllQueryMetric(baseAllTotalQueries + 2, responseAsMap, "total");
281+
assertAllQueryMetric(baseAllPagingQueries + 1, responseAsMap, "paging");
282+
}
283+
227284
private void assertClientTypeAndAllQueryMetrics(int clientTypeTotalQueries, int allTotalQueries, Map<String, Object> responseAsMap)
228285
throws IOException {
229286
assertClientTypeQueryMetric(clientTypeTotalQueries, responseAsMap, "total");
@@ -277,17 +334,16 @@ private void runTranslate(String sql) throws IOException {
277334
client().performRequest(request);
278335
}
279336

280-
private void runSql(String sql) throws IOException {
281-
Mode mode = Mode.PLAIN;
282-
if (clientType.equals(ClientType.JDBC.toString())) {
283-
mode = Mode.JDBC;
284-
} else if (clientType.startsWith(ClientType.ODBC.toString())) {
285-
mode = Mode.ODBC;
286-
} else if (clientType.equals(ClientType.CLI.toString())) {
287-
mode = Mode.CLI;
288-
}
337+
private String runSql(String sql) throws IOException {
338+
return runSql(sql, null);
339+
}
289340

290-
runSql(mode.toString(), clientType, sql);
341+
private String scroll(String cursor) throws IOException {
342+
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
343+
request.setEntity(
344+
new StringEntity(RestSqlTestCase.cursor(cursor).mode(mode).clientId(clientType).toString(), ContentType.APPLICATION_JSON)
345+
);
346+
return (String) toMap(client().performRequest(request), mode).get("cursor");
291347
}
292348

293349
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -302,7 +358,7 @@ private void assertTranslateQueryMetric(int expected, Map<String, Object> respon
302358
assertEquals(expected, actualMetricValue);
303359
}
304360

305-
private void runSql(String mode, String restClient, String sql) throws IOException {
361+
private String runSql(String sql, Integer fetchSize) throws IOException {
306362
Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
307363
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
308364
request.addParameter("pretty", "true"); // Improves error reporting readability
@@ -318,11 +374,11 @@ private void runSql(String mode, String restClient, String sql) throws IOExcepti
318374
}
319375
request.setEntity(
320376
new StringEntity(
321-
query(sql).mode(mode).clientId(ignoreClientType ? StringUtils.EMPTY : restClient).toString(),
377+
query(sql).fetchSize(fetchSize).mode(mode).clientId(ignoreClientType ? StringUtils.EMPTY : clientType).toString(),
322378
ContentType.APPLICATION_JSON
323379
)
324380
);
325-
client().performRequest(request);
381+
return (String) toMap(client().performRequest(request), mode).get("cursor");
326382
}
327383

328384
@SuppressWarnings({ "unchecked", "rawtypes" })

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,19 @@ public void nextPage(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> l
111111
metrics.total(metric);
112112
metrics.paging(metric);
113113

114-
cursor.nextPage(cfg, client, writableRegistry, wrap(listener::onResponse, ex -> {
114+
nextPageInternal(cfg, cursor, wrap(listener::onResponse, ex -> {
115115
metrics.failed(metric);
116116
listener.onFailure(ex);
117117
}));
118118
}
119119

120+
/**
121+
* `nextPage` for internal callers (not from the APIs) without metrics reporting.
122+
*/
123+
public void nextPageInternal(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> listener) {
124+
cursor.nextPage(cfg, client, writableRegistry, listener);
125+
}
126+
120127
public void cleanCursor(Cursor cursor, ActionListener<Boolean> listener) {
121128
cursor.clear(client, listener);
122129
}

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
import org.elasticsearch.client.Client;
1717
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.common.util.CollectionUtils;
19-
import org.elasticsearch.core.Nullable;
2019
import org.elasticsearch.core.TimeValue;
2120
import org.elasticsearch.core.Tuple;
22-
import org.elasticsearch.index.query.QueryBuilder;
2321
import org.elasticsearch.search.aggregations.Aggregation;
2422
import org.elasticsearch.search.aggregations.Aggregations;
2523
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@@ -91,24 +89,19 @@
9189
public class Querier {
9290
private static final Logger log = LogManager.getLogger(Querier.class);
9391

94-
private final PlanExecutor planExecutor;
9592
private final SqlConfiguration cfg;
96-
private final int size;
9793
private final Client client;
98-
@Nullable
99-
private final QueryBuilder filter;
100-
101-
public Querier(SqlSession sqlSession) {
102-
this.planExecutor = sqlSession.planExecutor();
103-
this.client = sqlSession.client();
104-
this.cfg = sqlSession.configuration();
105-
this.filter = cfg.filter();
106-
this.size = cfg.pageSize();
94+
private final PlanExecutor planExecutor;
95+
96+
public Querier(SqlSession session) {
97+
this.client = session.client();
98+
this.planExecutor = session.planExecutor();
99+
this.cfg = session.configuration();
107100
}
108101

109102
public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
110103
// prepare the request
111-
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
104+
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, cfg.filter(), cfg.pageSize());
112105

113106
if (this.cfg.runtimeMappings() != null) {
114107
sourceBuilder.runtimeMappings(this.cfg.runtimeMappings());
@@ -245,7 +238,7 @@ public void onResponse(Page page) {
245238
// 1a. trigger a next call if there's still data
246239
if (cursor != Cursor.EMPTY) {
247240
// trigger a next call
248-
planExecutor.nextPage(cfg, cursor, this);
241+
planExecutor.nextPageInternal(cfg, cursor, this);
249242
// make sure to bail out afterwards as we'll get called by a different thread
250243
return;
251244
}

0 commit comments

Comments
 (0)