Change the default batched_reduce_size of search requests #51857
Pinging @elastic/es-analytics-geo (:Analytics/Aggregations)
Pinging @elastic/es-search (:Search/Search)
Wow yeah, 512 is high :) I like the idea of triggering a reduce based on a timeout, but that seems potentially tricky to get right. And as you said, reductions should be pretty fast, so a lowish 5-50 default seems reasonable to me.
I like the idea of just decreasing it to 5 or 10. It's very simple, and most requests that hit a single index will keep doing a single reduction. There are some downsides too, but in my opinion the benefits outweigh them.
This change optimizes the merge of terms aggregations by removing the priority queue that was used to collect all the buckets during a non-final reduction. We don't need to keep the result sorted, since the merge of buckets in a subsequent reduce can modify the order. I wrote a small micro-benchmark to test the change, and the speed-ups are significant for small merge buffer sizes:

````
##########  Master:
Benchmark                          (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt     Score     Error  Units
TermsReduceBenchmark.reduceTopHits            5          10000         1000        1000  avgt   10  2459,690 ± 198,682  ms/op
TermsReduceBenchmark.reduceTopHits           16          10000         1000        1000  avgt   10  1030,620 ±  91,544  ms/op
TermsReduceBenchmark.reduceTopHits           32          10000         1000        1000  avgt   10   558,608 ±  44,915  ms/op
TermsReduceBenchmark.reduceTopHits          128          10000         1000        1000  avgt   10   287,333 ±   8,342  ms/op
TermsReduceBenchmark.reduceTopHits          512          10000         1000        1000  avgt   10   257,325 ±  54,515  ms/op

##########  Patch:
Benchmark                          (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt    Score    Error  Units
TermsReduceBenchmark.reduceTopHits            5          10000         1000        1000  avgt   10  805,611 ± 14,630  ms/op
TermsReduceBenchmark.reduceTopHits           16          10000         1000        1000  avgt   10  378,851 ± 17,929  ms/op
TermsReduceBenchmark.reduceTopHits           32          10000         1000        1000  avgt   10  261,094 ± 10,176  ms/op
TermsReduceBenchmark.reduceTopHits          128          10000         1000        1000  avgt   10  241,051 ± 19,558  ms/op
TermsReduceBenchmark.reduceTopHits          512          10000         1000        1000  avgt   10  231,643 ±  6,170  ms/op
````

The code for the benchmark can be found [here](). It seems to be up to 3x faster for terms aggregations that return 10,000 unique terms (1,000 terms per shard). For a cardinality of 100,000 terms, this patch is up to 5x faster:

````
##########  Patch:
Benchmark                          (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score     Error  Units
TermsReduceBenchmark.reduceTopHits            5         100000         1000        1000  avgt   10  12791,083 ± 397,128  ms/op
TermsReduceBenchmark.reduceTopHits           16         100000         1000        1000  avgt   10   3974,939 ± 324,617  ms/op
TermsReduceBenchmark.reduceTopHits           32         100000         1000        1000  avgt   10   2186,285 ± 267,124  ms/op
TermsReduceBenchmark.reduceTopHits          128         100000         1000        1000  avgt   10    914,657 ± 160,784  ms/op
TermsReduceBenchmark.reduceTopHits          512         100000         1000        1000  avgt   10    604,198 ± 145,457  ms/op

##########  Master:
Benchmark                          (bufferSize)  (cardinality)  (numShards)  (topNSize)  Mode  Cnt      Score    Error  Units
TermsReduceBenchmark.reduceTopHits            5         100000         1000        1000  avgt   10  60696,107 ± 929,944  ms/op
TermsReduceBenchmark.reduceTopHits           16         100000         1000        1000  avgt   10  16292,894 ± 783,398  ms/op
TermsReduceBenchmark.reduceTopHits           32         100000         1000        1000  avgt   10   7705,444 ±  77,588  ms/op
TermsReduceBenchmark.reduceTopHits          128         100000         1000        1000  avgt   10   2156,685 ±  88,795  ms/op
TermsReduceBenchmark.reduceTopHits          512         100000         1000        1000  avgt   10    760,273 ±  53,738  ms/op
````

The merge of buckets can also be optimized. Currently we use a hash map to merge buckets coming from different shards, so this can be costly if the number of unique terms is high. Instead, we could always sort the shard terms results by key and perform a merge sort to reduce the results. This would save memory and make the merge more linear in complexity on the coordinating node, at the expense of an additional sort on the shards. I plan to test this possible optimization in a follow-up. Relates #51857
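To make the described change concrete, here is a minimal, self-contained Java sketch (not the actual Elasticsearch code) of the idea: a non-final reduce only needs to combine doc counts, so the sorted top-N selection, and thus the priority queue, can be deferred to the single final reduce.

````java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record Bucket(String term, long docCount) {}

class TermsReduceSketch {
    // Non-final reduce: merge doc counts without sorting. The order is
    // irrelevant here because a later reduce can change it anyway.
    static List<Bucket> partialReduce(List<List<Bucket>> shardResults) {
        Map<String, Long> counts = new HashMap<>();
        for (List<Bucket> shard : shardResults) {
            for (Bucket b : shard) {
                counts.merge(b.term(), b.docCount(), Long::sum);
            }
        }
        List<Bucket> merged = new ArrayList<>();
        counts.forEach((term, count) -> merged.add(new Bucket(term, count)));
        return merged; // intentionally unsorted
    }

    // Final reduce: sort and select the top N exactly once, at the end.
    static List<Bucket> finalReduce(List<Bucket> merged, int topN) {
        merged.sort(Comparator.comparingLong(Bucket::docCount).reversed());
        return merged.subList(0, Math.min(topN, merged.size()));
    }
}
````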
We've made good progress on this issue recently, so apologies for not updating earlier. While discussing the proposed change offline, we decided to start with benchmarks to evaluate the effect of changing the default value.
In order to be able to effectively reduce the default value without impacting users, we think that more work is needed:
Once these tasks are completed, we will resume the discussion with the additional information, in order to make a decision based on real measurements.
I wonder if the number should be different for searches with aggs vs searches without them. Aggs are the most expensive thing to reduce, but they are also the biggest thing to keep around.
My inclination would be to try hard to have the same number all the time. I've been bitten by moving parts like that a couple of times when digging into performance issues.
Are there any plans to support parallel reduce on the coordinator?
I tested a case with huge aggs: with batched_reduce_size = 5, the query's took time is extremely high compared to the default batched_reduce_size.
The plan is to move to a thread pool in order to limit the number of partial reduces we perform in parallel. So the synchronization will be removed, but we'll continue to limit the number of partial reduces executed in parallel for a single search request to 1.
We're making improvements to the partial reduce, but this is expected to be slower. We haven't settled on the default yet, and 5 might be too small. We'll test the performance before making the change, but first we need to clean up the synchronization and move the operation into a thread pool. Maybe you can share your results for comparison?
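A minimal sketch of that design, with hypothetical class names (not the actual Elasticsearch implementation): partial reduces are forked to a shared thread pool, but at most one partial reduce per search request runs at a time, and nothing runs on the network thread.

````java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;

class SequentialReducer {
    private final ExecutorService searchPool;            // shared search thread pool
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean running = false;                     // guarded by 'this'

    SequentialReducer(ExecutorService searchPool) {
        this.searchPool = searchPool;
    }

    // Called whenever enough shard results have been buffered.
    synchronized void submitPartialReduce(Runnable reduceTask) {
        pending.add(reduceTask);
        if (!running) {
            running = true;
            scheduleNext();
        }
    }

    private void scheduleNext() {
        Runnable next;
        synchronized (this) {
            next = pending.poll();
            if (next == null) {
                running = false;   // nothing queued; stop the chain
                return;
            }
        }
        Runnable task = next;
        // Execute on the pool, never inline, then chain to the next task,
        // so at most one reduce per request is ever in flight.
        searchPool.execute(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();
            }
        });
    }
}
````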
@jimczi Mem stats: (screenshot omitted)
Search stats: (screenshot omitted)
It seems like a higher reduce count leads to a higher memory cost.
I have a similar patch for this. |
This change forks the execution of partial reduces on the coordinating node to the search thread pool. It also ensures that partial reduces are executed sequentially and asynchronously, both to limit the memory and CPU that a single search request can use and to avoid blocking a network thread. If a partial reduce fails with an exception, the search request is cancelled and the reporting of the error is delayed to the start of the fetch phase (when the final reduce is performed). This ensures that we clean up the in-flight search requests before returning an error to the user. Closes #53411 Relates #51857
Today, the terms aggregation reduces multiple aggregations at once, using a map to group identical buckets together. This operation can be costly since it requires looking up every bucket in a global map with no particular order. This commit changes how term buckets are sorted by shards and partial reduces, in order to be able to reduce results using a merge-sort strategy. For bwc, results are merged with the legacy code if any of the aggregations use a different sort (i.e. if it was returned by a node on a prior version). Relates #51857
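For illustration, a minimal, self-contained Java sketch (not the actual Elasticsearch code) of the merge-sort reduce described above: each shard returns its buckets sorted by term key, so the coordinator can merge them like the merge step of merge sort, summing doc counts for equal keys, instead of hashing every bucket into a global map.

````java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

record Bucket(String term, long docCount) {}

class MergeSortReduce {
    // One cursor per shard result, positioned on its current bucket.
    private static final class Cursor {
        final Iterator<Bucket> it;
        Bucket current;
        Cursor(Iterator<Bucket> it) { this.it = it; this.current = it.next(); }
        boolean advance() { current = it.hasNext() ? it.next() : null; return current != null; }
    }

    // Inputs must each be sorted by term; output is sorted by term too,
    // so it can feed directly into the next (partial or final) reduce.
    static List<Bucket> reduce(List<List<Bucket>> sortedShardBuckets) {
        PriorityQueue<Cursor> pq =
            new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current.term()));
        for (List<Bucket> shard : sortedShardBuckets) {
            if (!shard.isEmpty()) pq.add(new Cursor(shard.iterator()));
        }
        List<Bucket> reduced = new ArrayList<>();
        while (!pq.isEmpty()) {
            Cursor top = pq.poll();
            String term = top.current.term();
            long count = top.current.docCount();
            if (top.advance()) pq.add(top);
            // Drain every other cursor positioned on the same term.
            while (!pq.isEmpty() && pq.peek().current.term().equals(term)) {
                Cursor same = pq.poll();
                count += same.current.docCount();
                if (same.advance()) pq.add(same);
            }
            reduced.add(new Bucket(term, count));
        }
        return reduced;
    }
}
````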
This commit allows the coordinating node to account for the memory used to perform partial and final reduces of aggregations in the request circuit breaker. The search coordinator adds the memory that it uses to save and reduce the results of shard aggregations to the request circuit breaker. Before any partial or final reduce, the memory needed to reduce the aggregations is estimated, and a CircuitBreakingException is thrown if it exceeds the maximum memory allowed by this breaker. This size is estimated as roughly 1.5 times the size of the serialized aggregations that need to be reduced. This estimation can be completely off for some aggregations, but it is corrected with the real size after the reduce completes. If the reduce is successful, we update the circuit breaker to remove the size of the source aggregations and replace the estimation with the serialized size of the newly reduced result. As a follow-up, we could trigger partial reduces based on the memory accounted in the circuit breaker instead of relying on a static number of shard responses. A simpler follow-up that could be done in the meantime is to [reduce the default batch reduce size](#51857) of blocking search requests to a more sane number. Closes #37182
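A minimal, self-contained sketch of that estimate-then-correct accounting scheme. The Breaker interface here is hypothetical, modeled loosely on Elasticsearch's request circuit breaker; it is not the real API.

````java
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

class CircuitBreakingException extends RuntimeException {}

interface Breaker {
    void addEstimateBytesAndMaybeBreak(long bytes); // throws CircuitBreakingException if over the limit
    void addWithoutBreaking(long bytes);            // adjusts accounting, never trips (may be negative)
}

class ReduceAccounting {
    static <T> T reduceWithBreaker(Breaker breaker, long serializedSourceBytes,
                                   Supplier<T> reduce, ToLongFunction<T> serializedSize) {
        // Reserve roughly 1.5x the serialized size of the sources up front.
        long estimate = serializedSourceBytes + serializedSourceBytes / 2;
        breaker.addEstimateBytesAndMaybeBreak(estimate); // may throw before any work is done
        boolean success = false;
        T reduced = null;
        try {
            reduced = reduce.get();
            success = true;
        } finally {
            if (success) {
                // Correct the estimate: only the serialized size of the
                // newly reduced result stays accounted.
                breaker.addWithoutBreaking(serializedSize.applyAsLong(reduced) - estimate);
            } else {
                breaker.addWithoutBreaking(-estimate); // release everything on failure
            }
        }
        return reduced;
    }
}
````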
Pinging @elastic/es-analytical-engine (Team:Analytics)
Pinging @elastic/es-search-foundations (Team:Search Foundations)
With #121885, each data node performs incremental reduction. For now …
Just a side note, I'll get to this shortly in earnest: I think batched_reduce_size will become just a UX knob shortly. I have a working prototype that (using the existing code for both, and really just removing layers) does streaming result merging. There shouldn't be any more tradeoffs here between CPU and memory in the near term, and we can decide what makes sense from the user-interface perspective. Maybe it's not so much about a batch size, but more about the time/frequency at which new results become visible?
I think that should be fixed, and that's made easier by batched execution: we can reduce every time a data node comes back with results. I hope that side of the knob is no longer needed. As for how often to perform partial reduction on the data nodes, I also wish we could remove the knob, but I am not super sure we can.
We actually can :) It's quite easy too. Sorry I'm slow at opening a PR for this because of all the other things that are ongoing, but the situation boils down to: …
…but far more importantly for this discussion, check out the … That was pretty much my plan here, time permitting :)
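For readers following the streaming-merge idea in the last few comments, here is a minimal sketch with hypothetical types (not the prototype mentioned above): instead of buffering batched_reduce_size shard results and reducing them in batches, each incoming per-node result is folded into a single running partial result as soon as it arrives, so the CPU/memory tradeoff behind the batch size disappears.

````java
import java.util.function.BinaryOperator;

class StreamingMerger<R> {
    private final BinaryOperator<R> combine; // the partial-reduce function
    private R accumulator;                   // at most one buffered result at any time

    StreamingMerger(BinaryOperator<R> combine) { this.combine = combine; }

    // Called once per data-node response, in arrival order.
    synchronized void onResult(R nodeResult) {
        accumulator = accumulator == null
            ? nodeResult
            : combine.apply(accumulator, nodeResult);
    }

    // Called once all nodes have responded; feeds the final reduce.
    synchronized R finalResult() { return accumulator; }
}
````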
Today we execute a partial reduce of search requests after we have buffered at least `512` shard search results. This default (users can change the value with `batched_reduce_size=N`) seems quite high and can cause memory issues for queries that target a large number of shards. We also want to use the partial reduce to speed up subsequent shard search requests (#51852), but users won't see the benefit unless they lower the batched reduce size explicitly. Partial (and final) reduces are usually very fast, so I am opening this issue to pick a sane default that could save memory on the coordinating node and speed up sorted queries on time-based indices (queries that can target a lot of shards). We have plenty of options, so here's a non-exhaustive list: […] I am curious to hear your thoughts on these options.
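Since this is the knob under discussion, a hedged illustration of how a caller can lower it per request today, without waiting for a new default. The REST parameter `batched_reduce_size` is documented; the Java setter below matches the `SearchRequest` API as I recall it, but verify it against your client version.

````java
import org.elasticsearch.action.search.SearchRequest;

public class BatchedReduceSizeExample {
    public static SearchRequest buildRequest() {
        SearchRequest request = new SearchRequest("logs-*"); // e.g. time-based indices
        // Partial-reduce after every 16 shard responses instead of the
        // default 512, trading a few extra reduces for lower memory use
        // on the coordinating node.
        request.setBatchedReduceSize(16);
        return request;
    }
}
````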