Skip to content

Commit b85b12d

Browse files
authored
Update persistent state document in the index the document belongs to (#51751)
1 parent e843469 commit b85b12d

File tree

2 files changed

+194
-31
lines changed

2 files changed

+194
-31
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java

+116-11
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,20 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
1111
import org.elasticsearch.action.bulk.BulkRequest;
12+
import org.elasticsearch.action.search.SearchRequest;
13+
import org.elasticsearch.action.search.SearchResponse;
14+
import org.elasticsearch.action.support.WriteRequest;
1215
import org.elasticsearch.common.bytes.BytesArray;
1316
import org.elasticsearch.common.bytes.BytesReference;
1417
import org.elasticsearch.common.bytes.CompositeBytesReference;
18+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
19+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
20+
import org.elasticsearch.common.xcontent.XContentParser;
1521
import org.elasticsearch.common.xcontent.XContentType;
22+
import org.elasticsearch.common.xcontent.json.JsonXContent;
23+
import org.elasticsearch.index.query.BoolQueryBuilder;
24+
import org.elasticsearch.index.query.IdsQueryBuilder;
25+
import org.elasticsearch.search.builder.SearchSourceBuilder;
1626
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
1727
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
1828
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -22,10 +32,24 @@
2232
import java.io.InputStream;
2333
import java.util.ArrayList;
2434
import java.util.List;
25-
35+
import java.util.Map;
36+
import java.util.Objects;
2637

2738
/**
28-
* Reads state documents of a stream, splits them and persists to an index via a bulk request
39+
* Reads state documents of a stream, splits them and persists to an index via a bulk request.
40+
*
41+
* Some types of state, for example data frame analytics state and categorizer state, are written multiple times with the same document id.
42+
* The code needs to make sure that even after .ml-state index rollover there are no duplicate documents across the .ml-state*
43+
* indices. Such duplicates are undesirable for at least two reasons:
44+
* 1. We deliberately have no mappings on the state index so we cannot sort and filter in a search
45+
* 2. The state documents are large, so having dead documents with duplicate IDs is suboptimal from a disk usage perspective
46+
*
47+
* In order to avoid duplicates the following sequence of steps is executed every time the document is about to get persisted:
48+
* 1. The first non-blank line is extracted from the given bytes. Lines are delimited by the new line character ('\n')
49+
* 2. Document id is extracted from this line.
50+
* 3. Document with this id is searched for in .ml-state* indices
51+
* 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
52+
* Otherwise, it is written to the index pointed to by the current write alias, i.e. .ml-state-write
2953
*/
3054
public class IndexingStateProcessor implements StateProcessor {
3155

@@ -88,7 +112,7 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
88112
// Ignore completely empty chunks
89113
if (nextZeroByte > splitFrom) {
90114
// No validation - assume the native process has formatted the state correctly
91-
persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
115+
findAppropriateIndexOrAliasAndPersist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
92116
}
93117
splitFrom = nextZeroByte + 1;
94118
}
@@ -98,11 +122,25 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
98122
return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
99123
}
100124

101-
void persist(BytesReference bytes) throws IOException {
102-
BulkRequest bulkRequest = new BulkRequest();
103-
bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), XContentType.JSON);
125+
/**
126+
* Finds an appropriate index the document should be put in and then persists the document in that index.
127+
* For what is considered to be "appropriate" see the class documentation.
128+
*/
129+
void findAppropriateIndexOrAliasAndPersist(BytesReference bytes) throws IOException {
130+
String firstNonBlankLine = extractFirstNonBlankLine(bytes);
131+
if (firstNonBlankLine == null) {
132+
return;
133+
}
134+
String stateDocId = extractDocId(firstNonBlankLine);
135+
String indexOrAlias = getConcreteIndexOrWriteAlias(stateDocId);
136+
persist(indexOrAlias, bytes);
137+
}
138+
139+
void persist(String indexOrAlias, BytesReference bytes) throws IOException {
140+
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
141+
bulkRequest.add(bytes, indexOrAlias, XContentType.JSON);
104142
if (bulkRequest.numberOfActions() > 0) {
105-
LOGGER.trace("[{}] Persisting job state document", jobId);
143+
LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length());
106144
try {
107145
resultsPersisterService.bulkIndexWithRetry(bulkRequest,
108146
jobId,
@@ -117,12 +155,79 @@ void persist(BytesReference bytes) throws IOException {
117155
}
118156

119157
private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
120-
for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
121-
if (bytesRef.get(i) == 0) {
122-
return i;
158+
return bytesRef.indexOf((byte)0, Math.max(searchFrom, splitFrom));
159+
}
160+
161+
@SuppressWarnings("unchecked")
162+
/**
163+
* Extracts the document id from the given {@code firstNonBlankLine}.
164+
* The line is parsed as JSON and the document id is assumed to be a nested "index._id" field of type String.
165+
*/
166+
static String extractDocId(String firstNonBlankLine) throws IOException {
167+
try (XContentParser parser =
168+
JsonXContent.jsonXContent.createParser(
169+
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, firstNonBlankLine)) {
170+
Map<String, Object> map = parser.map();
171+
if ((map.get("index") instanceof Map) == false) {
172+
throw new IllegalStateException("Could not extract \"index\" field out of [" + firstNonBlankLine + "]");
123173
}
174+
map = (Map<String, Object>)map.get("index");
175+
if ((map.get("_id") instanceof String) == false) {
176+
throw new IllegalStateException("Could not extract \"index._id\" field out of [" + firstNonBlankLine + "]");
177+
}
178+
return (String)map.get("_id");
124179
}
125-
return -1;
180+
}
181+
182+
/**
183+
* Extracts the first non-blank line from the given {@code bytesRef}.
184+
* Lines are separated by the new line character ('\n').
185+
* A line is considered blank if it only consists of space characters (' ').
186+
*/
187+
private static String extractFirstNonBlankLine(BytesReference bytesRef) {
188+
for (int searchFrom = 0; searchFrom < bytesRef.length();) {
189+
int newLineMarkerIndex = bytesRef.indexOf((byte) '\n', searchFrom);
190+
int searchTo = newLineMarkerIndex != -1 ? newLineMarkerIndex : bytesRef.length();
191+
if (isBlank(bytesRef, searchFrom, searchTo) == false) {
192+
return bytesRef.slice(searchFrom, searchTo - searchFrom).utf8ToString();
193+
}
194+
searchFrom = newLineMarkerIndex != -1 ? newLineMarkerIndex + 1 : bytesRef.length();
195+
}
196+
return null;
197+
}
198+
199+
/**
200+
* Checks whether the line pointed to by a pair of indexes: {@code from} (inclusive) and {@code to} (exclusive) is blank.
201+
* A line is considered blank if it only consists of space characters (' ').
202+
*/
203+
private static boolean isBlank(BytesReference bytesRef, int from, int to) {
204+
for (int i = from; i < to; ++i) {
205+
if (bytesRef.get(i) != ((byte) ' ')) {
206+
return false;
207+
}
208+
}
209+
return true;
210+
}
211+
212+
private String getConcreteIndexOrWriteAlias(String documentId) {
213+
Objects.requireNonNull(documentId);
214+
SearchRequest searchRequest =
215+
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
216+
.allowPartialSearchResults(false)
217+
.source(
218+
new SearchSourceBuilder()
219+
.size(1)
220+
.trackTotalHits(false)
221+
.query(new BoolQueryBuilder().filter(new IdsQueryBuilder().addIds(documentId))));
222+
SearchResponse searchResponse =
223+
resultsPersisterService.searchWithRetry(
224+
searchRequest,
225+
jobId,
226+
() -> true,
227+
(msg) -> auditor.warning(jobId, documentId + " " + msg));
228+
return searchResponse.getHits().getHits().length > 0
229+
? searchResponse.getHits().getHits()[0].getIndex()
230+
: AnomalyDetectorsIndex.jobStateIndexWriteAlias();
126231
}
127232
}
128233

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessorTests.java

+78-20
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
import com.carrotsearch.randomizedtesting.annotations.Timeout;
99
import org.elasticsearch.action.bulk.BulkRequest;
1010
import org.elasticsearch.action.bulk.BulkResponse;
11+
import org.elasticsearch.action.search.SearchRequest;
12+
import org.elasticsearch.action.search.SearchResponse;
1113
import org.elasticsearch.common.bytes.BytesReference;
12-
import org.elasticsearch.common.settings.Settings;
13-
import org.elasticsearch.common.util.concurrent.ThreadContext;
14-
import org.elasticsearch.mock.orig.Mockito;
14+
import org.elasticsearch.rest.RestStatus;
15+
import org.elasticsearch.search.SearchHit;
16+
import org.elasticsearch.search.SearchHits;
1517
import org.elasticsearch.test.ESTestCase;
16-
import org.elasticsearch.threadpool.ThreadPool;
1718
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
1819
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
1920
import org.junit.After;
@@ -22,15 +23,23 @@
2223

2324
import java.io.ByteArrayInputStream;
2425
import java.io.IOException;
26+
import java.io.InputStream;
2527
import java.nio.charset.StandardCharsets;
2628
import java.util.List;
29+
import java.util.Map;
30+
import java.util.function.Function;
2731

32+
import static org.hamcrest.Matchers.containsString;
33+
import static org.hamcrest.Matchers.equalTo;
2834
import static org.mockito.Matchers.any;
35+
import static org.mockito.Matchers.eq;
36+
import static org.mockito.Mockito.doReturn;
2937
import static org.mockito.Mockito.mock;
3038
import static org.mockito.Mockito.never;
3139
import static org.mockito.Mockito.spy;
3240
import static org.mockito.Mockito.times;
3341
import static org.mockito.Mockito.verify;
42+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3443
import static org.mockito.Mockito.when;
3544

3645
/**
@@ -39,6 +48,7 @@
3948
public class IndexingStateProcessorTests extends ESTestCase {
4049

4150
private static final String STATE_SAMPLE = ""
51+
+ " \n"
4252
+ "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
4353
+ "{ \"field\" : \"value1\" }\n"
4454
+ "\0"
@@ -56,54 +66,99 @@ public class IndexingStateProcessorTests extends ESTestCase {
5666

5767
private IndexingStateProcessor stateProcessor;
5868
private ResultsPersisterService resultsPersisterService;
69+
private SearchResponse searchResponse;
5970

6071
@Before
6172
public void initialize() {
73+
searchResponse = mock(SearchResponse.class);
74+
when(searchResponse.status()).thenReturn(RestStatus.OK);
6275
resultsPersisterService = mock(ResultsPersisterService.class);
76+
doReturn(searchResponse).when(resultsPersisterService).searchWithRetry(any(SearchRequest.class), any(), any(), any());
77+
doReturn(mock(BulkResponse.class)).when(resultsPersisterService).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
6378
AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
6479
stateProcessor = spy(new IndexingStateProcessor(JOB_ID, resultsPersisterService, auditor));
65-
when(resultsPersisterService.bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any())).thenReturn(mock(BulkResponse.class));
66-
ThreadPool threadPool = mock(ThreadPool.class);
67-
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
6880
}
6981

7082
@After
7183
public void verifyNoMoreClientInteractions() {
72-
Mockito.verifyNoMoreInteractions(resultsPersisterService);
84+
verifyNoMoreInteractions(resultsPersisterService);
7385
}
7486

75-
public void testStateRead() throws IOException {
87+
public void testExtractDocId() throws IOException {
88+
assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_index\": \"test\", \"_id\": \"1\" } }\n"), equalTo("1"));
89+
assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_id\": \"2\" } }\n"), equalTo("2"));
90+
}
91+
92+
private void testStateRead(SearchHits searchHits, String expectedIndexOrAlias) throws IOException {
93+
when(searchResponse.getHits()).thenReturn(searchHits);
94+
7695
ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
7796
stateProcessor.process(stream);
7897
ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
79-
verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture());
98+
verify(stateProcessor, times(3)).persist(eq(expectedIndexOrAlias), bytesRefCaptor.capture());
8099

81100
String[] threeStates = STATE_SAMPLE.split("\0");
82101
List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
83102
assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
84103
assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
85104
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
105+
verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
86106
verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
87107
}
88108

109+
public void testStateRead_StateDocumentCreated() throws IOException {
110+
testStateRead(SearchHits.empty(), ".ml-state-write");
111+
}
112+
113+
public void testStateRead_StateDocumentUpdated() throws IOException {
114+
testStateRead(
115+
new SearchHits(new SearchHit[]{ SearchHit.createFromMap(Map.of("_index", ".ml-state-dummy")) }, null, 0.0f),
116+
".ml-state-dummy");
117+
}
118+
89119
public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
90120
String zeroBytes = "\0\0\0\0\0\0";
91121
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
92122

93123
stateProcessor.process(stream);
94124

95-
verify(stateProcessor, never()).persist(any());
96-
Mockito.verifyNoMoreInteractions(resultsPersisterService);
125+
verify(stateProcessor, never()).persist(any(), any());
97126
}
98127

99-
public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
100-
String zeroBytes = " \n\0";
101-
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
128+
public void testStateReadGivenSpacesAndNewLineCharactersFollowedByZeroByte() throws IOException {
129+
Function<String, InputStream> stringToInputStream = s -> new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));
102130

103-
stateProcessor.process(stream);
131+
stateProcessor.process(stringToInputStream.apply("\0"));
132+
stateProcessor.process(stringToInputStream.apply(" \0"));
133+
stateProcessor.process(stringToInputStream.apply("\n\0"));
134+
stateProcessor.process(stringToInputStream.apply(" \0"));
135+
stateProcessor.process(stringToInputStream.apply(" \n \0"));
136+
stateProcessor.process(stringToInputStream.apply(" \n\n \0"));
137+
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
138+
stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
139+
stateProcessor.process(stringToInputStream.apply("\n \n \0"));
104140

105-
verify(stateProcessor, times(1)).persist(any());
106-
Mockito.verifyNoMoreInteractions(resultsPersisterService);
141+
verify(stateProcessor, never()).persist(any(), any());
142+
}
143+
144+
public void testStateReadGivenNoIndexField() throws IOException {
145+
String bytes = " \n \n \n \n\n {}\0";
146+
ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));
147+
148+
Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
149+
assertThat(e.getMessage(), containsString("Could not extract \"index\" field"));
150+
151+
verify(stateProcessor, never()).persist(any(), any());
152+
}
153+
154+
public void testStateReadGivenNoIdField() throws IOException {
155+
String bytes = " \n \n \n {\"index\": {}}\0";
156+
ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));
157+
158+
Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
159+
assertThat(e.getMessage(), containsString("Could not extract \"index._id\" field"));
160+
161+
verify(stateProcessor, never()).persist(any(), any());
107162
}
108163

109164
/**
@@ -113,9 +168,11 @@ public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOExc
113168
*/
114169
@Timeout(millis = 10 * 1000)
115170
public void testLargeStateRead() throws Exception {
171+
when(searchResponse.getHits()).thenReturn(SearchHits.empty());
172+
116173
StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
117174
for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
118-
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\"}}\n");
175+
builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_id\":\"doc").append(docNum).append("\"}}\n");
119176
for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
120177
builder.append("data");
121178
}
@@ -124,7 +181,8 @@ public void testLargeStateRead() throws Exception {
124181

125182
ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
126183
stateProcessor.process(stream);
127-
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
184+
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any());
185+
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
128186
verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
129187
}
130188
}

0 commit comments

Comments
 (0)