Skip to content

Commit e38e631

Browse files
authored
[7.x] Implement DataFrameAnalyticsAuditMessage and DataFrameAnalyticsAuditor (#45967) (#46519)
1 parent 35810bd commit e38e631

File tree

18 files changed

+193
-17
lines changed

18 files changed

+193
-17
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessage.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
2727
public static final ParseField LEVEL = new ParseField("level");
2828
public static final ParseField TIMESTAMP = new ParseField("timestamp");
2929
public static final ParseField NODE_NAME = new ParseField("node_name");
30+
public static final ParseField JOB_TYPE = new ParseField("job_type");
3031

3132
protected static final <T extends AbstractAuditMessage> ConstructingObjectParser<T, Void> createParser(
3233
String name, AbstractAuditMessageFactory<T> messageFactory, ParseField resourceField) {
@@ -99,13 +100,17 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
99100
if (nodeName != null) {
100101
builder.field(NODE_NAME.getPreferredName(), nodeName);
101102
}
103+
String jobType = getJobType();
104+
if (jobType != null) {
105+
builder.field(JOB_TYPE.getPreferredName(), jobType);
106+
}
102107
builder.endObject();
103108
return builder;
104109
}
105110

106111
@Override
107112
public int hashCode() {
108-
return Objects.hash(resourceId, message, level, timestamp, nodeName);
113+
return Objects.hash(resourceId, message, level, timestamp, nodeName, getJobType());
109114
}
110115

111116
@Override
@@ -122,8 +127,17 @@ public boolean equals(Object obj) {
122127
Objects.equals(message, other.message) &&
123128
Objects.equals(level, other.level) &&
124129
Objects.equals(timestamp, other.timestamp) &&
125-
Objects.equals(nodeName, other.nodeName);
130+
Objects.equals(nodeName, other.nodeName) &&
131+
Objects.equals(getJobType(), other.getJobType());
126132
}
127133

134+
/**
135+
* @return job type string used to tell apart jobs of different types stored in the same index
136+
*/
137+
public abstract String getJobType();
138+
139+
/**
140+
* @return resource id field name used when storing a new message
141+
*/
128142
protected abstract String getResourceField();
129143
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessage.java

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public DataFrameAuditMessage(String resourceId, String message, Level level, Dat
2323
super(resourceId, message, level, timestamp, nodeName);
2424
}
2525

26+
@Override
27+
public final String getJobType() {
28+
return null;
29+
}
30+
2631
@Override
2732
protected String getResourceField() {
2833
return TRANSFORM_ID.getPreferredName();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1121,12 +1121,13 @@ public static XContentBuilder auditMessageMapping() throws IOException {
11211121
XContentBuilder builder = jsonBuilder().startObject();
11221122
builder.startObject(SINGLE_MAPPING_NAME);
11231123
addMetaInformation(builder);
1124+
builder.field(DYNAMIC, "false");
11241125
builder.startObject(PROPERTIES)
11251126
.startObject(Job.ID.getPreferredName())
11261127
.field(TYPE, KEYWORD)
11271128
.endObject()
11281129
.startObject(AnomalyDetectionAuditMessage.LEVEL.getPreferredName())
1129-
.field(TYPE, KEYWORD)
1130+
.field(TYPE, KEYWORD)
11301131
.endObject()
11311132
.startObject(AnomalyDetectionAuditMessage.MESSAGE.getPreferredName())
11321133
.field(TYPE, TEXT)
@@ -1142,6 +1143,9 @@ public static XContentBuilder auditMessageMapping() throws IOException {
11421143
.startObject(AnomalyDetectionAuditMessage.NODE_NAME.getPreferredName())
11431144
.field(TYPE, KEYWORD)
11441145
.endObject()
1146+
.startObject(AnomalyDetectionAuditMessage.JOB_TYPE.getPreferredName())
1147+
.field(TYPE, KEYWORD)
1148+
.endObject()
11451149
.endObject()
11461150
.endObject()
11471151
.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/AnomalyDetectionAuditMessage.java

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public AnomalyDetectionAuditMessage(String resourceId, String message, Level lev
2323
super(resourceId, message, level, timestamp, nodeName);
2424
}
2525

26+
@Override
27+
public final String getJobType() {
28+
return Job.ANOMALY_DETECTOR_JOB_TYPE;
29+
}
30+
2631
@Override
2732
protected String getResourceField() {
2833
return JOB_ID.getPreferredName();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/notifications/AuditorField.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package org.elasticsearch.xpack.core.ml.notifications;
77

88
public final class AuditorField {
9-
public static final String NOTIFICATIONS_INDEX = ".ml-notifications";
109

11-
private AuditorField() {}
10+
public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000001";
1211

12+
private AuditorField() {}
1313
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml.notifications;
7+
8+
import org.elasticsearch.common.ParseField;
9+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
10+
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
11+
import org.elasticsearch.xpack.core.common.notifications.Level;
12+
import org.elasticsearch.xpack.core.ml.job.config.Job;
13+
14+
import java.util.Date;
15+
16+
public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {
17+
18+
private static final ParseField JOB_ID = Job.ID;
19+
public static final ConstructingObjectParser<DataFrameAnalyticsAuditMessage, Void> PARSER =
20+
createParser("ml_analytics_audit_message", DataFrameAnalyticsAuditMessage::new, JOB_ID);
21+
22+
public DataFrameAnalyticsAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
23+
super(resourceId, message, level, timestamp, nodeName);
24+
}
25+
26+
@Override
27+
public final String getJobType() {
28+
return "data_frame_analytics";
29+
}
30+
31+
@Override
32+
protected String getResourceField() {
33+
return JOB_ID.getPreferredName();
34+
}
35+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditMessageTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ static class TestAuditMessage extends AbstractAuditMessage {
2626
super(resourceId, message, level, timestamp, nodeName);
2727
}
2828

29+
@Override
30+
public String getJobType() {
31+
return "test_type";
32+
}
33+
2934
@Override
3035
protected String getResourceField() {
3136
return TEST_ID.getPreferredName();
@@ -42,6 +47,11 @@ public void testGetResourceField() {
4247
assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
4348
}
4449

50+
public void testGetJobType() {
51+
TestAuditMessage message = createTestInstance();
52+
assertThat(message.getJobType(), equalTo("test_type"));
53+
}
54+
4555
public void testNewInfo() {
4656
TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
4757
assertThat(message.getResourceId(), equalTo(RESOURCE_ID));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/notifications/DataFrameAuditMessageTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,15 @@
1111

1212
import java.util.Date;
1313

14+
import static org.hamcrest.Matchers.nullValue;
15+
1416
public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {
1517

18+
public void testGetJobType() {
19+
DataFrameAuditMessage message = createTestInstance();
20+
assertThat(message.getJobType(), nullValue());
21+
}
22+
1623
@Override
1724
protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
1825
return DataFrameAuditMessage.PARSER.apply(parser, null);

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/notifications/AnomalyDetectionAuditMessageTests.java

+8
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,19 @@
88
import org.elasticsearch.common.xcontent.XContentParser;
99
import org.elasticsearch.test.AbstractXContentTestCase;
1010
import org.elasticsearch.xpack.core.common.notifications.Level;
11+
import org.elasticsearch.xpack.core.ml.job.config.Job;
1112

1213
import java.util.Date;
1314

15+
import static org.hamcrest.Matchers.equalTo;
16+
1417
public class AnomalyDetectionAuditMessageTests extends AbstractXContentTestCase<AnomalyDetectionAuditMessage> {
1518

19+
public void testGetJobType() {
20+
AnomalyDetectionAuditMessage message = createTestInstance();
21+
assertThat(message.getJobType(), equalTo(Job.ANOMALY_DETECTOR_JOB_TYPE));
22+
}
23+
1624
@Override
1725
protected AnomalyDetectionAuditMessage doParseInstance(XContentParser parser) {
1826
return AnomalyDetectionAuditMessage.PARSER.apply(parser, null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml.notifications;
7+
8+
import org.elasticsearch.common.xcontent.XContentParser;
9+
import org.elasticsearch.test.AbstractXContentTestCase;
10+
import org.elasticsearch.xpack.core.common.notifications.Level;
11+
12+
import java.util.Date;
13+
14+
import static org.hamcrest.Matchers.equalTo;
15+
16+
public class DataFrameAnalyticsAuditMessageTests extends AbstractXContentTestCase<DataFrameAnalyticsAuditMessage> {
17+
18+
public void testGetJobType() {
19+
DataFrameAnalyticsAuditMessage message = createTestInstance();
20+
assertThat(message.getJobType(), equalTo("data_frame_analytics"));
21+
}
22+
23+
@Override
24+
protected DataFrameAnalyticsAuditMessage doParseInstance(XContentParser parser) {
25+
return DataFrameAnalyticsAuditMessage.PARSER.apply(parser, null);
26+
}
27+
28+
@Override
29+
protected boolean supportsUnknownFields() {
30+
return true;
31+
}
32+
33+
@Override
34+
protected DataFrameAnalyticsAuditMessage createTestInstance() {
35+
return new DataFrameAnalyticsAuditMessage(
36+
randomBoolean() ? null : randomAlphaOfLength(10),
37+
randomAlphaOfLengthBetween(1, 20),
38+
randomFrom(Level.values()),
39+
new Date(),
40+
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
41+
);
42+
}
43+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
3030
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
3131
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
32+
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
3233
import org.junit.After;
3334
import org.junit.Before;
3435

@@ -184,7 +185,8 @@ public void testDeleteExpiredData() throws Exception {
184185
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
185186
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
186187
.get().getHits().getTotalHits().value;
187-
long totalNotificationsCountBeforeDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
188+
long totalNotificationsCountBeforeDelete =
189+
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
188190
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
189191
assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L));
190192

@@ -234,7 +236,8 @@ public void testDeleteExpiredData() throws Exception {
234236
long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
235237
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
236238
.get().getHits().getTotalHits().value;
237-
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
239+
long totalNotificationsCountAfterDelete =
240+
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
238241
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
239242
assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete));
240243

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
2424
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
2525
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
26+
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
2627
import org.junit.After;
2728

2829
import java.io.IOException;
@@ -186,7 +187,8 @@ public void testScope() throws Exception {
186187

187188
// Wait until the notification that the filter was updated is indexed
188189
assertBusy(() -> {
189-
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
190+
SearchResponse searchResponse =
191+
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
190192
.setSize(1)
191193
.addSort("timestamp", SortOrder.DESC)
192194
.setQuery(QueryBuilders.boolQuery()

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
2222
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
2323
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
24+
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
2425
import org.junit.After;
2526

2627
import java.io.IOException;
@@ -223,7 +224,8 @@ public void testAddEventsToOpenJob() throws Exception {
223224

224225
// Wait until the notification that the process was updated is indexed
225226
assertBusy(() -> {
226-
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
227+
SearchResponse searchResponse =
228+
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
227229
.setSize(1)
228230
.addSort("timestamp", SortOrder.DESC)
229231
.setQuery(QueryBuilders.boolQuery()
@@ -298,7 +300,8 @@ public void testAddOpenedJobToGroupWithCalendar() throws Exception {
298300

299301
// Wait until the notification that the job was updated is indexed
300302
assertBusy(() -> {
301-
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
303+
SearchResponse searchResponse =
304+
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
302305
.setSize(1)
303306
.addSort("timestamp", SortOrder.DESC)
304307
.setQuery(QueryBuilders.boolQuery()

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@
216216
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
217217
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
218218
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
219+
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
219220
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
220221
import org.elasticsearch.xpack.ml.process.NativeController;
221222
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
@@ -470,6 +471,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
470471
}
471472

472473
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
474+
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
473475
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
474476
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
475477
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
@@ -592,6 +594,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
592594
jobDataCountsPersister,
593595
datafeedManager,
594596
anomalyDetectionAuditor,
597+
dataFrameAnalyticsAuditor,
595598
new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
596599
memoryTracker,
597600
analyticsProcessManager,
@@ -906,8 +909,12 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
906909

907910
public static boolean allTemplatesInstalled(ClusterState clusterState) {
908911
boolean allPresent = true;
909-
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
910-
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
912+
List<String> templateNames =
913+
Arrays.asList(
914+
AuditorField.NOTIFICATIONS_INDEX,
915+
MlMetaIndex.INDEX_NAME,
916+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
917+
AnomalyDetectorsIndex.jobResultsIndexPrefix());
911918
for (String templateName : templateNames) {
912919
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
913920
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.notifications;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
10+
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
11+
import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage;
12+
13+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
14+
15+
public class DataFrameAnalyticsAuditor extends AbstractAuditor<DataFrameAnalyticsAuditMessage> {
16+
17+
public DataFrameAnalyticsAuditor(Client client, String nodeName) {
18+
super(client, nodeName, AuditorField.NOTIFICATIONS_INDEX, ML_ORIGIN, DataFrameAnalyticsAuditMessage::new);
19+
}
20+
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AnnotationIndexIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void testCreatedWhenAfterOtherMlIndex() throws Exception {
5656
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "node_1");
5757
auditor.info("whatever", "blah");
5858

59-
// Creating a document in the .ml-notifications index should cause .ml-annotations
59+
// Creating a document in the .ml-notifications-000001 index should cause .ml-annotations
6060
// to be created, as it should get created as soon as any other ML index exists
6161

6262
assertBusy(() -> {

0 commit comments

Comments
 (0)