Skip to content

Commit a57512c

Browse files
committed
SQL: Fix bug caused by empty composites (#30343)
When dealing with filtering, a composite aggregation might return empty buckets (which have been filtered) which gets sent as is to the client. Unfortunately this interprets the response as no more data instead of retrying. This now has changed and the listener keeps retrying until either the query has ended or data passes the filter. Fix #30292
1 parent cc254e3 commit a57512c

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java

+30-6
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,36 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re
113113

114114
SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices);
115115

116-
client.search(search, ActionListener.wrap(r -> {
117-
updateCompositeAfterKey(r, query);
118-
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit,
119-
serializeQuery(query), indices);
120-
listener.onResponse(rowSet);
121-
}, listener::onFailure));
116+
client.search(search, new ActionListener<SearchResponse>() {
117+
@Override
118+
public void onResponse(SearchResponse r) {
119+
try {
120+
// retry
121+
if (shouldRetryDueToEmptyPage(r)) {
122+
CompositeAggregationCursor.updateCompositeAfterKey(r, search.source());
123+
client.search(search, this);
124+
return;
125+
}
126+
127+
updateCompositeAfterKey(r, query);
128+
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices);
129+
listener.onResponse(rowSet);
130+
} catch (Exception ex) {
131+
listener.onFailure(ex);
132+
}
133+
}
134+
135+
@Override
136+
public void onFailure(Exception ex) {
137+
listener.onFailure(ex);
138+
}
139+
});
140+
}
141+
142+
static boolean shouldRetryDueToEmptyPage(SearchResponse response) {
143+
CompositeAggregation composite = getComposite(response);
144+
// if there are no buckets but a next page, go fetch it instead of sending an empty response to the client
145+
return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty();
122146
}
123147

124148
static CompositeAggregation getComposite(SearchResponse response) {

x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,15 @@ static class CompositeActionListener extends BaseAggActionListener {
206206
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
207207
// there are some results
208208
if (response.getAggregations().asList().size() > 0) {
209-
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
210209

210+
// retry
211+
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) {
212+
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
213+
client.search(request, this);
214+
return;
215+
}
216+
217+
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source());
211218
byte[] nextSearch = null;
212219
try {
213220
nextSearch = CompositeAggregationCursor.serializeQuery(request.source());

0 commit comments

Comments
 (0)