Skip to content

Commit 1ba1b9f

Browse files
authored
EQL: Introduce until functionality (#59292)
Sequences now support until conditional, which prevents a match from occurring if the until matches a document while doing look-ups. Thus a sequence must complete before the until condition matches - if any document within the sequence occurs at, or after, the until hit, the sequence is discarded.
1 parent b174655 commit 1ba1b9f

File tree

14 files changed

+506
-235
lines changed

14 files changed

+506
-235
lines changed

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/CommonEqlActionTestCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.BeforeClass;
2626

2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.Collections;
2930
import java.util.HashSet;
3031
import java.util.List;
@@ -171,7 +172,10 @@ private RestHighLevelClient highLevelClient() {
171172

172173
protected void assertSearchHits(List<SearchHit> events) {
173174
assertNotNull(events);
174-
assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]", spec.expectedEventIds(), extractIds(events));
175+
long[] expected = spec.expectedEventIds();
176+
long[] actual = extractIds(events);
177+
assertArrayEquals("unexpected result for spec: [" + spec.toString() + "]" + Arrays.toString(expected) + " vs " + Arrays.toString(
178+
actual), expected, actual);
175179
}
176180

177181
private static long[] extractIds(List<SearchHit> events) {

x-pack/plugin/eql/qa/common/src/main/resources/test_queries_unsupported.toml

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -345,36 +345,9 @@ process where true
345345
| sort serial_event_id
346346
'''
347347

348-
349348
[[queries]]
350349
name = "fourSequencesByPidWithUntil1"
351350
query = '''
352-
sequence
353-
[process where opcode == 1] by unique_pid
354-
[file where opcode == 0] by unique_pid
355-
[file where opcode == 0] by unique_pid
356-
[file where opcode == 0] by unique_pid
357-
until
358-
[file where opcode == 2] by unique_pid
359-
'''
360-
expected_event_ids = []
361-
362-
[[queries]]
363-
name = "fourSequencesByPidWithUntil2"
364-
query = '''
365-
sequence
366-
[process where opcode == 1] by unique_pid
367-
[file where opcode == 0] by unique_pid
368-
[file where opcode == 0] by unique_pid
369-
[file where opcode == 0] by unique_pid
370-
until
371-
[file where opcode == 200] by unique_pid
372-
'''
373-
expected_event_ids = [54, 55, 61, 67]
374-
375-
[[queries]]
376-
name = "doubleSameSequenceWithByAndFilter"
377-
query = '''
378351
sequence
379352
[file where opcode=0] by unique_pid
380353
[file where opcode=0] by unique_pid
@@ -385,17 +358,6 @@ expected_event_ids = [87, 92]
385358
[[queries]]
386359
name = "doubleSameSequenceWithByUntilAndHead2"
387360
query = '''
388-
sequence
389-
[file where opcode=0 and file_name="*.exe"] by unique_pid
390-
[file where opcode=0 and file_name="*.exe"] by unique_pid
391-
until [process where opcode=1] by unique_ppid
392-
| head 1
393-
'''
394-
expected_event_ids = []
395-
396-
[[queries]]
397-
name = "doubleJoinWithByUntilAndHead"
398-
query = '''
399361
join
400362
[file where opcode=0 and file_name="*.exe"] by unique_pid
401363
[file where opcode=2 and file_name="*.exe"] by unique_pid
@@ -698,29 +660,6 @@ expected_event_ids = [48, 50, 51, 54, 93]
698660
[[queries]]
699661
name = "twoSequencesWithTwoKeysAndUntil"
700662
query = '''
701-
sequence by user_name
702-
[file where opcode=0] by pid,file_path
703-
[file where opcode=2] by pid,file_path
704-
until
705-
[process where opcode == 2] by ppid,process_path
706-
'''
707-
expected_event_ids = []
708-
709-
[[queries]]
710-
name = "twoSequencesWithUntil"
711-
query = '''
712-
sequence by user_name
713-
[file where opcode=0] by pid,file_path
714-
[file where opcode=2] by pid,file_path
715-
until
716-
[process where opcode == 5] by ppid,process_path
717-
| head 2
718-
'''
719-
expected_event_ids = [55, 59, 61, 65]
720-
721-
[[queries]]
722-
name = "twoSequencesWithHead"
723-
query = '''
724663
join by user_name
725664
[file where true] by pid,file_path
726665
[process where true] by ppid,process_path

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public BoxedQueryRequest from(Ordinal begin) {
7575
return this;
7676
}
7777

78+
public Ordinal after() {
79+
return after;
80+
}
81+
7882
public Ordinal from() {
7983
return from;
8084
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/KeyAndOrdinal.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ public class KeyAndOrdinal {
2020
this.ordinal = ordinal;
2121
}
2222

23+
public SequenceKey key() {
24+
return key;
25+
}
26+
27+
public Ordinal ordinal() {
28+
return ordinal;
29+
}
30+
2331
@Override
2432
public int hashCode() {
2533
return Objects.hash(key, ordinal);

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Matcher.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
*/
66
package org.elasticsearch.xpack.eql.execution.assembler;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.elasticsearch.common.collect.Tuple;
911
import org.elasticsearch.common.unit.TimeValue;
1012
import org.elasticsearch.search.SearchHit;
1113
import org.elasticsearch.xpack.eql.execution.search.Limit;
12-
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
1314
import org.elasticsearch.xpack.eql.execution.sequence.Sequence;
1415
import org.elasticsearch.xpack.eql.execution.sequence.SequenceStateMachine;
1516
import org.elasticsearch.xpack.eql.session.Payload;
@@ -21,6 +22,8 @@
2122
*/
2223
class Matcher {
2324

25+
private final Logger log = LogManager.getLogger(Matcher.class);
26+
2427
// NB: just like in a list, this represents the total number of stages yet counting starts at 0
2528
private final SequenceStateMachine stateMachine;
2629
private final int numberOfStages;
@@ -48,27 +51,33 @@ boolean match(int stage, Iterable<Tuple<KeyAndOrdinal, SearchHit>> hits) {
4851
// early skip in case of reaching the limit
4952
// check the last stage to avoid calling the state machine in other stages
5053
if (stateMachine.reachedLimit()) {
54+
log.trace("Limit reached {}", stateMachine.stats());
5155
return false;
5256
}
5357
}
5458
}
59+
log.trace("{}", stateMachine.stats());
5560
return true;
5661
}
5762

58-
boolean until(Iterable<Ordinal> markers) {
59-
// no-op so far
60-
61-
return false;
63+
void until(Iterable<KeyAndOrdinal> markers) {
64+
stateMachine.until(markers);
6265
}
6366

64-
public boolean hasCandidates(int stage) {
67+
boolean hasCandidates(int stage) {
6568
return stateMachine.hasCandidates(stage);
6669
}
6770

71+
void dropUntil() {
72+
stateMachine.dropUntil();
73+
}
74+
6875
Payload payload(long startTime) {
6976
List<Sequence> completed = stateMachine.completeSequences();
7077
TimeValue tookTime = new TimeValue(System.currentTimeMillis() - startTime);
71-
return new SequencePayload(completed, false, tookTime);
78+
Payload p = new SequencePayload(completed, false, tookTime);
79+
stateMachine.clear();
80+
return p;
7281
}
7382

7483
@Override

0 commit comments

Comments
 (0)