Skip to content

Commit a607577

Browse files
committed
EQL: Fix aggressive/incorrect until policy in sequences (#65156)
The current until implementation in sequences is too optimistic, leading to an aggressive match that discards correct data leading to invalid results. This commit addresses this issue and also unifies the until usage inside TumblingWindow. Further more it packs together the UntilGroup with SequenceGroup to minimize memory usage and improve clean-up. (cherry picked from commit de2724e)
1 parent a77412d commit a607577

File tree

8 files changed

+197
-179
lines changed

8 files changed

+197
-179
lines changed

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Arrays;
3434
import java.util.Collections;
3535
import java.util.List;
36+
import java.util.StringJoiner;
3637

3738
import static java.util.stream.Collectors.toList;
3839

@@ -142,14 +143,29 @@ protected EqlClient eqlClient() {
142143

143144
protected void assertEvents(List<Event> events) {
144145
assertNotNull(events);
145-
logger.info("Events {}", events);
146+
logger.debug("Events {}", new Object() {
147+
public String toString() {
148+
return eventsToString(events);
149+
}
150+
});
151+
146152
long[] expected = eventIds;
147153
long[] actual = extractIds(events);
148154
assertArrayEquals(LoggerMessageFormat.format(null, "unexpected result for spec[{}] [{}] -> {} vs {}", name, query, Arrays.toString(
149155
expected), Arrays.toString(actual)),
150156
expected, actual);
151157
}
152158

159+
private String eventsToString(List<Event> events) {
160+
StringJoiner sj = new StringJoiner(",", "[", "]");
161+
for (Event event : events) {
162+
sj.add(event.id() + "|" + event.index());
163+
sj.add(event.sourceAsMap().toString());
164+
sj.add("\n");
165+
}
166+
return sj.toString();
167+
}
168+
153169
@SuppressWarnings("unchecked")
154170
private long[] extractIds(List<Event> events) {
155171
final int len = events.size();

x-pack/plugin/eql/qa/correctness/src/javaRestTest/resources/queries.toml

+73-74
Large diffs are not rendered by default.

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

+70-58
Original file line numberDiff line numberDiff line change
@@ -9,116 +9,128 @@
99
import org.elasticsearch.common.logging.LoggerMessageFormat;
1010
import org.elasticsearch.xpack.eql.execution.search.Ordinal;
1111

12+
import java.util.Iterator;
1213
import java.util.LinkedHashMap;
1314
import java.util.Map;
14-
import java.util.Map.Entry;
1515

1616
/**
1717
* Dedicated collection for mapping a key to a list of sequences
1818
* The list represents the sequence for each stage (based on its index) and is fixed in size
1919
*/
2020
class KeyToSequences {
2121

22+
/**
23+
* Utility class holding the sequencegroup/until tuple that also handles
24+
* lazy initialization.
25+
*/
26+
private class SequenceEntry {
27+
28+
private final SequenceGroup[] groups;
29+
// created lazily
30+
private UntilGroup until;
31+
32+
SequenceEntry(int stages) {
33+
this.groups = new SequenceGroup[stages];
34+
}
35+
36+
void add(int stage, Sequence sequence) {
37+
// create the group on demand
38+
if (groups[stage] == null) {
39+
groups[stage] = new SequenceGroup();
40+
}
41+
groups[stage].add(sequence);
42+
}
43+
44+
public void remove(int stage) {
45+
groups[stage] = null;
46+
}
47+
48+
void until(Ordinal ordinal) {
49+
if (until == null) {
50+
until = new UntilGroup();
51+
}
52+
until.add(ordinal);
53+
}
54+
}
55+
2256
private final int listSize;
2357
/** for each key, associate the frame per state (determined by index) */
24-
private final Map<SequenceKey, SequenceGroup[]> keyToSequences;
25-
private final Map<SequenceKey, UntilGroup> keyToUntil;
58+
private final Map<SequenceKey, SequenceEntry> keyToSequences;
2659

2760
KeyToSequences(int listSize) {
2861
this.listSize = listSize;
2962
this.keyToSequences = new LinkedHashMap<>();
30-
this.keyToUntil = new LinkedHashMap<>();
31-
}
32-
33-
private SequenceGroup[] groups(SequenceKey key) {
34-
return keyToSequences.computeIfAbsent(key, k -> new SequenceGroup[listSize]);
3563
}
3664

3765
SequenceGroup groupIfPresent(int stage, SequenceKey key) {
38-
SequenceGroup[] groups = keyToSequences.get(key);
39-
return groups == null ? null : groups[stage];
66+
SequenceEntry sequenceEntry = keyToSequences.get(key);
67+
return sequenceEntry == null ? null : sequenceEntry.groups[stage];
4068
}
4169

4270
UntilGroup untilIfPresent(SequenceKey key) {
43-
return keyToUntil.get(key);
71+
SequenceEntry sequenceEntry = keyToSequences.get(key);
72+
return sequenceEntry == null ? null : sequenceEntry.until;
4473
}
4574

4675
void add(int stage, Sequence sequence) {
4776
SequenceKey key = sequence.key();
48-
SequenceGroup[] groups = groups(key);
49-
// create the group on demand
50-
if (groups[stage] == null) {
51-
groups[stage] = new SequenceGroup(key);
52-
}
53-
groups[stage].add(sequence);
77+
SequenceEntry info = keyToSequences.computeIfAbsent(key, k -> new SequenceEntry(listSize));
78+
info.add(stage, sequence);
5479
}
5580

5681
void until(Iterable<KeyAndOrdinal> until) {
5782
for (KeyAndOrdinal keyAndOrdinal : until) {
5883
// ignore unknown keys
5984
SequenceKey key = keyAndOrdinal.key();
60-
if (keyToSequences.containsKey(key)) {
61-
UntilGroup group = keyToUntil.computeIfAbsent(key, UntilGroup::new);
62-
group.add(keyAndOrdinal);
85+
SequenceEntry sequenceEntry = keyToSequences.get(key);
86+
if (sequenceEntry != null) {
87+
sequenceEntry.until(keyAndOrdinal.ordinal);
6388
}
6489
}
6590
}
6691

67-
void remove(int stage, SequenceGroup group) {
68-
SequenceKey key = group.key();
69-
SequenceGroup[] groups = keyToSequences.get(key);
70-
groups[stage] = null;
71-
// clean-up the key if all groups are empty
72-
boolean shouldRemoveKey = true;
73-
for (SequenceGroup gp : groups) {
74-
if (gp != null && gp.isEmpty() == false) {
75-
shouldRemoveKey = false;
76-
break;
77-
}
78-
}
79-
if (shouldRemoveKey) {
80-
keyToSequences.remove(key);
81-
}
82-
}
83-
84-
void dropUntil() {
85-
// clean-up all candidates that occur before until
86-
for (Entry<SequenceKey, UntilGroup> entry : keyToUntil.entrySet()) {
87-
SequenceGroup[] groups = keyToSequences.get(entry.getKey());
88-
if (groups != null) {
89-
for (Ordinal o : entry.getValue()) {
90-
for (SequenceGroup group : groups) {
91-
if (group != null) {
92-
group.trimBefore(o);
93-
}
94-
}
95-
}
96-
}
97-
}
98-
99-
keyToUntil.clear();
92+
void remove(int stage, SequenceKey key) {
93+
SequenceEntry info = keyToSequences.get(key);
94+
info.remove(stage);
10095
}
10196

10297
/**
10398
* Remove all matches expect the latest.
10499
*/
105100
void trimToTail() {
106-
for (SequenceGroup[] groups : keyToSequences.values()) {
107-
for (SequenceGroup group : groups) {
101+
for (Iterator<SequenceEntry> it = keyToSequences.values().iterator(); it.hasNext(); ) {
102+
SequenceEntry seqs = it.next();
103+
// first remove the sequences
104+
// and remember the last item from the first
105+
// initialized stage to be used with until
106+
Sequence firstTail = null;
107+
for (SequenceGroup group : seqs.groups) {
108108
if (group != null) {
109-
group.trimToLast();
109+
Sequence sequence = group.trimToLast();
110+
if (firstTail == null) {
111+
firstTail = sequence;
112+
}
113+
}
114+
}
115+
// there are no sequences on any stage for this key, drop it
116+
if (firstTail == null) {
117+
it.remove();
118+
} else {
119+
// drop any possible UNTIL that occurs before the last tail
120+
UntilGroup until = seqs.until;
121+
if (until != null) {
122+
until.trimBefore(firstTail.ordinal());
110123
}
111124
}
112125
}
113126
}
114127

115128
public void clear() {
116129
keyToSequences.clear();
117-
keyToUntil.clear();
118130
}
119131

120132
@Override
121133
public String toString() {
122-
return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
134+
return LoggerMessageFormat.format(null, "Keys=[{}]", keyToSequences.size());
123135
}
124136
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/OrdinalGroup.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
*/
2323
abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
2424

25-
private final SequenceKey key;
2625
private final Function<E, Ordinal> extractor;
2726

2827
// NB: since the size varies significantly, use a LinkedList
@@ -32,15 +31,10 @@ abstract class OrdinalGroup<E> implements Iterable<Ordinal> {
3231

3332
private Ordinal start, stop;
3433

35-
protected OrdinalGroup(SequenceKey key, Function<E, Ordinal> extractor) {
36-
this.key = key;
34+
protected OrdinalGroup(Function<E, Ordinal> extractor) {
3735
this.extractor = extractor;
3836
}
3937

40-
SequenceKey key() {
41-
return key;
42-
}
43-
4438
void add(E element) {
4539
Ordinal ordinal = extractor.apply(element);
4640
if (start == null || start.compareTo(ordinal) > 0) {
@@ -82,14 +76,15 @@ E before(Ordinal ordinal) {
8276
return match != null ? match.v1() : null;
8377
}
8478

85-
void trimToLast() {
79+
E trimToLast() {
8680
E last = elements.peekLast();
8781
if (last != null) {
8882
elements.clear();
8983
start = null;
9084
stop = null;
9185
add(last);
9286
}
87+
return last;
9388
}
9489

9590
private Tuple<E, Integer> findBefore(Ordinal ordinal) {
@@ -132,7 +127,7 @@ public Ordinal next() {
132127

133128
@Override
134129
public int hashCode() {
135-
return key.hashCode();
130+
return elements.hashCode();
136131
}
137132

138133
@Override
@@ -146,12 +141,11 @@ public boolean equals(Object obj) {
146141
}
147142

148143
OrdinalGroup<?> other = (OrdinalGroup<?>) obj;
149-
return Objects.equals(key, other.key)
150-
&& Objects.equals(elements, other.elements);
144+
return Objects.equals(elements, other.elements);
151145
}
152146

153147
@Override
154148
public String toString() {
155-
return format(null, "[{}][{}-{}]({} seqs)", key, start, stop, elements.size());
149+
return format(null, "[{}-{}]({} seqs)", start, stop, elements.size());
156150
}
157151
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceGroup.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
public class SequenceGroup extends OrdinalGroup<Sequence> {
1010

11-
SequenceGroup(SequenceKey key) {
12-
super(key, Sequence::ordinal);
11+
SequenceGroup() {
12+
super(Sequence::ordinal);
1313
}
14-
}
14+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequenceMatcher.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,21 @@ public class SequenceMatcher {
3030
static class Stats {
3131
long seen = 0;
3232
long ignored = 0;
33-
long until = 0;
3433
long rejectionMaxspan = 0;
3534
long rejectionUntil = 0;
3635

3736
@Override
3837
public String toString() {
39-
return LoggerMessageFormat.format(null, "Stats: Seen [{}]/Ignored [{}]/Until [{}]/Rejected {Maxspan [{}]/Until [{}]}",
38+
return LoggerMessageFormat.format(null, "Stats: Seen [{}]/Ignored [{}]/Rejected {Maxspan [{}]/Until [{}]}",
4039
seen,
4140
ignored,
42-
until,
4341
rejectionMaxspan,
4442
rejectionUntil);
4543
}
4644

4745
public void clear() {
4846
seen = 0;
4947
ignored = 0;
50-
until = 0;
5148
rejectionMaxspan = 0;
5249
rejectionUntil = 0;
5350
}
@@ -160,7 +157,7 @@ private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit
160157

161158
// remove the group early (as the key space is large)
162159
if (group.isEmpty()) {
163-
keyToSequences.remove(previousStage, group);
160+
keyToSequences.remove(previousStage, key);
164161
stageToKeys.remove(previousStage, key);
165162
}
166163

@@ -177,10 +174,10 @@ private void match(int stage, SequenceKey key, Ordinal ordinal, HitReference hit
177174
// until
178175
UntilGroup until = keyToSequences.untilIfPresent(key);
179176
if (until != null) {
180-
KeyAndOrdinal nearestUntil = until.before(ordinal);
177+
Ordinal nearestUntil = until.before(ordinal);
181178
if (nearestUntil != null) {
182179
// check if until matches
183-
if (nearestUntil.ordinal().between(sequence.ordinal(), ordinal)) {
180+
if (nearestUntil.between(sequence.ordinal(), ordinal)) {
184181
stats.rejectionUntil++;
185182
return;
186183
}
@@ -247,10 +244,6 @@ List<Sequence> completed() {
247244
return limit != null ? limit.view(asList) : asList;
248245
}
249246

250-
void dropUntil() {
251-
keyToSequences.dropUntil();
252-
}
253-
254247
void until(Iterable<KeyAndOrdinal> markers) {
255248
keyToSequences.until(markers);
256249
}

0 commit comments

Comments
 (0)