Skip to content

Commit fc6422f

Browse files
committed
Consolidate DelayableWriteable (#55932)
This commit includes a number of minor improvements around `DelayableWriteable`: javadocs were expanded and reworded, `get` was renamed to `expand` and `DelayableWriteable` no longer implements `Supplier`. Also a couple of methods are now private instead of package private.
1 parent c36bcb4 commit fc6422f

File tree

8 files changed

+63
-48
lines changed

8 files changed

+63
-48
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import java.util.Map;
7272
import java.util.function.Function;
7373
import java.util.function.IntFunction;
74-
import java.util.function.Supplier;
7574
import java.util.stream.Collectors;
7675

7776
public final class SearchPhaseController {
@@ -437,7 +436,7 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
437436
* @see QuerySearchResult#consumeProfileResult()
438437
*/
439438
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
440-
List<Supplier<InternalAggregations>> bufferedAggs,
439+
List<DelayableWriteable<InternalAggregations>> bufferedAggs,
441440
List<TopDocs> bufferedTopDocs,
442441
TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
443442
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
@@ -462,7 +461,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
462461
final boolean hasSuggest = firstResult.suggest() != null;
463462
final boolean hasProfileResults = firstResult.hasProfileResults();
464463
final boolean consumeAggs;
465-
final List<Supplier<InternalAggregations>> aggregationsList;
464+
final List<DelayableWriteable<InternalAggregations>> aggregationsList;
466465
if (bufferedAggs != null) {
467466
consumeAggs = false;
468467
// we already have results from intermediate reduces and just need to perform the final reduce
@@ -527,18 +526,18 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
527526
firstResult.sortValueFormats(), numReducePhases, size, from, false);
528527
}
529528

530-
private InternalAggregations reduceAggs(
529+
private static InternalAggregations reduceAggs(
531530
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
532531
boolean performFinalReduce,
533-
List<? extends Supplier<InternalAggregations>> aggregationsList
532+
List<DelayableWriteable<InternalAggregations>> aggregationsList
534533
) {
535534
/*
536535
* Parse the aggregations, clearing the list as we go so bits backing
537536
* the DelayedWriteable can be collected immediately.
538537
*/
539538
List<InternalAggregations> toReduce = new ArrayList<>(aggregationsList.size());
540539
for (int i = 0; i < aggregationsList.size(); i++) {
541-
toReduce.add(aggregationsList.get(i).get());
540+
toReduce.add(aggregationsList.get(i).expand());
542541
aggregationsList.set(i, null);
543542
}
544543
return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce,
@@ -701,7 +700,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
701700
if (hasAggs) {
702701
List<InternalAggregations> aggs = new ArrayList<>(aggsBuffer.length);
703702
for (int i = 0; i < aggsBuffer.length; i++) {
704-
aggs.add(aggsBuffer[i].get());
703+
aggs.add(aggsBuffer[i].expand());
705704
aggsBuffer[i] = null; // null the buffer so it can be GCed now.
706705
}
707706
InternalAggregations reduced =
@@ -743,8 +742,8 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
743742
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
744743
}
745744

746-
private synchronized List<Supplier<InternalAggregations>> getRemainingAggs() {
747-
return hasAggs ? Arrays.asList((Supplier<InternalAggregations>[]) aggsBuffer).subList(0, index) : null;
745+
private synchronized List<DelayableWriteable<InternalAggregations>> getRemainingAggs() {
746+
return hasAggs ? Arrays.asList((DelayableWriteable<InternalAggregations>[]) aggsBuffer).subList(0, index) : null;
748747
}
749748

750749
private synchronized List<TopDocs> getRemainingTopDocs() {

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,26 @@
2525
import org.elasticsearch.common.bytes.BytesReference;
2626

2727
import java.io.IOException;
28-
import java.util.function.Supplier;
2928

3029
/**
31-
* A holder for {@link Writeable}s that can delays reading the underlying
32-
* {@linkplain Writeable} when it is read from a remote node.
30+
* A holder for {@link Writeable}s that delays reading the underlying object
31+
* on the receiving end. To be used for objects whose deserialized
32+
* representation is inefficient to keep in memory compared to their
33+
* corresponding serialized representation.
34+
* The node that produces the {@link Writeable} calls {@link #referencing(Writeable)}
35+
* to create a {@link DelayableWriteable} that serializes the inner object
36+
* first to a buffer and writes the content of the buffer to the {@link StreamOutput}.
37+
* The receiver node calls {@link #delayed(Reader, StreamInput)} to create a
38+
* {@link DelayableWriteable} that reads the buffer from the @link {@link StreamInput}
39+
* but delays creating the actual object by calling {@link #expand()} when needed.
40+
* Multiple {@link DelayableWriteable}s coming from different nodes may be buffered
41+
* on the receiver end, which may hold a mix of {@link DelayableWriteable}s that were
42+
* produced locally (hence expanded) as well as received form another node (hence subject
43+
* to delayed expansion). When such objects are buffered for some time it may be desirable
44+
* to force their buffering in serialized format by calling
45+
* {@link #asSerialized(Reader, NamedWriteableRegistry)}.
3346
*/
34-
public abstract class DelayableWriteable<T extends Writeable> implements Supplier<T>, Writeable {
47+
public abstract class DelayableWriteable<T extends Writeable> implements Writeable {
3548
/**
3649
* Build a {@linkplain DelayableWriteable} that wraps an existing object
3750
* but is serialized so that deserializing it can be delayed.
@@ -42,7 +55,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(T referenc
4255
/**
4356
* Build a {@linkplain DelayableWriteable} that copies a buffer from
4457
* the provided {@linkplain StreamInput} and deserializes the buffer
45-
* when {@link Supplier#get()} is called.
58+
* when {@link #expand()} is called.
4659
*/
4760
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
4861
return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference());
@@ -56,16 +69,21 @@ private DelayableWriteable() {}
5669
*/
5770
public abstract Serialized<T> asSerialized(Writeable.Reader<T> reader, NamedWriteableRegistry registry);
5871

72+
/**
73+
* Expands the inner {@link Writeable} to its original representation and returns it
74+
*/
75+
public abstract T expand();
76+
5977
/**
6078
* {@code true} if the {@linkplain Writeable} is being stored in
6179
* serialized form, {@code false} otherwise.
6280
*/
6381
abstract boolean isSerialized();
6482

6583
private static class Referencing<T extends Writeable> extends DelayableWriteable<T> {
66-
private T reference;
84+
private final T reference;
6785

68-
Referencing(T reference) {
86+
private Referencing(T reference) {
6987
this.reference = reference;
7088
}
7189

@@ -75,17 +93,19 @@ public void writeTo(StreamOutput out) throws IOException {
7593
}
7694

7795
@Override
78-
public T get() {
96+
public T expand() {
7997
return reference;
8098
}
8199

82100
@Override
83101
public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry registry) {
102+
BytesStreamOutput buffer;
84103
try {
85-
return new Serialized<T>(reader, Version.CURRENT, registry, writeToBuffer(Version.CURRENT).bytes());
104+
buffer = writeToBuffer(Version.CURRENT);
86105
} catch (IOException e) {
87-
throw new RuntimeException("unexpected error expanding aggregations", e);
106+
throw new RuntimeException("unexpected error writing writeable to buffer", e);
88107
}
108+
return new Serialized<>(reader, Version.CURRENT, registry, buffer.bytes());
89109
}
90110

91111
@Override
@@ -111,8 +131,8 @@ public static class Serialized<T extends Writeable> extends DelayableWriteable<T
111131
private final NamedWriteableRegistry registry;
112132
private final BytesReference serialized;
113133

114-
Serialized(Writeable.Reader<T> reader, Version serializedAtVersion,
115-
NamedWriteableRegistry registry, BytesReference serialized) throws IOException {
134+
private Serialized(Writeable.Reader<T> reader, Version serializedAtVersion,
135+
NamedWriteableRegistry registry, BytesReference serialized) {
116136
this.reader = reader;
117137
this.serializedAtVersion = serializedAtVersion;
118138
this.registry = registry;
@@ -136,20 +156,20 @@ public void writeTo(StreamOutput out) throws IOException {
136156
* differences in the wire protocol. This ain't efficient but
137157
* it should be quite rare.
138158
*/
139-
referencing(get()).writeTo(out);
159+
referencing(expand()).writeTo(out);
140160
}
141161
}
142162

143163
@Override
144-
public T get() {
164+
public T expand() {
145165
try {
146166
try (StreamInput in = registry == null ?
147167
serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) {
148168
in.setVersion(serializedAtVersion);
149169
return reader.read(in);
150170
}
151171
} catch (IOException e) {
152-
throw new RuntimeException("unexpected error expanding aggregations", e);
172+
throw new RuntimeException("unexpected error expanding serialized delayed writeable", e);
153173
}
154174
}
155175

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,7 @@ public <T extends Exception> T readException() throws IOException {
11031103
}
11041104

11051105
/**
1106-
* Get the registry of named writeables is his stream has one,
1106+
* Get the registry of named writeables if this stream has one,
11071107
* {@code null} otherwise.
11081108
*/
11091109
public NamedWriteableRegistry namedWriteableRegistry() {

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
386386
} else {
387387
out.writeBoolean(true);
388388
if (out.getVersion().before(Version.V_7_7_0)) {
389-
InternalAggregations aggs = aggregations.get();
389+
InternalAggregations aggs = aggregations.expand();
390390
aggs.writeTo(out);
391391
if (out.getVersion().before(Version.V_7_2_0)) {
392392
/*

server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IO
159159
public void testSerializesWithRemoteVersion() throws IOException {
160160
Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
161161
DelayableWriteable<SneakOtherSideVersionOnWire> original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire());
162-
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion));
162+
assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).expand().version, equalTo(remoteVersion));
163163
}
164164

165165
public void testAsSerializedIsNoopOnSerialized() throws IOException {
@@ -172,7 +172,7 @@ public void testAsSerializedIsNoopOnSerialized() throws IOException {
172172
private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException {
173173
DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT);
174174
assertTrue(roundTripped.isSerialized());
175-
assertThat(roundTripped.get(), equalTo(original.get()));
175+
assertThat(roundTripped.expand(), equalTo(original.expand()));
176176
}
177177

178178
private <T extends Writeable> DelayableWriteable<T> roundTrip(DelayableWriteable<T> original,

server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ public void testSerialization() throws Exception {
8787
assertEquals(querySearchResult.size(), deserialized.size());
8888
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
8989
if (deserialized.hasAggs()) {
90-
Aggregations aggs = querySearchResult.consumeAggs().get();
91-
Aggregations deserializedAggs = deserialized.consumeAggs().get();
90+
Aggregations aggs = querySearchResult.consumeAggs().expand();
91+
Aggregations deserializedAggs = deserialized.consumeAggs().expand();
9292
assertEquals(aggs.asList(), deserializedAggs.asList());
9393
}
9494
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
@@ -114,7 +114,7 @@ public void testReadFromPre_7_1_0() throws IOException {
114114
QuerySearchResult querySearchResult = new QuerySearchResult(in);
115115
assertEquals(100, querySearchResult.getContextId().getId());
116116
assertTrue(querySearchResult.hasAggs());
117-
InternalAggregations aggs = querySearchResult.consumeAggs().get();
117+
InternalAggregations aggs = querySearchResult.consumeAggs().expand();
118118
assertEquals(1, aggs.asList().size());
119119
// We deserialize and throw away top level pipeline aggs
120120
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -379,18 +379,16 @@ public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits,
379379
reducedAggs = () -> null;
380380
} else {
381381
/*
382-
* Keep a reference to the serialiazed form of the partially
382+
* Keep a reference to the serialized form of the partially
383383
* reduced aggs and reduce it on the fly when someone asks
384-
* for it. This will produce right-ish aggs. Much more right
385-
* than if you don't do the final reduce. Its important that
386-
* we wait until someone needs the result so we don't perform
387-
* the final reduce only to throw it away. And it is important
388-
* that we kep the reference to the serialized aggrgations
389-
* because the SearchPhaseController *already* has that
390-
* reference so we're not creating more garbage.
384+
* for it. It's important that we wait until someone needs
385+
* the result so we don't perform the final reduce only to
386+
* throw it away. And it is important that we keep the reference
387+
* to the serialized aggregations because SearchPhaseController
388+
* *already* has that reference so we're not creating more garbage.
391389
*/
392390
reducedAggs = () ->
393-
InternalAggregations.topLevelReduce(singletonList(aggregations.get()), aggReduceContextSupplier.get());
391+
InternalAggregations.topLevelReduce(singletonList(aggregations.expand()), aggReduceContextSupplier.get());
394392
}
395393
searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase);
396394
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ class MutableSearchResponse {
5050
private int reducePhase;
5151
/**
5252
* The response produced by the search API. Once we receive it we stop
53-
* building our own {@linkplain SearchResponse}s when you get the status
54-
* and instead return this.
53+
* building our own {@linkplain SearchResponse}s when get async search
54+
* is called, and instead return this.
55+
* @see #findOrBuildResponse(AsyncSearchTask)
5556
*/
5657
private SearchResponse finalResponse;
5758
private ElasticsearchException failure;
@@ -157,10 +158,9 @@ private SearchResponse findOrBuildResponse(AsyncSearchTask task) {
157158
/*
158159
* Build the response, reducing aggs if we haven't already and
159160
* storing the result of the reduction so we won't have to reduce
160-
* a second time if you get the response again and nothing has
161-
* changed. This does cost memory because we have a reference
162-
* to the reduced aggs sitting around so it can't be GCed until
163-
* we get an update.
161+
* the same aggregation results a second time if nothing has changed.
162+
* This does cost memory because we have a reference to the finally
163+
* reduced aggs sitting around which can't be GCed until we get an update.
164164
*/
165165
InternalAggregations reducedAggs = reducedAggsSource.get();
166166
reducedAggsSource = () -> reducedAggs;
@@ -183,8 +183,6 @@ synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTas
183183
return resp;
184184
}
185185

186-
187-
188186
private void failIfFrozen() {
189187
if (frozen) {
190188
throw new IllegalStateException("invalid update received after the completion of the request");

0 commit comments

Comments
 (0)