Skip to content

Commit 99bc44c

Browse files
committed
[ML] Add a timeout option to file structure finder (#34117)
This can be used to restrict the amount of CPU time a single structure finder request can use. The timeout is not implemented precisely, so requests may run for slightly longer than the timeout before aborting. The default is 25 seconds, which is a little below Kibana's default timeout of 30 seconds for calls to Elasticsearch APIs.
1 parent d3228bd commit 99bc44c

28 files changed

+900
-170
lines changed

docs/reference/ml/apis/find-file-structure.asciidoc

Lines changed: 408 additions & 5 deletions
Large diffs are not rendered by default.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FindFileStructureAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
1818
import org.elasticsearch.common.io.stream.Writeable;
19+
import org.elasticsearch.common.unit.TimeValue;
1920
import org.elasticsearch.common.xcontent.StatusToXContentObject;
2021
import org.elasticsearch.common.xcontent.XContentBuilder;
2122
import org.elasticsearch.rest.RestStatus;
@@ -118,6 +119,7 @@ public boolean equals(Object other) {
118119
public static class Request extends ActionRequest {
119120

120121
public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");
122+
public static final ParseField TIMEOUT = new ParseField("timeout");
121123
public static final ParseField CHARSET = FileStructure.CHARSET;
122124
public static final ParseField FORMAT = FileStructure.FORMAT;
123125
public static final ParseField COLUMN_NAMES = FileStructure.COLUMN_NAMES;
@@ -134,6 +136,7 @@ public static class Request extends ActionRequest {
134136
"[%s] may only be specified if [" + FORMAT.getPreferredName() + "] is [%s]";
135137

136138
private Integer linesToSample;
139+
private TimeValue timeout;
137140
private String charset;
138141
private FileStructure.Format format;
139142
private List<String> columnNames;
@@ -157,6 +160,14 @@ public void setLinesToSample(Integer linesToSample) {
157160
this.linesToSample = linesToSample;
158161
}
159162

163+
public TimeValue getTimeout() {
164+
return timeout;
165+
}
166+
167+
public void setTimeout(TimeValue timeout) {
168+
this.timeout = timeout;
169+
}
170+
160171
public String getCharset() {
161172
return charset;
162173
}
@@ -319,6 +330,7 @@ public ActionRequestValidationException validate() {
319330
public void readFrom(StreamInput in) throws IOException {
320331
super.readFrom(in);
321332
linesToSample = in.readOptionalVInt();
333+
timeout = in.readOptionalTimeValue();
322334
charset = in.readOptionalString();
323335
format = in.readBoolean() ? in.readEnum(FileStructure.Format.class) : null;
324336
columnNames = in.readBoolean() ? in.readList(StreamInput::readString) : null;
@@ -336,6 +348,7 @@ public void readFrom(StreamInput in) throws IOException {
336348
public void writeTo(StreamOutput out) throws IOException {
337349
super.writeTo(out);
338350
out.writeOptionalVInt(linesToSample);
351+
out.writeOptionalTimeValue(timeout);
339352
out.writeOptionalString(charset);
340353
if (format == null) {
341354
out.writeBoolean(false);
@@ -371,7 +384,7 @@ public void writeTo(StreamOutput out) throws IOException {
371384

372385
@Override
373386
public int hashCode() {
374-
return Objects.hash(linesToSample, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
387+
return Objects.hash(linesToSample, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
375388
timestampField, sample);
376389
}
377390

@@ -388,6 +401,7 @@ public boolean equals(Object other) {
388401

389402
Request that = (Request) other;
390403
return Objects.equals(this.linesToSample, that.linesToSample) &&
404+
Objects.equals(this.timeout, that.timeout) &&
391405
Objects.equals(this.charset, that.charset) &&
392406
Objects.equals(this.format, that.format) &&
393407
Objects.equals(this.columnNames, that.columnNames) &&

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFindFileStructureAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ protected void doExecute(FindFileStructureAction.Request request, ActionListener
4545

4646
private FindFileStructureAction.Response buildFileStructureResponse(FindFileStructureAction.Request request) throws Exception {
4747

48-
FileStructureFinderManager structureFinderManager = new FileStructureFinderManager();
48+
FileStructureFinderManager structureFinderManager = new FileStructureFinderManager(threadPool.scheduler());
4949

5050
FileStructureFinder fileStructureFinder = structureFinderManager.findFileStructure(request.getLinesToSample(),
51-
request.getSample().streamInput(), new FileStructureOverrides(request));
51+
request.getSample().streamInput(), new FileStructureOverrides(request), request.getTimeout());
5252

5353
return new FindFileStructureAction.Response(fileStructureFinder.getStructure());
5454
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ public class DelimitedFileStructureFinder implements FileStructureFinder {
4141

4242
static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String> explanation, String sample, String charsetName,
4343
Boolean hasByteOrderMarker, CsvPreference csvPreference,
44-
boolean trimFields, FileStructureOverrides overrides)
44+
boolean trimFields, FileStructureOverrides overrides,
45+
TimeoutChecker timeoutChecker)
4546
throws IOException {
4647

47-
Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference);
48+
Tuple<List<List<String>>, List<Integer>> parsed = readRows(sample, csvPreference, timeoutChecker);
4849
List<List<String>> rows = parsed.v1();
4950
List<Integer> lineNumbers = parsed.v2();
5051

@@ -106,7 +107,8 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
106107
structureBuilder.setShouldTrimFields(true);
107108
}
108109

109-
Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides);
110+
Tuple<String, TimestampMatch> timeField = FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides,
111+
timeoutChecker);
110112
if (timeField != null) {
111113
String timeLineRegex = null;
112114
StringBuilder builder = new StringBuilder("^");
@@ -148,7 +150,7 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
148150
}
149151

150152
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
151-
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords);
153+
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);
152154

153155
SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
154156
if (timeField != null) {
@@ -183,7 +185,8 @@ public FileStructure getStructure() {
183185
return structure;
184186
}
185187

186-
static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference) throws IOException {
188+
static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPreference csvPreference, TimeoutChecker timeoutChecker)
189+
throws IOException {
187190

188191
int fieldsInFirstRow = -1;
189192

@@ -204,6 +207,7 @@ static Tuple<List<List<String>>, List<Integer>> readRows(String sample, CsvPrefe
204207
}
205208
}
206209
rows.add(row);
210+
timeoutChecker.check("delimited record parsing");
207211
lineNumbers.add(csvReader.getLineNumber());
208212
}
209213
} catch (SuperCsvException e) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public boolean canCreateFromSample(List<String> explanation, String sample) {
6262

6363
@Override
6464
public FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
65-
FileStructureOverrides overrides) throws IOException {
65+
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws IOException {
6666
return DelimitedFileStructureFinder.makeDelimitedFileStructureFinder(explanation, sample, charsetName, hasByteOrderMarker,
67-
csvPreference, trimFields, overrides);
67+
csvPreference, trimFields, overrides, timeoutChecker);
6868
}
6969
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ public interface FileStructureFinderFactory {
3939
* @param hasByteOrderMarker Did the sample have a byte order marker? <code>null</code> means "not relevant".
4040
* @param overrides Stores structure decisions that have been made by the end user, and should
4141
* take precedence over anything the {@link FileStructureFinder} may decide.
42+
* @param timeoutChecker Will abort the operation if its timeout is exceeded.
4243
* @return A {@link FileStructureFinder} object suitable for determining the structure of the supplied sample.
4344
* @throws Exception if something goes wrong during creation.
4445
*/
4546
FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
46-
FileStructureOverrides overrides) throws Exception;
47+
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception;
4748
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManager.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
import com.ibm.icu.text.CharsetDetector;
99
import com.ibm.icu.text.CharsetMatch;
10+
import org.elasticsearch.ElasticsearchTimeoutException;
1011
import org.elasticsearch.common.collect.Tuple;
12+
import org.elasticsearch.common.unit.TimeValue;
1113

1214
import java.io.BufferedInputStream;
1315
import java.io.BufferedReader;
@@ -23,15 +25,17 @@
2325
import java.util.HashSet;
2426
import java.util.List;
2527
import java.util.Locale;
28+
import java.util.Objects;
2629
import java.util.Optional;
2730
import java.util.Set;
31+
import java.util.concurrent.ScheduledExecutorService;
2832
import java.util.stream.Collectors;
2933

3034
/**
3135
* Runs the high-level steps needed to create ingest configs for the specified file. In order:
3236
* 1. Determine the most likely character set (UTF-8, UTF-16LE, ISO-8859-2, etc.)
3337
* 2. Load a sample of the file, consisting of the first 1000 lines of the file
34-
* 3. Determine the most likely file structure - one of ND-JSON, XML, CSV, TSV or semi-structured text
38+
* 3. Determine the most likely file structure - one of ND-JSON, XML, delimited or semi-structured text
3539
* 4. Create an appropriate structure object and delegate writing configs to it
3640
*/
3741
public final class FileStructureFinderManager {
@@ -81,8 +85,18 @@ public final class FileStructureFinderManager {
8185

8286
private static final int BUFFER_SIZE = 8192;
8387

88+
private final ScheduledExecutorService scheduler;
89+
90+
/**
91+
* Create the file structure manager.
92+
* @param scheduler Used for checking timeouts.
93+
*/
94+
public FileStructureFinderManager(ScheduledExecutorService scheduler) {
95+
this.scheduler = Objects.requireNonNull(scheduler);
96+
}
97+
8498
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile) throws Exception {
85-
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
99+
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
86100
}
87101

88102
/**
@@ -95,42 +109,49 @@ public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Input
95109
* @param overrides Aspects of the file structure that are known in advance. These take precedence over
96110
* values determined by structure analysis. An exception will be thrown if the file structure
97111
* is incompatible with an overridden value.
112+
* @param timeout The maximum time the analysis is permitted to take. If it takes longer than this an
113+
* {@link ElasticsearchTimeoutException} may be thrown (although not necessarily immediately after
114+
* the timeout is exceeded).
98115
* @return A {@link FileStructureFinder} object from which the structure and messages can be queried.
99116
* @throws Exception A variety of problems could occur at various stages of the structure finding process.
100117
*/
101-
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides)
118+
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides,
119+
TimeValue timeout)
102120
throws Exception {
103121
return findFileStructure(new ArrayList<>(), (idealSampleLineCount == null) ? DEFAULT_IDEAL_SAMPLE_LINE_COUNT : idealSampleLineCount,
104-
fromFile, overrides);
122+
fromFile, overrides, timeout);
105123
}
106124

107125
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile)
108126
throws Exception {
109-
return findFileStructure(new ArrayList<>(), idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES);
127+
return findFileStructure(explanation, idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
110128
}
111129

112130
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile,
113-
FileStructureOverrides overrides) throws Exception {
114-
115-
String charsetName = overrides.getCharset();
116-
Reader sampleReader;
117-
if (charsetName != null) {
118-
// Creating the reader will throw if the specified character set does not exist
119-
sampleReader = new InputStreamReader(fromFile, charsetName);
120-
explanation.add("Using specified character encoding [" + charsetName + "]");
121-
} else {
122-
CharsetMatch charsetMatch = findCharset(explanation, fromFile);
123-
charsetName = charsetMatch.getName();
124-
sampleReader = charsetMatch.getReader();
125-
}
131+
FileStructureOverrides overrides, TimeValue timeout) throws Exception {
132+
133+
try (TimeoutChecker timeoutChecker = new TimeoutChecker("structure analysis", timeout, scheduler)) {
134+
135+
String charsetName = overrides.getCharset();
136+
Reader sampleReader;
137+
if (charsetName != null) {
138+
// Creating the reader will throw if the specified character set does not exist
139+
sampleReader = new InputStreamReader(fromFile, charsetName);
140+
explanation.add("Using specified character encoding [" + charsetName + "]");
141+
} else {
142+
CharsetMatch charsetMatch = findCharset(explanation, fromFile, timeoutChecker);
143+
charsetName = charsetMatch.getName();
144+
sampleReader = charsetMatch.getReader();
145+
}
126146

127-
Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
128-
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount));
147+
Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
148+
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount), timeoutChecker);
129149

130-
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides);
150+
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides, timeoutChecker);
151+
}
131152
}
132153

133-
CharsetMatch findCharset(List<String> explanation, InputStream inputStream) throws Exception {
154+
CharsetMatch findCharset(List<String> explanation, InputStream inputStream, TimeoutChecker timeoutChecker) throws Exception {
134155

135156
// We need an input stream that supports mark and reset, so wrap the argument
136157
// in a BufferedInputStream if it doesn't already support this feature
@@ -141,6 +162,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
141162
// This is from ICU4J
142163
CharsetDetector charsetDetector = new CharsetDetector().setText(inputStream);
143164
CharsetMatch[] charsetMatches = charsetDetector.detectAll();
165+
timeoutChecker.check("character set detection");
144166

145167
// Determine some extra characteristics of the input to compensate for some deficiencies of ICU4J
146168
boolean pureAscii = true;
@@ -164,6 +186,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
164186
remainingLength -= bytesRead;
165187
} while (containsZeroBytes == false && remainingLength > 0);
166188
inputStream.reset();
189+
timeoutChecker.check("character set detection");
167190

168191
if (pureAscii) {
169192
// If the input is pure ASCII then many single byte character sets will match. We want to favour
@@ -220,7 +243,7 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream) thro
220243
}
221244

222245
FileStructureFinder makeBestStructureFinder(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
223-
FileStructureOverrides overrides) throws Exception {
246+
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception {
224247

225248
Character delimiter = overrides.getDelimiter();
226249
Character quote = overrides.getQuote();
@@ -250,16 +273,18 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam
250273
}
251274

252275
for (FileStructureFinderFactory factory : factories) {
276+
timeoutChecker.check("high level format detection");
253277
if (factory.canCreateFromSample(explanation, sample)) {
254-
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides);
278+
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides, timeoutChecker);
255279
}
256280
}
257281

258282
throw new IllegalArgumentException("Input did not match " +
259283
((overrides.getFormat() == null) ? "any known formats" : "the specified format [" + overrides.getFormat() + "]"));
260284
}
261285

262-
private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines) throws IOException {
286+
private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int minLines, int maxLines, TimeoutChecker timeoutChecker)
287+
throws IOException {
263288

264289
int lineCount = 0;
265290
BufferedReader bufferedReader = new BufferedReader(reader);
@@ -283,6 +308,7 @@ private Tuple<String, Boolean> sampleFile(Reader reader, String charsetName, int
283308
String line;
284309
while ((line = bufferedReader.readLine()) != null && ++lineCount <= maxLines) {
285310
sample.append(line).append('\n');
311+
timeoutChecker.check("sample line splitting");
286312
}
287313

288314
if (lineCount < minLines) {

0 commit comments

Comments
 (0)