Skip to content

Commit 2f389a7

Browse files
authored
EQL: Introduce sequencing fetch size (#59063)
The current internal sequence algorithm relies on fetching multiple results and then paginating through the dataset. Depending on the dataset and the available memory, setting a larger page size can yield better performance at the expense of memory. This PR makes this behavior explicit by decoupling the fetch size (the page size used internally while sequencing) from size (the maximum number of results desired). As part of this change, the tests now use the minimum fetch size, which exposed a number of bugs: (1) jumping across data between queries, causing valid data to be seen as a gap; (2) incorrectly resuming the search across pages (again causing data to be discarded). Both have been addressed.
1 parent 509568b commit 2f389a7

File tree

23 files changed

+277
-121
lines changed

23 files changed

+277
-121
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
4343
private String implicitJoinKeyField = "agent.id";
4444
private boolean isCaseSensitive = true;
4545

46-
private int fetchSize = 50;
46+
private int size = 10;
47+
private int fetchSize = 1000;
4748
private SearchAfterBuilder searchAfterBuilder;
4849
private String query;
4950
private String tiebreakerField;
@@ -60,6 +61,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
6061
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
6162
static final String KEY_CASE_SENSITIVE = "case_sensitive";
6263
static final String KEY_SIZE = "size";
64+
static final String KEY_FETCH_SIZE = "fetch_size";
6365
static final String KEY_SEARCH_AFTER = "search_after";
6466
static final String KEY_QUERY = "query";
6567
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
@@ -85,7 +87,8 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
8587
if (implicitJoinKeyField != null) {
8688
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
8789
}
88-
builder.field(KEY_SIZE, fetchSize());
90+
builder.field(KEY_SIZE, size());
91+
builder.field(KEY_FETCH_SIZE, fetchSize());
8992

9093
if (searchAfterBuilder != null) {
9194
builder.array(KEY_SEARCH_AFTER, searchAfterBuilder.getSortValues());
@@ -172,14 +175,26 @@ public EqlSearchRequest implicitJoinKeyField(String implicitJoinKeyField) {
172175
return this;
173176
}
174177

178+
public int size() {
179+
return this.size;
180+
}
181+
182+
public EqlSearchRequest size(int size) {
183+
this.size = size;
184+
if (fetchSize <= 0) {
185+
throw new IllegalArgumentException("size must be greater than 0");
186+
}
187+
return this;
188+
}
189+
175190
public int fetchSize() {
176191
return this.fetchSize;
177192
}
178193

179194
public EqlSearchRequest fetchSize(int size) {
180195
this.fetchSize = size;
181-
if (fetchSize <= 0) {
182-
throw new IllegalArgumentException("size must be greater than 0");
196+
if (fetchSize < 2) {
197+
throw new IllegalArgumentException("fetch size must be greater than 1");
183198
}
184199
return this;
185200
}
@@ -246,7 +261,8 @@ public boolean equals(Object o) {
246261
return false;
247262
}
248263
EqlSearchRequest that = (EqlSearchRequest) o;
249-
return fetchSize == that.fetchSize &&
264+
return size == that.size &&
265+
fetchSize == that.fetchSize &&
250266
Arrays.equals(indices, that.indices) &&
251267
Objects.equals(indicesOptions, that.indicesOptions) &&
252268
Objects.equals(filter, that.filter) &&
@@ -268,6 +284,7 @@ public int hashCode() {
268284
Arrays.hashCode(indices),
269285
indicesOptions,
270286
filter,
287+
size,
271288
fetchSize,
272289
timestampField,
273290
tiebreakerField,

client/rest-high-level/src/test/java/org/elasticsearch/client/EqlIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void assertResponse(EqlSearchResponse response, int count) {
103103

104104
public void testBasicSearch() throws Exception {
105105
EqlClient eql = highLevelClient().eql();
106-
EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
106+
EqlSearchRequest request = new EqlSearchRequest("index", "process where true").size(RECORD_COUNT);
107107
assertResponse(execute(request, eql::search, eql::searchAsync), RECORD_COUNT);
108108
}
109109

@@ -115,7 +115,7 @@ public void testSimpleConditionSearch() throws Exception {
115115
EqlSearchRequest request = new EqlSearchRequest("index", "foo where pid > 0");
116116

117117
// test with non-default event.category mapping
118-
request.eventCategoryField("event_type");
118+
request.eventCategoryField("event_type").size(RECORD_COUNT);
119119

120120
EqlSearchResponse response = execute(request, eql::search, eql::searchAsync);
121121
assertResponse(response, RECORD_COUNT / DIVIDER);

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java

+3
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ protected EqlSearchResponse runQuery(String index, String query, boolean isCaseS
144144
EqlSearchRequest request = new EqlSearchRequest(testIndexName, query);
145145
request.isCaseSensitive(isCaseSensitive);
146146
request.tiebreakerField("event.sequence");
147+
// some queries return more than 10 results
148+
request.size(50);
149+
request.fetchSize(2);
147150
return eqlClient().search(request, RequestOptions.DEFAULT);
148151
}
149152

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java

+32-9
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.function.Supplier;
3333

3434
import static org.elasticsearch.action.ValidateActions.addValidationError;
35-
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FETCH_SIZE;
3635
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_EVENT_CATEGORY;
3736
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_IMPLICIT_JOIN_KEY;
3837
import static org.elasticsearch.xpack.eql.action.RequestDefaults.FIELD_TIMESTAMP;
@@ -51,7 +50,8 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
5150
private String tiebreakerField = null;
5251
private String eventCategoryField = FIELD_EVENT_CATEGORY;
5352
private String implicitJoinKeyField = FIELD_IMPLICIT_JOIN_KEY;
54-
private int fetchSize = FETCH_SIZE;
53+
private int size = RequestDefaults.SIZE;
54+
private int fetchSize = RequestDefaults.FETCH_SIZE;
5555
private SearchAfterBuilder searchAfterBuilder;
5656
private String query;
5757
private boolean isCaseSensitive = false;
@@ -67,6 +67,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
6767
static final String KEY_EVENT_CATEGORY_FIELD = "event_category_field";
6868
static final String KEY_IMPLICIT_JOIN_KEY_FIELD = "implicit_join_key_field";
6969
static final String KEY_SIZE = "size";
70+
static final String KEY_FETCH_SIZE = "fetch_size";
7071
static final String KEY_SEARCH_AFTER = "search_after";
7172
static final String KEY_QUERY = "query";
7273
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
@@ -80,6 +81,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
8081
static final ParseField EVENT_CATEGORY_FIELD = new ParseField(KEY_EVENT_CATEGORY_FIELD);
8182
static final ParseField IMPLICIT_JOIN_KEY_FIELD = new ParseField(KEY_IMPLICIT_JOIN_KEY_FIELD);
8283
static final ParseField SIZE = new ParseField(KEY_SIZE);
84+
static final ParseField FETCH_SIZE = new ParseField(KEY_FETCH_SIZE);
8385
static final ParseField SEARCH_AFTER = new ParseField(KEY_SEARCH_AFTER);
8486
static final ParseField QUERY = new ParseField(KEY_QUERY);
8587
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
@@ -102,6 +104,7 @@ public EqlSearchRequest(StreamInput in) throws IOException {
102104
tiebreakerField = in.readOptionalString();
103105
eventCategoryField = in.readString();
104106
implicitJoinKeyField = in.readString();
107+
size = in.readVInt();
105108
fetchSize = in.readVInt();
106109
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
107110
query = in.readString();
@@ -148,10 +151,14 @@ public ActionRequestValidationException validate() {
148151
validationException = addValidationError("implicit join key field is null or empty", validationException);
149152
}
150153

151-
if (fetchSize <= 0) {
154+
if (size <= 0) {
152155
validationException = addValidationError("size must be greater than 0", validationException);
153156
}
154157

158+
if (fetchSize < 2) {
159+
validationException = addValidationError("fetch size must be greater than 1", validationException);
160+
}
161+
155162
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEP_ALIVE) {
156163
validationException =
157164
addValidationError("[keep_alive] must be greater than 1 minute, got:" + keepAlive.toString(), validationException);
@@ -173,7 +180,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
173180
if (implicitJoinKeyField != null) {
174181
builder.field(KEY_IMPLICIT_JOIN_KEY_FIELD, implicitJoinKeyField());
175182
}
176-
builder.field(KEY_SIZE, fetchSize());
183+
builder.field(KEY_SIZE, size());
184+
builder.field(KEY_FETCH_SIZE, fetchSize());
177185

178186
if (searchAfterBuilder != null) {
179187
builder.array(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
@@ -204,7 +212,8 @@ protected static <R extends EqlSearchRequest> ObjectParser<R, Void> objectParser
204212
parser.declareString(EqlSearchRequest::tiebreakerField, TIEBREAKER_FIELD);
205213
parser.declareString(EqlSearchRequest::eventCategoryField, EVENT_CATEGORY_FIELD);
206214
parser.declareString(EqlSearchRequest::implicitJoinKeyField, IMPLICIT_JOIN_KEY_FIELD);
207-
parser.declareInt(EqlSearchRequest::fetchSize, SIZE);
215+
parser.declareInt(EqlSearchRequest::size, SIZE);
216+
parser.declareInt(EqlSearchRequest::fetchSize, FETCH_SIZE);
208217
parser.declareField(EqlSearchRequest::setSearchAfter, SearchAfterBuilder::fromXContent, SEARCH_AFTER,
209218
ObjectParser.ValueType.OBJECT_ARRAY);
210219
parser.declareString(EqlSearchRequest::query, QUERY);
@@ -259,10 +268,21 @@ public EqlSearchRequest implicitJoinKeyField(String implicitJoinKeyField) {
259268
return this;
260269
}
261270

262-
public int fetchSize() { return this.fetchSize; }
271+
public int size() {
272+
return this.size;
273+
}
274+
275+
public EqlSearchRequest size(int size) {
276+
this.size = size;
277+
return this;
278+
}
279+
280+
public int fetchSize() {
281+
return this.fetchSize;
282+
}
263283

264-
public EqlSearchRequest fetchSize(int size) {
265-
this.fetchSize = size;
284+
public EqlSearchRequest fetchSize(int fetchSize) {
285+
this.fetchSize = fetchSize;
266286
return this;
267287
}
268288

@@ -334,6 +354,7 @@ public void writeTo(StreamOutput out) throws IOException {
334354
out.writeOptionalString(tiebreakerField);
335355
out.writeString(eventCategoryField);
336356
out.writeString(implicitJoinKeyField);
357+
out.writeVInt(size);
337358
out.writeVInt(fetchSize);
338359
out.writeOptionalWriteable(searchAfterBuilder);
339360
out.writeString(query);
@@ -354,7 +375,8 @@ public boolean equals(Object o) {
354375
return false;
355376
}
356377
EqlSearchRequest that = (EqlSearchRequest) o;
357-
return fetchSize == that.fetchSize &&
378+
return size == that.size &&
379+
fetchSize == that.fetchSize &&
358380
Arrays.equals(indices, that.indices) &&
359381
Objects.equals(indicesOptions, that.indicesOptions) &&
360382
Objects.equals(filter, that.filter) &&
@@ -375,6 +397,7 @@ public int hashCode() {
375397
Arrays.hashCode(indices),
376398
indicesOptions,
377399
filter,
400+
size,
378401
fetchSize,
379402
timestampField,
380403
tiebreakerField,

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequestBuilder.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,13 @@ public EqlSearchRequestBuilder implicitJoinKeyField(String implicitJoinKeyField)
4545
return this;
4646
}
4747

48-
public EqlSearchRequestBuilder fetchSize(int size) {
49-
request.fetchSize(size);
48+
public EqlSearchRequestBuilder size(int size) {
49+
request.size(size);
50+
return this;
51+
}
52+
53+
public EqlSearchRequestBuilder fetchSize(int fetchSize) {
54+
request.fetchSize(fetchSize);
5055
return this;
5156
}
5257

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/RequestDefaults.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ private RequestDefaults() {}
1414
public static final String FIELD_EVENT_CATEGORY = "event.category";
1515
public static final String FIELD_IMPLICIT_JOIN_KEY = "agent.id";
1616

17-
public static int FETCH_SIZE = 10;
17+
public static int SIZE = 10;
18+
public static int FETCH_SIZE = 1000;
1819
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java

+45-11
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,25 @@
1515
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
1616
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
1717

18+
/**
19+
* Ranged or boxed query. Provides a beginning or end to the current query.
20+
* The query moves between them through search_after.
21+
*
22+
* Note that the range is not set at once on purpose since each query tends to have
23+
* its own number of results separate from the others.
24+
* As such, each query starts where it left off to reach the current in-progress window
25+
* as opposed to always operating with the exact same window.
26+
*/
1827
public class BoxedQueryRequest implements QueryRequest {
1928

2029
private final RangeQueryBuilder timestampRange;
2130
private final RangeQueryBuilder tiebreakerRange;
2231

2332
private final SearchSourceBuilder searchSource;
2433

34+
private Ordinal from, to;
35+
private Ordinal after;
36+
2537
public BoxedQueryRequest(QueryRequest original, String timestamp, String tiebreaker) {
2638
searchSource = original.searchSource();
2739

@@ -44,28 +56,50 @@ public SearchSourceBuilder searchSource() {
4456
}
4557

4658
@Override
47-
public void next(Ordinal ordinal) {
48-
// reset existing constraints
49-
timestampRange.gte(null).lte(null);
50-
if (tiebreakerRange != null) {
51-
tiebreakerRange.gte(null).lte(null);
52-
}
59+
public void nextAfter(Ordinal ordinal) {
60+
after = ordinal;
5361
// and leave only search_after
5462
searchSource.searchAfter(ordinal.toArray());
5563
}
5664

57-
public BoxedQueryRequest between(Ordinal begin, Ordinal end) {
58-
timestampRange.gte(begin.timestamp()).lte(end.timestamp());
59-
65+
/**
66+
* Sets the lower boundary for the query (non-inclusive).
67+
* Can be removed (when the query is unbounded) through null.
68+
*/
69+
public BoxedQueryRequest from(Ordinal begin) {
70+
from = begin;
6071
if (tiebreakerRange != null) {
61-
tiebreakerRange.gte(begin.tiebreaker()).lte(end.tiebreaker());
72+
timestampRange.gte(begin != null ? begin.timestamp() : null);
73+
tiebreakerRange.gt(begin != null ? begin.tiebreaker() : null);
74+
} else {
75+
timestampRange.gt(begin != null ? begin.timestamp() : null);
6276
}
77+
return this;
78+
}
79+
80+
public Ordinal from() {
81+
return from;
82+
}
6383

84+
/**
85+
* Sets the upper boundary for the query (inclusive).
86+
* Can be removed (when the query is unbounded) through null.
87+
*/
88+
public BoxedQueryRequest to(Ordinal end) {
89+
to = end;
90+
timestampRange.lte(end != null ? end.timestamp() : null);
91+
if (tiebreakerRange != null) {
92+
tiebreakerRange.lte(end != null ? end.tiebreaker() : null);
93+
}
6494
return this;
6595
}
6696

6797
@Override
6898
public String toString() {
69-
return searchSource.toString();
99+
return "( " + string(from) + " >-" + string(after) + "-> " + string(to) + "]";
100+
}
101+
102+
private static String string(Ordinal o) {
103+
return o != null ? o.toString() : "<none>";
70104
}
71105
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java

+5
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,9 @@ public Ordinal ordinal(SearchHit hit) {
8686
}
8787
return new Ordinal(timestamp, tbreaker);
8888
}
89+
90+
@Override
91+
public String toString() {
92+
return "[" + stage + "][" + reverse + "]";
93+
}
8994
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
6969
if (query instanceof EsQueryExec) {
7070
QueryRequest original = ((EsQueryExec) query).queryRequest(session);
7171

72+
// increase the request size based on the fetch size (since size is applied already through limit)
73+
7274
BoxedQueryRequest boxedRequest = new BoxedQueryRequest(original, timestampName, tiebreakerName);
7375
Criterion<BoxedQueryRequest> criterion =
74-
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i> 0 && descending);
76+
new Criterion<>(i, boxedRequest, keyExtractors, tsExtractor, tbExtractor, i > 0 && descending);
7577
criteria.add(criterion);
7678
} else {
7779
// until

0 commit comments

Comments
 (0)