Skip to content

Consolidate DelayableWriteable #55932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 30, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public final class SearchPhaseController {
Expand Down Expand Up @@ -437,7 +436,7 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
List<Supplier<InternalAggregations>> bufferedAggs,
List<DelayableWriteable<InternalAggregations>> bufferedAggs,
List<TopDocs> bufferedTopDocs,
TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
Expand All @@ -462,7 +461,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs;
final List<Supplier<InternalAggregations>> aggregationsList;
final List<DelayableWriteable<InternalAggregations>> aggregationsList;
if (bufferedAggs != null) {
consumeAggs = false;
// we already have results from intermediate reduces and just need to perform the final reduce
Expand Down Expand Up @@ -527,18 +526,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

private InternalAggregations reduceAggs(
private static InternalAggregations reduceAggs(
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
boolean performFinalReduce,
List<? extends Supplier<InternalAggregations>> aggregationsList
List<DelayableWriteable<InternalAggregations>> aggregationsList
) {
/*
* Parse the aggregations, clearing the list as we go so bits backing
* the DelayedWriteable can be collected immediately.
*/
List<InternalAggregations> toReduce = new ArrayList<>(aggregationsList.size());
for (int i = 0; i < aggregationsList.size(); i++) {
toReduce.add(aggregationsList.get(i).get());
toReduce.add(aggregationsList.get(i).expand());
aggregationsList.set(i, null);
}
return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce,
Expand Down Expand Up @@ -701,7 +700,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (hasAggs) {
List<InternalAggregations> aggs = new ArrayList<>(aggsBuffer.length);
for (int i = 0; i < aggsBuffer.length; i++) {
aggs.add(aggsBuffer[i].get());
aggs.add(aggsBuffer[i].expand());
aggsBuffer[i] = null; // null the buffer so it can be GCed now.
}
InternalAggregations reduced =
Expand Down Expand Up @@ -743,8 +742,8 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}

private synchronized List<Supplier<InternalAggregations>> getRemainingAggs() {
return hasAggs ? Arrays.asList((Supplier<InternalAggregations>[]) aggsBuffer).subList(0, index) : null;
private synchronized List<DelayableWriteable<InternalAggregations>> getRemainingAggs() {
return hasAggs ? Arrays.asList((DelayableWriteable<InternalAggregations>[]) aggsBuffer).subList(0, index) : null;
}

private synchronized List<TopDocs> getRemainingTopDocs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,26 @@
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.util.function.Supplier;

/**
* A holder for {@link Writeable}s that can delays reading the underlying
* {@linkplain Writeable} when it is read from a remote node.
* A holder for {@link Writeable}s that delays reading the underlying object
* on the receiving end. To be used for objects whose deserialized
* representation is inefficient to keep in memory compared to their
* corresponding serialized representation.
* The node that produces the {@link Writeable} calls {@link #referencing(Writeable)}
* to create a {@link DelayableWriteable} that serializes the inner object
* first to a buffer and writes the content of the buffer to the {@link StreamOutput}.
* The receiver node calls {@link #delayed(Reader, StreamInput)} to create a
* {@link DelayableWriteable} that reads the buffer from the @link {@link StreamInput}
* but delays creating the actual object by calling {@link #expand()} when needed.
* Multiple {@link DelayableWriteable}s coming from different nodes may be buffered
* on the receiver end, which may hold a mix of {@link DelayableWriteable}s that were
* produced locally (hence expanded) as well as received form another node (hence subject
* to delayed expansion). When such objects are buffered for some time it may be desirable
* to force their buffering in serialized format by calling
* {@link #asSerialized(Reader, NamedWriteableRegistry)}.
*/
public abstract class DelayableWriteable<T extends Writeable> implements Supplier<T>, Writeable {
public abstract class DelayableWriteable<T extends Writeable> implements Writeable {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering, is it just DelayedWriteable or DelayedReadWriteable ? Because it's not that you can choose to delay its reading, it's the only way that it works?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess so. I figured Writeable was the name for things that we threw across the wire and calling it Delayed was enough to imply that it delayed reading because that was sort of the only thing you could delay.

At this point, given how we use it, it might be more correct to call it PotentiallySerializedWriteable. But that is kind of mouthful. I think the whole "Delayed" bit of the name was perfect for the first PR that built it but it is less and less perfect now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but why Delayed VS Delayable for instance?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the referencing implementation isn't delayed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find that that is a lie :) because writeTo will always do the two steps serialization that will allow for delaying the deserialization on the other side. The point is that referencing is only on the producer side hence you never read into referencing from the stream? Maybe a bit of a philosophical discussion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename it to something like SerializedReference or MaybeSerializedReference so maybe its not an issue then. I get where you are coming from though. The name is icky now.

/**
* Build a {@linkplain DelayableWriteable} that wraps an existing object
* but is serialized so that deserializing it can be delayed.
Expand All @@ -42,7 +55,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(T referenc
/**
* Build a {@linkplain DelayableWriteable} that copies a buffer from
* the provided {@linkplain StreamInput} and deserializes the buffer
* when {@link Supplier#get()} is called.
* when {@link #expand()} is called.
*/
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename this to readDelayedWriteable ? also depending on the rename proposed above...

return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference());
Expand All @@ -56,16 +69,21 @@ private DelayableWriteable() {}
*/
public abstract Serialized<T> asSerialized(Writeable.Reader<T> reader, NamedWriteableRegistry registry);

/**
* Expands the inner {@link Writeable} to its original representation and returns it
*/
public abstract T expand();

/**
* {@code true} if the {@linkplain Writeable} is being stored in
* serialized form, {@code false} otherwise.
*/
abstract boolean isSerialized();

private static class Referencing<T extends Writeable> extends DelayableWriteable<T> {
private T reference;
private final T reference;

Referencing(T reference) {
private Referencing(T reference) {
this.reference = reference;
}

Expand All @@ -75,17 +93,19 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public T get() {
public T expand() {
return reference;
}

@Override
public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
BytesStreamOutput buffer;
try {
return new Serialized<T>(reader, Version.CURRENT, registry, writeToBuffer(Version.CURRENT).bytes());
buffer = writeToBuffer(Version.CURRENT);
} catch (IOException e) {
throw new RuntimeException("unexpected error expanding aggregations", e);
throw new RuntimeException("unexpected error writing writeable to buffer", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change!

}
return new Serialized<>(reader, Version.CURRENT, registry, buffer.bytes());
}

@Override
Expand All @@ -111,8 +131,8 @@ public static class Serialized<T extends Writeable> extends DelayableWriteable<T
private final NamedWriteableRegistry registry;
private final BytesReference serialized;

Serialized(Writeable.Reader<T> reader, Version serializedAtVersion,
NamedWriteableRegistry registry, BytesReference serialized) throws IOException {
private Serialized(Writeable.Reader<T> reader, Version serializedAtVersion,
NamedWriteableRegistry registry, BytesReference serialized) {
this.reader = reader;
this.serializedAtVersion = serializedAtVersion;
this.registry = registry;
Expand All @@ -136,20 +156,20 @@ public void writeTo(StreamOutput out) throws IOException {
* differences in the wire protocol. This ain't efficient but
* it should be quite rare.
*/
referencing(get()).writeTo(out);
referencing(expand()).writeTo(out);
}
}

@Override
public T get() {
public T expand() {
try {
try (StreamInput in = registry == null ?
serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering, is it an option to keep a reference to the original stream input so that we do not need this if and we can avoid exposing the new namedWriteableRegistry getter in StreamInput?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could keep a reference to the original StreamInput but we'd still need to read the serialized bytes from it.

I wonder if keeping a reference to the StreamInput would keep a reference to its underlying storage. If it did then it'd probably be bad to keep the reference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea it did sound risky to me as well, and what we do instead looks fine to me.

in.setVersion(serializedAtVersion);
return reader.read(in);
}
} catch (IOException e) {
throw new RuntimeException("unexpected error expanding aggregations", e);
throw new RuntimeException("unexpected error expanding serialized delayed writeable", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public <T extends Exception> T readException() throws IOException {
}

/**
* Get the registry of named writeables is his stream has one,
* Get the registry of named writeables if this stream has one,
* {@code null} otherwise.
*/
public NamedWriteableRegistry namedWriteableRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IO
public void testSerializesWithRemoteVersion() throws IOException {
Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
DelayableWriteable<SneakOtherSideVersionOnWire> original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire());
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion));
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).expand().version, equalTo(remoteVersion));
}

public void testAsSerializedIsNoopOnSerialized() throws IOException {
Expand All @@ -172,7 +172,7 @@ public void testAsSerializedIsNoopOnSerialized() throws IOException {
private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException {
DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT);
assertTrue(roundTripped.isSerialized());
assertThat(roundTripped.get(), equalTo(original.get()));
assertThat(roundTripped.expand(), equalTo(original.expand()));
}

private <T extends Writeable> DelayableWriteable<T> roundTrip(DelayableWriteable<T> original,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void testSerialization() throws Exception {
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs().get();
Aggregations deserializedAggs = deserialized.consumeAggs().get();
Aggregations aggs = querySearchResult.consumeAggs().expand();
Aggregations deserializedAggs = deserialized.consumeAggs().expand();
assertEquals(aggs.asList(), deserializedAggs.asList());
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,18 +379,16 @@ public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits,
reducedAggs = () -> null;
} else {
/*
* Keep a reference to the serialiazed form of the partially
* Keep a reference to the serialized form of the partially
* reduced aggs and reduce it on the fly when someone asks
* for it. This will produce right-ish aggs. Much more right
* than if you don't do the final reduce. Its important that
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @nik9000 I removed the uncertainty around producing right-ish aggs. I find that if certain aggs don't work it's a bug that we should fix like we did with scripted metric. The approach here around partial aggs is the correct one hence we should not have doubts about it :)

* we wait until someone needs the result so we don't perform
* the final reduce only to throw it away. And it is important
* that we kep the reference to the serialized aggrgations
* because the SearchPhaseController *already* has that
* reference so we're not creating more garbage.
* for it. It's important that we wait until someone needs
* the result so we don't perform the final reduce only to
* throw it away. And it is important that we keep the reference
* to the serialized aggregations because SearchPhaseController
* *already* has that reference so we're not creating more garbage.
*/
reducedAggs = () ->
InternalAggregations.topLevelReduce(singletonList(aggregations.get()), aggReduceContextSupplier.get());
InternalAggregations.topLevelReduce(singletonList(aggregations.expand()), aggReduceContextSupplier.get());
}
searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class MutableSearchResponse {
private int reducePhase;
/**
* The response produced by the search API. Once we receive it we stop
* building our own {@linkplain SearchResponse}s when you get the status
* and instead return this.
* building our own {@linkplain SearchResponse}s when get async search
* is called, and instead return this.
* @see #findOrBuildResponse(AsyncSearchTask)
*/
private SearchResponse finalResponse;
private ElasticsearchException failure;
Expand Down Expand Up @@ -157,10 +158,9 @@ private SearchResponse findOrBuildResponse(AsyncSearchTask task) {
/*
* Build the response, reducing aggs if we haven't already and
* storing the result of the reduction so we won't have to reduce
* a second time if you get the response again and nothing has
* changed. This does cost memory because we have a reference
* to the reduced aggs sitting around so it can't be GCed until
* we get an update.
* the same aggregation results a second time if nothing has changed.
* This does cost memory because we have a reference to the finally
* reduced aggs sitting around which can't be GCed until we get an update.
*/
InternalAggregations reducedAggs = reducedAggsSource.get();
reducedAggsSource = () -> reducedAggs;
Expand All @@ -183,8 +183,6 @@ synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTas
return resp;
}



private void failIfFrozen() {
if (frozen) {
throw new IllegalStateException("invalid update received after the completion of the request");
Expand Down