Try to save memory on aggregations #53793
DelayableWriteable.java (new file)
@@ -0,0 +1,115 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.io.stream;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.util.function.Supplier;
/**
 * A holder for {@link Writeable}s that can delay reading the underlying
 * {@linkplain Writeable} when it is read from a remote node.
 */
public abstract class DelayableWriteable<T extends Writeable> implements Supplier<T>, Writeable {
Review comment: We're only using this for …
    /**
     * Build a {@linkplain DelayableWriteable} that wraps an existing object
     * but is serialized so that deserializing it can be delayed.
     */
    public static <T extends Writeable> DelayableWriteable<T> referencing(T reference) {
        return new Referencing<>(reference);
    }

    /**
     * Build a {@linkplain DelayableWriteable} that copies a buffer from
     * the provided {@linkplain StreamInput} and deserializes the buffer
     * when {@link Supplier#get()} is called.
     */
    public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
        return new Delayed<>(reader, in);
    }

    private DelayableWriteable() {}

    public abstract boolean isDelayed();

    private static class Referencing<T extends Writeable> extends DelayableWriteable<T> {
        private T reference;

        Referencing(T reference) {
            this.reference = reference;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            try (BytesStreamOutput buffer = new BytesStreamOutput()) {
                reference.writeTo(buffer);
                out.writeBytesReference(buffer.bytes());
            }
        }

        @Override
        public T get() {
            return reference;
        }

        @Override
        public boolean isDelayed() {
            return false;
        }
    }
    private static class Delayed<T extends Writeable> extends DelayableWriteable<T> {
        private final Writeable.Reader<T> reader;
        private final Version remoteVersion;
        private final BytesReference serialized;
        private final NamedWriteableRegistry registry;

        Delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
            this.reader = reader;
            remoteVersion = in.getVersion();
            serialized = in.readBytesReference();
            registry = in.namedWriteableRegistry();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public T get() {
            try {
                try (StreamInput in = registry == null
                        ? serialized.streamInput()
                        : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) {
                    in.setVersion(remoteVersion);
                    return reader.read(in);
Review comment: Should we nullify the bytes ref before returning the deserialized aggs? We could also protect against multiple calls by keeping the deserialized aggs internally on the first call?

Reply: I'm worried about race conditions with that. The way it is now is fairly simple to look at and say "there are no race conditions." I think nullifying the other references would be good enough from a GC perspective. Do you?

Reply: Yep, nullifying the reference should be enough, but it would be better if we could nullify after each deserialization. Otherwise you'd need to keep the deserialized aggs and their bytes representation during the entire partial reduce, which defeats the purpose of saving memory here?
                }
            } catch (IOException e) {
                throw new RuntimeException("unexpected error expanding aggregations", e);
            }
        }

        @Override
        public boolean isDelayed() {
            return true;
        }
    }
}
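For orientation, here is a minimal round-trip sketch (not part of this PR) showing how the two variants work together. Foo is a hypothetical Writeable invented for this example, and the sketch assumes it lives in the same org.elasticsearch.common.io.stream package so no further imports are needed; the DelayableWriteable API is the one added above.

import java.io.IOException;

// Hypothetical Writeable used only for this sketch.
class Foo implements Writeable {
    final long value;

    Foo(long value) {
        this.value = value;
    }

    Foo(StreamInput in) throws IOException {
        this.value = in.readLong();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(value);
    }
}

class DelayableWriteableSketch {
    static void roundTrip() throws IOException {
        // Sending side: wrap a live object. writeTo(...) serializes it
        // into a length-prefixed bytes reference.
        DelayableWriteable<Foo> sending = DelayableWriteable.referencing(new Foo(42));
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            sending.writeTo(out);

            // Receiving side: copy the buffer but do not deserialize yet.
            try (StreamInput in = out.bytes().streamInput()) {
                DelayableWriteable<Foo> received = DelayableWriteable.delayed(Foo::new, in);
                assert received.isDelayed();
                // Deserialization happens only here, when get() is called.
                Foo foo = received.get();
                assert foo.value == 42;
            }
        }
    }
}

On the wire both sides look the same, a plain bytes reference, so referencing and delayed instances are interchangeable across the transport layer.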
QuerySearchResult.java
@@ -22,13 +22,13 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
-import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

@@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
@@ -54,7 +55,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
    private TotalHits totalHits;
    private float maxScore = Float.NaN;
    private DocValueFormat[] sortValueFormats;
-   private InternalAggregations aggregations;
Review comment: Can you add a comment here to explain why we use a delayable writeable?

Reply: ++
+   private DelayableWriteable<InternalAggregations> aggregations;
    private boolean hasAggs;
    private Suggest suggest;
    private boolean searchTimedOut;
@@ -196,21 +197,21 @@ public boolean hasAggs() {
     * Returns and nulls out the aggregations for this search result. This allows the memory to be freed once the aggregations are consumed.
     * @throws IllegalStateException if the aggregations have already been consumed.
     */
-   public Aggregations consumeAggs() {
+   public Supplier<InternalAggregations> consumeAggs() {
        if (aggregations == null) {
            throw new IllegalStateException("aggs already consumed");
        }
-       Aggregations aggs = aggregations;
+       Supplier<InternalAggregations> aggs = aggregations;
        aggregations = null;
        return aggs;
    }

    public void aggregations(InternalAggregations aggregations) {
-       this.aggregations = aggregations;
+       this.aggregations = aggregations == null ? null : DelayableWriteable.referencing(aggregations);
        hasAggs = aggregations != null;
    }

-   public InternalAggregations aggregations() {
+   public DelayableWriteable<InternalAggregations> aggregations() {
        return aggregations;
    }
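To make the signature change concrete, here is a hedged caller-side sketch (not code from this PR; ConsumeAggsSketch and reduceLater are hypothetical names): consumers now receive a Supplier and pay the deserialization cost only when they call get().

import java.util.function.Supplier;

import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.query.QuerySearchResult;

class ConsumeAggsSketch {
    // Hypothetical helper, not part of the PR.
    static InternalAggregations reduceLater(QuerySearchResult result) {
        // consumeAggs() now hands back a Supplier and nulls the field,
        // so the result object no longer pins the aggregations.
        Supplier<InternalAggregations> delayed = result.consumeAggs();
        // ... buffer `delayed` alongside other shard results here ...
        return delayed.get(); // bytes are expanded only at reduce time
    }
}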
@@ -314,18 +315,22 @@ public void readFromWithId(SearchContextId id, StreamInput in) throws IOException {
        }
        setTopDocs(readTopDocs(in));
        if (hasAggs = in.readBoolean()) {
-           aggregations = new InternalAggregations(in);
+           if (in.getVersion().before(Version.V_8_0_0)) {
+               aggregations = DelayableWriteable.referencing(new InternalAggregations(in));
+           } else {
+               aggregations = DelayableWriteable.delayed(InternalAggregations::new, in);
+           }
        }
        if (in.getVersion().before(Version.V_7_2_0)) {
            List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
                .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
            if (hasAggs && pipelineAggregators.isEmpty() == false) {
-               List<InternalAggregation> internalAggs = aggregations.asList().stream()
+               List<InternalAggregation> internalAggs = aggregations.get().asList().stream()
                    .map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
                // Earlier versions serialize sibling pipeline aggs separately as they used to be set on QuerySearchResult directly, while
                // later versions include them in InternalAggregations. Note that although serializing sibling pipeline aggs as part of
                // InternalAggregations has been supported since 6.7.0, the shards set sibling pipeline aggs on InternalAggregations only from 7.1 on.
-               this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
+               this.aggregations = DelayableWriteable.referencing(new InternalAggregations(internalAggs, pipelineAggregators));
            }
        }
        if (in.readBoolean()) {
@@ -366,7 +371,11 @@ public void writeToNoId(StreamOutput out) throws IOException {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
-           aggregations.writeTo(out);
+           if (out.getVersion().before(Version.V_8_0_0)) {
+               aggregations.get().writeTo(out);
Review comment: We can maybe get the aggs once if the remote node is in a version before v8 (instead of calling get here and below to get the pipeline aggs)?

Reply: @nik9000? Should we avoid the double deserialization if we need the pipeline aggs below?

Reply: Darn it. I twisted the other side around but missed this comment. Of course!

Reply: Mostly these are going to be the "referencing" ones anyway. But I'll turn it around.
+           } else {
+               aggregations.writeTo(out);
+           }
        }
        if (out.getVersion().before(Version.V_7_2_0)) {
            // Earlier versions expect sibling pipeline aggs separately as they used to be set on QuerySearchResult directly,

@@ -375,7 +384,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
            if (aggregations == null) {
                out.writeNamedWriteableList(Collections.emptyList());
            } else {
-               out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
+               out.writeNamedWriteableList(aggregations.get().getTopLevelPipelineAggregators());
            }
        }
        if (suggest == null) {
Review comment: Should we nullify the rest of the array to make the reduced aggs eligible for GC?

Reply: ++

Reply: Actually, the line right above does that.

Reply: Right, but we keep the serialized + deserialized form until after the partial reduce. We can try to release the serialized form early with: … Or we can nullify the serialized form when the supplier is called, like discussed below.

Reply: Right! I noticed that right after I sent this. I'm playing with nulling the cell in the array as soon as I call get(). That feels a little safer than nulling the bytes.
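To make the GC discussion concrete, here is a hypothetical sketch (not code from this PR; PartialReduceSketch is an invented name) of the pattern the reviewers describe: null each array cell as soon as its delayed value is expanded, so the serialized bytes do not stay reachable for the whole partial reduce.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PartialReduceSketch<T> {
    // Expand each delayed value and immediately null its cell so the
    // Supplier (and any serialized bytes it holds) loses its GC root
    // before the reduce over the expanded values begins.
    List<T> expandAndRelease(Supplier<T>[] buffered) {
        List<T> expanded = new ArrayList<>(buffered.length);
        for (int i = 0; i < buffered.length; i++) {
            expanded.add(buffered[i].get());
            buffered[i] = null; // eligible for GC right away
        }
        return expanded;
    }
}

Nulling the cell rather than the bytes inside Delayed keeps get() side-effect free, which is the race-condition concern raised in the earlier thread.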