Skip to content

Commit 297e209

Browse files
committed
more improved search result streaming, write a header with shard targets, so each hit just write an id of the targe
1 parent 77564cb commit 297e209

File tree

4 files changed

+92
-22
lines changed

4 files changed

+92
-22
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.lucene.search.TopFieldDocs;
3030
import org.apache.lucene.util.PriorityQueue;
3131
import org.elasticsearch.ElasticSearchIllegalStateException;
32-
import org.elasticsearch.search.SearchHit;
3332
import org.elasticsearch.search.SearchShardTarget;
3433
import org.elasticsearch.search.dfs.AggregatedDfs;
3534
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -190,7 +189,7 @@ public InternalSearchResponse merge(ShardDoc[] sortedDocs, Map<SearchShardTarget
190189
}
191190

192191
// merge hits
193-
List<SearchHit> hits = new ArrayList<SearchHit>();
192+
List<InternalSearchHit> hits = new ArrayList<InternalSearchHit>();
194193
if (!fetchResults.isEmpty()) {
195194
for (ShardDoc shardDoc : sortedDocs) {
196195
FetchSearchResultProvider fetchResultProvider = fetchResults.get(shardDoc.shardTarget());
@@ -199,14 +198,14 @@ public InternalSearchResponse merge(ShardDoc[] sortedDocs, Map<SearchShardTarget
199198
}
200199
FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
201200
int index = fetchResult.counterGetAndIncrement();
202-
if (index < fetchResult.hits().hits().length) {
203-
SearchHit searchHit = fetchResult.hits().hits()[index];
204-
((InternalSearchHit) searchHit).shard(fetchResult.shardTarget());
201+
if (index < fetchResult.hits().internalHits().length) {
202+
InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
203+
searchHit.shard(fetchResult.shardTarget());
205204
hits.add(searchHit);
206205
}
207206
}
208207
}
209-
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new SearchHit[hits.size()]), totalHits);
208+
InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits);
210209
return new InternalSearchResponse(searchHits, facets);
211210
}
212211
}

modules/elasticsearch/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.lucene.document.FieldSelector;
2626
import org.apache.lucene.document.Fieldable;
2727
import org.elasticsearch.index.mapper.*;
28-
import org.elasticsearch.search.SearchHit;
2928
import org.elasticsearch.search.SearchHitField;
3029
import org.elasticsearch.search.SearchParseElement;
3130
import org.elasticsearch.search.SearchPhase;
@@ -66,7 +65,7 @@ public class FetchPhase implements SearchPhase {
6665
public void execute(SearchContext context) {
6766
FieldSelector fieldSelector = buildFieldSelectors(context);
6867

69-
SearchHit[] hits = new SearchHit[context.docIdsToLoad().length];
68+
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoad().length];
7069
int index = 0;
7170
for (int docId : context.docIdsToLoad()) {
7271
Document doc = loadDocument(context, fieldSelector, docId);

modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.search.highlight.HighlightField;
2929
import org.elasticsearch.util.Nullable;
3030
import org.elasticsearch.util.Unicode;
31+
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
3132
import org.elasticsearch.util.io.stream.StreamInput;
3233
import org.elasticsearch.util.io.stream.StreamOutput;
3334
import org.elasticsearch.util.json.JsonBuilder;
@@ -223,7 +224,17 @@ public static InternalSearchHit readSearchHit(StreamInput in) throws IOException
223224
return hit;
224225
}
225226

227+
public static InternalSearchHit readSearchHit(StreamInput in, @Nullable TIntObjectHashMap<SearchShardTarget> shardLookupMap) throws IOException {
228+
InternalSearchHit hit = new InternalSearchHit();
229+
hit.readFrom(in, shardLookupMap);
230+
return hit;
231+
}
232+
226233
@Override public void readFrom(StreamInput in) throws IOException {
234+
readFrom(in, null);
235+
}
236+
237+
public void readFrom(StreamInput in, @Nullable TIntObjectHashMap<SearchShardTarget> shardLookupMap) throws IOException {
227238
id = in.readUTF();
228239
type = in.readUTF();
229240
int size = in.readVInt();
@@ -301,12 +312,23 @@ public static InternalSearchHit readSearchHit(StreamInput in) throws IOException
301312
highlightFields = builder.build();
302313
}
303314

304-
if (in.readBoolean()) {
305-
shard = readSearchShardTarget(in);
315+
if (shardLookupMap != null) {
316+
int lookupId = in.readVInt();
317+
if (lookupId > 0) {
318+
shard = shardLookupMap.get(lookupId);
319+
}
320+
} else {
321+
if (in.readBoolean()) {
322+
shard = readSearchShardTarget(in);
323+
}
306324
}
307325
}
308326

309327
@Override public void writeTo(StreamOutput out) throws IOException {
328+
writeTo(out, null);
329+
}
330+
331+
public void writeTo(StreamOutput out, @Nullable Map<SearchShardTarget, Integer> shardLookupMap) throws IOException {
310332
out.writeUTF(id);
311333
out.writeUTF(type);
312334
if (source == null) {
@@ -337,11 +359,19 @@ public static InternalSearchHit readSearchHit(StreamInput in) throws IOException
337359
highlightField.writeTo(out);
338360
}
339361
}
340-
if (shard == null) {
341-
out.writeBoolean(false);
362+
if (shardLookupMap == null) {
363+
if (shard == null) {
364+
out.writeBoolean(false);
365+
} else {
366+
out.writeBoolean(true);
367+
shard.writeTo(out);
368+
}
342369
} else {
343-
out.writeBoolean(true);
344-
shard.writeTo(out);
370+
if (shard == null) {
371+
out.writeVInt(0);
372+
} else {
373+
out.writeVInt(shardLookupMap.get(shard));
374+
}
345375
}
346376
}
347377
}

modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,35 @@
2121

2222
import org.elasticsearch.search.SearchHit;
2323
import org.elasticsearch.search.SearchHits;
24+
import org.elasticsearch.search.SearchShardTarget;
25+
import org.elasticsearch.util.gnu.trove.TIntObjectHashMap;
2426
import org.elasticsearch.util.io.stream.StreamInput;
2527
import org.elasticsearch.util.io.stream.StreamOutput;
2628
import org.elasticsearch.util.json.JsonBuilder;
2729

2830
import java.io.IOException;
31+
import java.util.IdentityHashMap;
32+
import java.util.Map;
2933

34+
import static org.elasticsearch.search.SearchShardTarget.*;
3035
import static org.elasticsearch.search.internal.InternalSearchHit.*;
3136

3237
/**
3338
* @author kimchy (shay.banon)
3439
*/
3540
public class InternalSearchHits implements SearchHits {
3641

37-
private static final SearchHit[] EMPTY = new SearchHit[0];
42+
private static final InternalSearchHit[] EMPTY = new InternalSearchHit[0];
3843

39-
private SearchHit[] hits;
44+
private InternalSearchHit[] hits;
4045

4146
private long totalHits;
4247

43-
private InternalSearchHits() {
48+
InternalSearchHits() {
4449

4550
}
4651

47-
public InternalSearchHits(SearchHit[] hits, long totalHits) {
52+
public InternalSearchHits(InternalSearchHit[] hits, long totalHits) {
4853
this.hits = hits;
4954
this.totalHits = totalHits;
5055
}
@@ -57,6 +62,10 @@ public SearchHit[] hits() {
5762
return this.hits;
5863
}
5964

65+
public InternalSearchHit[] internalHits() {
66+
return this.hits;
67+
}
68+
6069
@Override public void toJson(JsonBuilder builder, Params params) throws IOException {
6170
builder.startObject("hits");
6271
builder.field("total", totalHits);
@@ -81,18 +90,51 @@ public static InternalSearchHits readSearchHits(StreamInput in) throws IOExcepti
8190
if (size == 0) {
8291
hits = EMPTY;
8392
} else {
84-
hits = new SearchHit[size];
93+
// read the lookup table first
94+
int lookupSize = in.readVInt();
95+
TIntObjectHashMap<SearchShardTarget> shardLookupMap = null;
96+
if (lookupSize > 0) {
97+
shardLookupMap = new TIntObjectHashMap<SearchShardTarget>(lookupSize);
98+
for (int i = 0; i < lookupSize; i++) {
99+
shardLookupMap.put(in.readVInt(), readSearchShardTarget(in));
100+
}
101+
}
102+
103+
hits = new InternalSearchHit[size];
85104
for (int i = 0; i < hits.length; i++) {
86-
hits[i] = readSearchHit(in);
105+
hits[i] = readSearchHit(in, shardLookupMap);
87106
}
88107
}
89108
}
90109

91110
@Override public void writeTo(StreamOutput out) throws IOException {
92111
out.writeVLong(totalHits);
93112
out.writeVInt(hits.length);
94-
for (SearchHit hit : hits) {
95-
hit.writeTo(out);
113+
if (hits.length > 0) {
114+
// write the header search shard targets (we assume identity equality)
115+
IdentityHashMap<SearchShardTarget, Integer> shardLookupMap = new IdentityHashMap<SearchShardTarget, Integer>();
116+
// start from 1, 0 is for null!
117+
int counter = 1;
118+
// put an entry for null
119+
for (InternalSearchHit hit : hits) {
120+
if (hit.shard() != null) {
121+
Integer handle = shardLookupMap.get(hit.shard());
122+
if (handle == null) {
123+
shardLookupMap.put(hit.shard(), counter++);
124+
}
125+
}
126+
}
127+
out.writeVInt(shardLookupMap.size());
128+
if (!shardLookupMap.isEmpty()) {
129+
for (Map.Entry<SearchShardTarget, Integer> entry : shardLookupMap.entrySet()) {
130+
out.writeVInt(entry.getValue());
131+
entry.getKey().writeTo(out);
132+
}
133+
}
134+
135+
for (InternalSearchHit hit : hits) {
136+
hit.writeTo(out, shardLookupMap.isEmpty() ? null : shardLookupMap);
137+
}
96138
}
97139
}
98140
}

0 commit comments

Comments
 (0)