Skip to content

Commit 8f8d3a5

Browse files
authored
[Rollup] Return empty response when aggs are missing (elastic#32796)
If a search request doesn't contain aggs (or an empty agg object), we should just retun an empty response. This is how the normal search API works if you specify zero hits and empty aggs. The existing behavior throws an exception because it tries to send an empty msearch. Closes elastic#32256
1 parent 0da981a commit 8f8d3a5

File tree

6 files changed

+74
-25
lines changed

6 files changed

+74
-25
lines changed

x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ GET /sensor_rollup/_rollup_search
101101
--------------------------------------------------
102102
// CONSOLE
103103
// TEST[setup:sensor_prefab_data]
104+
// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/]
104105

105106
The query is targeting the `sensor_rollup` data, since this contains the rollup data as configured in the job. A `max`
106107
aggregation has been used on the `temperature` field, yielding the following response:
@@ -194,6 +195,7 @@ GET sensor-1,sensor_rollup/_rollup_search <1>
194195
--------------------------------------------------
195196
// CONSOLE
196197
// TEST[continued]
198+
// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/]
197199
<1> Note the URI now searches `sensor-1` and `sensor_rollup` at the same time
198200

199201
When the search is executed, the Rollup Search endpoint will do two things:

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,23 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis
238238
? (InternalAggregations)liveResponse.getAggregations()
239239
: InternalAggregations.EMPTY;
240240

241-
rolledResponses.forEach(r -> {
242-
if (r == null || r.getAggregations() == null || r.getAggregations().asList().size() == 0) {
243-
throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
241+
int missingRollupAggs = rolledResponses.stream().mapToInt(searchResponse -> {
242+
if (searchResponse == null
243+
|| searchResponse.getAggregations() == null
244+
|| searchResponse.getAggregations().asList().size() == 0) {
245+
return 1;
244246
}
245-
});
247+
return 0;
248+
}).sum();
249+
250+
// We had no rollup aggs, so there is nothing to process
251+
if (missingRollupAggs == rolledResponses.size()) {
252+
// Return an empty response, but make sure we include all the shard, failure, etc stats
253+
return mergeFinalResponse(liveResponse, rolledResponses, InternalAggregations.EMPTY);
254+
} else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) {
255+
// We were missing some but not all the aggs, unclear how to handle this. Bail.
256+
throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
257+
}
246258

247259
// The combination process returns a tree that is identical to the non-rolled
248260
// which means we can use aggregation's reduce method to combine, just as if
@@ -275,27 +287,39 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis
275287
new InternalAggregation.ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true));
276288
}
277289

278-
// TODO allow profiling in the future
279-
InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), currentTree, null, null,
280-
rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
281-
rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
282-
rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum());
290+
return mergeFinalResponse(liveResponse, rolledResponses, currentTree);
291+
}
292+
293+
private static SearchResponse mergeFinalResponse(SearchResponse liveResponse, List<SearchResponse> rolledResponses,
294+
InternalAggregations aggs) {
283295

284296
int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum();
285297
int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum();
286298
int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum();
287299
long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ;
288300

301+
boolean isTimedOut = rolledResponses.stream().anyMatch(SearchResponse::isTimedOut);
302+
boolean isTerminatedEarly = rolledResponses.stream()
303+
.filter(r -> r.isTerminatedEarly() != null)
304+
.anyMatch(SearchResponse::isTerminatedEarly);
305+
int numReducePhases = rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum();
306+
289307
if (liveResponse != null) {
290308
totalShards += liveResponse.getTotalShards();
291309
sucessfulShards += liveResponse.getSuccessfulShards();
292310
skippedShards += liveResponse.getSkippedShards();
293311
took = Math.max(took, liveResponse.getTook().getMillis());
312+
isTimedOut = isTimedOut && liveResponse.isTimedOut();
313+
isTerminatedEarly = isTerminatedEarly && liveResponse.isTerminatedEarly();
314+
numReducePhases += liveResponse.getNumReducePhases();
294315
}
295316

317+
InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), aggs, null, null,
318+
isTimedOut, isTerminatedEarly, numReducePhases);
319+
296320
// Shard failures are ignored atm, so returning an empty array is fine
297321
return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards,
298-
took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
322+
took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
299323
}
300324

301325
/**

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,18 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite
155155
rolledSearchSource.size(0);
156156
AggregatorFactories.Builder sourceAgg = request.source().aggregations();
157157

158+
// If there are no aggs in the request, our translation won't create any msearch.
159+
// So just add an dummy request to the msearch and return. This is a bit silly
160+
// but maintains how the regular search API behaves
161+
if (sourceAgg == null || sourceAgg.count() == 0) {
162+
163+
// Note: we can't apply any query rewriting or filtering on the query because there
164+
// are no validated caps, so we have no idea what job is intended here. The only thing
165+
// this affects is doc count, since hits and aggs will both be empty it doesn't really matter.
166+
msearch.add(new SearchRequest(context.getRollupIndices(), request.source()).types(request.types()));
167+
return msearch;
168+
}
169+
158170
// Find our list of "best" job caps
159171
Set<RollupJobCaps> validatedCaps = new HashSet<>();
160172
sourceAgg.getAggregatorFactories()
@@ -248,11 +260,6 @@ static void validateSearchRequest(SearchRequest request) {
248260
if (request.source().explain() != null && request.source().explain()) {
249261
throw new IllegalArgumentException("Rollup search does not support explaining.");
250262
}
251-
252-
// Rollup is only useful if aggregations are set, throw an exception otherwise
253-
if (request.source().aggregations() == null) {
254-
throw new IllegalArgumentException("Rollup requires at least one aggregation to be set.");
255-
}
256263
}
257264

258265
static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCaps) {

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,11 @@ public void testRolledMissingAggs() {
198198
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
199199
ScriptService scriptService = mock(ScriptService.class);
200200

201-
Exception e = expectThrows(RuntimeException.class,
202-
() -> RollupResponseTranslator.combineResponses(msearch,
203-
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
204-
assertThat(e.getMessage(), equalTo("Expected to find aggregations in rollup response, but none found."));
201+
SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
202+
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
203+
assertNotNull(response);
204+
Aggregations responseAggs = response.getAggregations();
205+
assertThat(responseAggs.asList().size(), equalTo(0));
205206
}
206207

207208
public void testMissingRolledIndex() {

x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -307,21 +307,22 @@ public void testExplain() {
307307
assertThat(e.getMessage(), equalTo("Rollup search does not support explaining."));
308308
}
309309

310-
public void testNoAgg() {
311-
String[] normalIndices = new String[]{randomAlphaOfLength(10)};
310+
public void testNoRollupAgg() {
311+
String[] normalIndices = new String[]{};
312312
String[] rollupIndices = new String[]{randomAlphaOfLength(10)};
313313
TransportRollupSearchAction.RollupSearchContext ctx
314314
= new TransportRollupSearchAction.RollupSearchContext(normalIndices, rollupIndices, Collections.emptySet());
315315
SearchSourceBuilder source = new SearchSourceBuilder();
316316
source.query(new MatchAllQueryBuilder());
317317
source.size(0);
318-
SearchRequest request = new SearchRequest(normalIndices, source);
318+
SearchRequest request = new SearchRequest(rollupIndices, source);
319319
NamedWriteableRegistry registry = mock(NamedWriteableRegistry.class);
320-
Exception e = expectThrows(IllegalArgumentException.class,
321-
() -> TransportRollupSearchAction.createMSearchRequest(request, registry, ctx));
322-
assertThat(e.getMessage(), equalTo("Rollup requires at least one aggregation to be set."));
320+
MultiSearchRequest msearch = TransportRollupSearchAction.createMSearchRequest(request, registry, ctx);
321+
assertThat(msearch.requests().size(), equalTo(1));
322+
assertThat(msearch.requests().get(0), equalTo(request));
323323
}
324324

325+
325326
public void testNoLiveNoRollup() {
326327
String[] normalIndices = new String[0];
327328
String[] rollupIndices = new String[0];

x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,20 @@ setup:
152152
- match: { aggregations.histo.buckets.3.key_as_string: "2017-01-01T08:00:00.000Z" }
153153
- match: { aggregations.histo.buckets.3.doc_count: 20 }
154154

155+
---
156+
"Empty aggregation":
157+
158+
- do:
159+
xpack.rollup.rollup_search:
160+
index: "foo_rollup"
161+
body:
162+
size: 0
163+
aggs: {}
164+
165+
- length: { hits.hits: 0 }
166+
- match: { hits.total: 0 }
167+
- is_false: aggregations
168+
155169

156170
---
157171
"Search with Metric":

0 commit comments

Comments
 (0)