Request-level circuit breaker support on coordinating nodes #62223
Conversation
This commit allows the coordinating node to account for the memory used to perform partial and final reduces of aggregations in the request circuit breaker. The search coordinator adds the memory that it uses to save and reduce the results of shard aggregations to the request circuit breaker. Before any partial or final reduce, the memory needed to reduce the aggregations is estimated and a CircuitBreakingException is thrown if it exceeds the maximum memory allowed in this breaker. This size is estimated as roughly 1.5 times the size of the serialized aggregations that need to be reduced. This estimation can be completely off for some aggregations, but it is corrected with the real size after the reduce completes. If the reduce is successful, we update the circuit breaker to remove the size of the source aggregations and replace the estimation with the serialized size of the newly reduced result.
As a follow-up we could trigger partial reduces based on the memory accounted in the circuit breaker instead of relying on a static number of shard responses. A simpler follow-up that could be done in the meantime is to reduce the default batch reduce size (#51857) of blocking search requests to a more sane number.
Closes #37182
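For illustration, the estimate-then-correct accounting amounts to something like the sketch below. It is a minimal standalone toy: ToyBreaker, the labels, and the byte sizes are made-up stand-ins, not the actual coordinator code, which reserves memory through the request CircuitBreaker (addEstimateBytesAndMaybeBreak / addWithoutBreaking).

```java
// Minimal, self-contained sketch of the estimate-then-correct accounting described above.
// ToyBreaker, the labels, and the byte sizes are hypothetical stand-ins for the request
// circuit breaker and the serialized sizes tracked by the search coordinator.
import java.util.List;

class ToyCircuitBreakingException extends RuntimeException {
    ToyCircuitBreakingException(String message) {
        super(message);
    }
}

class ToyBreaker {
    private final long limitBytes;
    private long usedBytes;

    ToyBreaker(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Reserve `bytes` up front, or fail before doing the work.
    void addEstimateAndMaybeBreak(long bytes, String label) {
        if (usedBytes + bytes > limitBytes) {
            throw new ToyCircuitBreakingException("[" + label + "] would use "
                + (usedBytes + bytes) + " bytes, limit is " + limitBytes);
        }
        usedBytes += bytes;
    }

    // Adjust usage without tripping the breaker; delta may be negative.
    void addWithoutBreaking(long delta) {
        usedBytes += delta;
    }

    long used() {
        return usedBytes;
    }
}

public class ReduceAccountingSketch {
    public static void main(String[] args) {
        ToyBreaker requestBreaker = new ToyBreaker(10_000);

        // Serialized sizes of the buffered shard aggregations (fake numbers).
        List<Long> bufferedAggSizes = List.of(1_200L, 900L, 1_500L);

        // 1. Account for each shard result as it is buffered on the coordinating node.
        long serialized = 0;
        for (long size : bufferedAggSizes) {
            requestBreaker.addEstimateAndMaybeBreak(size, "<shard_result>");
            serialized += size;
        }

        // 2. Before a partial or final reduce, reserve ~1.5x the serialized size of the sources.
        long estimate = (long) (serialized * 1.5);
        requestBreaker.addEstimateAndMaybeBreak(estimate, "<reduce_aggs>");

        // 3. Run the reduce (stand-in: assume the reduced result serializes to 1_000 bytes).
        long reducedSize = 1_000;

        // 4. Drop the sources and the pre-reduce estimate; keep only the reduced result's size.
        requestBreaker.addWithoutBreaking(reducedSize - estimate - serialized);

        System.out.println("bytes held for the reduced result: " + requestBreaker.used());
    }
}
```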
Pinging @elastic/es-core-infra (:Core/Infra/Circuit Breakers)
Pinging @elastic/es-analytics-geo (:Analytics/Geo)
@@ -1,177 +0,0 @@
/*
 * Licensed to Elasticsearch under one or more contributor
Note for reviewer: I moved these tests in https://github.com/elastic/elasticsearch/pull/62223/files#diff-0721d29fdc234c0f88a9019057ea55bd
It is a bit of a shame that these go from single-node tests to full-blown IT tests. What is the reasoning behind this choice?
I regrouped the search action tests in a single IT class. I agree that these tests may not require the full IT, but they are grouped with other tests that require it, so I thought it made sense to move them here.
This looks right to me. I'll have to go over it more closely before 👍 it and I think I'd like to wait a day just to have fresh eyes on it.
This commit removes the serialization of partial reduces in order to speed up merges when the batch reduce size is smaller than the number of shards in the request. The estimation of the size of a partial reduce is still based on the binary size (serialized form), but we keep the full Java objects and estimate the size with a counting stream output. Finally, this change adds a benchmark for the reduce of nested terms aggs. This benchmark was used to optimize the code in this PR.
We discussed offline with @nik9000 and I pushed some changes to speed up the partial merge. First of all, I removed the serialization of the partial reduce and replaced it with an estimation of the size based on a noop serialization (just counting the bytes). That resulted in much better performance for large-cardinality aggregations and allows us to estimate more precisely the memory used by the final reduce. I added the benchmark that I used in the PR to be able to replay the numbers.
The first result is the time it takes to reduce with a batch reduce size of 512, the second result is when we serialize the results of partial aggs with a batch reduce size of 32, and the last one is when we don't serialize partial results. As you can see, the speedups are significant.
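For illustration, the "counting stream" size estimation can be as simple as the standalone sketch below. CountingOutputStream and the fake payload are made up for this example; the actual change uses Elasticsearch's StreamOutput machinery rather than java.io, but the idea is the same: count the bytes a serialization would produce without buffering them.

```java
// Standalone illustration of estimating a serialized size with a counting stream:
// bytes are counted as they are written but never retained, so we learn how large the
// serialized form would be without paying for a second copy in memory.
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class CountingOutputStream extends OutputStream {
    private long count;

    @Override
    public void write(int b) {
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) {
        count += len;
    }

    long getCount() {
        return count;
    }
}

public class SizeEstimationSketch {
    public static void main(String[] args) throws IOException {
        CountingOutputStream counter = new CountingOutputStream();
        try (DataOutputStream out = new DataOutputStream(counter)) {
            // Pretend this is the serialization of a reduced terms aggregation:
            // every write advances the counter, nothing is kept around.
            out.writeUTF("terms");
            for (int bucket = 0; bucket < 1_000; bucket++) {
                out.writeLong(bucket);    // bucket key
                out.writeInt(bucket * 7); // doc count
            }
        }
        System.out.println("estimated serialized size: " + counter.getCount() + " bytes");
    }
}
```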
I left some small things but LGTM.
It's a shame about reserializing being slow.
Resolved review threads on:
server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java
server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
left a couple of small questions
Resolved review thread on:
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
assertThat(response.get().getFailure().getCause(), instanceOf(IllegalArgumentException.class));
assertEquals("Unknown NamedWriteable category [" + InternalAggregation.class.getName() + "]",
    response.get().getFailure().getCause().getMessage());
}
is this test no longer relevant?
It cannot work anymore since we no longer need to serialize the aggs. I think it's ok since we have other tests that check that exceptions thrown during a partial/final reduce are handled correctly.
I see, you mean the condition that the test relied on to trigger the failure, which was around serialization?
yep
Sorry not to comment publicly about the serialization change! It makes me sad not to serialize but I see the reasoning.
@elasticmachine run elasticsearch-ci/2
Ensures that the test always runs with a memory circuit breaker. Relates #62223