35
35
import java .util .Map ;
36
36
import java .util .Objects ;
37
37
38
-
39
38
/**
40
- * Reads state documents of a stream, splits them and persists to an index via a bulk request
39
+ * Reads state documents of a stream, splits them and persists to an index via a bulk request.
40
+ *
41
+ * Some types of state, for example data frame analytics state and categorizer state, are written multiple times with the same document id.
42
+ * The code needs to make sure that even after .ml-state index rollover there are no duplicate documents across the .ml-state*
43
+ * indices. Such duplicates are undesirable for at least two reasons:
44
+ * 1. We deliberately have no mappings on the state index so we cannot sort and filter in a search
45
+ * 2. The state documents are large, so having dead documents with duplicate IDs is suboptimal from a disk usage perspective
46
+ *
47
+ * In order to avoid duplicates the following sequence of steps is executed every time the document is about to get persisted:
48
+ * 1. The first non-blank line is extracted from the given bytes. Lines are delimited by the new line character ('\n')
49
+ * 2. Document id is extracted from this line.
50
+ * 3. Document with this id is searched for in .ml-state* indices
51
+ * 4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
52
+ * Otherwise, it is written to the index pointed to by the current write alias, i.e. .ml-state-write
41
53
*/
42
54
public class IndexingStateProcessor implements StateProcessor {
43
55
@@ -100,7 +112,7 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
100
112
// Ignore completely empty chunks
101
113
if (nextZeroByte > splitFrom ) {
102
114
// No validation - assume the native process has formatted the state correctly
103
- findAppropriateIndexAndPersist (bytesRef .slice (splitFrom , nextZeroByte - splitFrom ));
115
+ findAppropriateIndexOrAliasAndPersist (bytesRef .slice (splitFrom , nextZeroByte - splitFrom ));
104
116
}
105
117
splitFrom = nextZeroByte + 1 ;
106
118
}
@@ -110,11 +122,16 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom)
110
122
return bytesRef .slice (splitFrom , bytesRef .length () - splitFrom );
111
123
}
112
124
113
- void findAppropriateIndexAndPersist (BytesReference bytes ) throws IOException {
114
- String stateDocId = extractDocId (bytes );
115
- if (stateDocId == null ) {
125
+ /**
126
+ * Finds an appropriate index the document should be put in and then persists the document in that index.
127
+ * For what is considered to be "appropriate" see the class documentation.
128
+ */
129
+ void findAppropriateIndexOrAliasAndPersist (BytesReference bytes ) throws IOException {
130
+ String firstNonBlankLine = extractFirstNonBlankLine (bytes );
131
+ if (firstNonBlankLine == null ) {
116
132
return ;
117
133
}
134
+ String stateDocId = extractDocId (firstNonBlankLine );
118
135
String indexOrAlias = getConcreteIndexOrWriteAlias (stateDocId );
119
136
persist (indexOrAlias , bytes );
120
137
}
@@ -142,11 +159,11 @@ private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int
142
159
}
143
160
144
161
@ SuppressWarnings ("unchecked" )
145
- static String extractDocId ( BytesReference bytesRef ) throws IOException {
146
- String firstNonBlankLine = extractFirstNonBlankLine ( bytesRef );
147
- if ( firstNonBlankLine == null ) {
148
- return null ;
149
- }
162
+ /**
163
+ * Extracts document id from the given {@code firstNonBlankLine}.
164
+ * The line is parsed as JSON and document id is assumed to be a nested "index._id" field of type String.
165
+ */
166
+ static String extractDocId ( String firstNonBlankLine ) throws IOException {
150
167
try (XContentParser parser =
151
168
JsonXContent .jsonXContent .createParser (
152
169
NamedXContentRegistry .EMPTY , LoggingDeprecationHandler .INSTANCE , firstNonBlankLine )) {
@@ -162,19 +179,36 @@ static String extractDocId(BytesReference bytesRef) throws IOException {
162
179
}
163
180
}
164
181
182
+ /**
183
+ * Extracts the first non-blank line from the given {@code bytesRef}.
184
+ * Lines are separated by the new line character ('\n').
185
+ * A line is considered blank if it only consists of space characters (' ').
186
+ */
165
187
private static String extractFirstNonBlankLine (BytesReference bytesRef ) {
166
188
for (int searchFrom = 0 ; searchFrom < bytesRef .length ();) {
167
189
int newLineMarkerIndex = bytesRef .indexOf ((byte ) '\n' , searchFrom );
168
190
int searchTo = newLineMarkerIndex != -1 ? newLineMarkerIndex : bytesRef .length ();
169
- String line = bytesRef .slice (searchFrom , searchTo - searchFrom ).utf8ToString ();
170
- if (line .isBlank () == false ) {
171
- return line ;
191
+ if (isBlank (bytesRef , searchFrom , searchTo ) == false ) {
192
+ return bytesRef .slice (searchFrom , searchTo - searchFrom ).utf8ToString ();
172
193
}
173
194
searchFrom = newLineMarkerIndex != -1 ? newLineMarkerIndex + 1 : bytesRef .length ();
174
195
}
175
196
return null ;
176
197
}
177
198
199
+ /**
200
+ * Checks whether the line pointed to by a pair of indexes: {@code from} (inclusive) and {@code to} (exclusive) is blank.
201
+ * A line is considered blank if it only consists of space characters (' ').
202
+ */
203
+ private static boolean isBlank (BytesReference bytesRef , int from , int to ) {
204
+ for (int i = from ; i < to ; ++i ) {
205
+ if (bytesRef .get (i ) != ((byte ) ' ' )) {
206
+ return false ;
207
+ }
208
+ }
209
+ return true ;
210
+ }
211
+
178
212
private String getConcreteIndexOrWriteAlias (String documentId ) {
179
213
Objects .requireNonNull (documentId );
180
214
SearchRequest searchRequest =
0 commit comments