-
Notifications
You must be signed in to change notification settings - Fork 25.2k
SQL: Fix bug caused by empty composites #30343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -113,12 +113,36 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re | |
|
||
SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices); | ||
|
||
client.search(search, ActionListener.wrap(r -> { | ||
updateCompositeAfterKey(r, query); | ||
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, | ||
serializeQuery(query), indices); | ||
listener.onResponse(rowSet); | ||
}, listener::onFailure)); | ||
client.search(search, new ActionListener<SearchResponse>() { | ||
@Override | ||
public void onResponse(SearchResponse r) { | ||
try { | ||
// retry | ||
if (shouldRetryDueToEmptyPage(r)) { | ||
CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); | ||
client.search(search, this); | ||
return; | ||
} | ||
|
||
updateCompositeAfterKey(r, query); | ||
CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices); | ||
listener.onResponse(rowSet); | ||
} catch (Exception ex) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You shouldn't need to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While updating the code, I was reminded why I added it in the first place - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand with |
||
listener.onFailure(ex); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception ex) { | ||
listener.onFailure(ex); | ||
} | ||
}); | ||
} | ||
|
||
static boolean shouldRetryDueToEmptyPage(SearchResponse response) { | ||
CompositeAggregation composite = getComposite(response); | ||
// if there are no buckets but a next page, go fetch it instead of sending an empty response to the client | ||
return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty(); | ||
} | ||
|
||
static CompositeAggregation getComposite(SearchResponse response) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -206,8 +206,15 @@ static class CompositeActionListener extends BaseAggActionListener { | |
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) { | ||
// there are some results | ||
if (response.getAggregations().asList().size() > 0) { | ||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); | ||
|
||
// retry | ||
if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { | ||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); | ||
client.search(request, this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same deal here. Make sure it isn't going to stack overflow. It probably won't be we should make sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've ran the test suite with the troublesome seed and without (plus ran the test separately with the page set to 1 - this is what triggered the bug). Anything else I should watch out for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You pretty much have to read the code to make sure it won't happen. With search it only came up because dozens of shards for the search were on the local node. With this it'd come up if the composite agg needed to be retried hundreds of times and the API doesn't otherwise go async. I'd dig through the client implementation. I think this is safe because you are being called on the listener thread pool and the search is running on the search threadpool but it is worth double checking. |
||
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the |
||
} | ||
|
||
CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); | ||
byte[] nextSearch = null; | ||
try { | ||
nextSearch = CompositeAggregationCursor.serializeQuery(request.source()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nik9000 is this okay for chaining listeners, in particular the
return
for the current call?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general chaining listeners is fine. I don't tend to use the
return
early style in listeners because I feel like they should look symmetric. They just feel better to me when they do. I use the earlyreturn
style everywhere else though.The only concern with chaining listeners like this is that some APIs are synchronous sometimes and if you chain on a synchronous API you'll get a stack overflow. The initial phase of search has this problem. I don't know if you have that problem here but I'd dig through the code to make super sure you don't.