Skip to content

Commit ad71547

Browse files
authored
Add configurable op_type for index watcher action (#64590)
1 parent 9543d3b commit ad71547

File tree

5 files changed

+99
-21
lines changed

5 files changed

+99
-21
lines changed

x-pack/docs/en/watcher/actions/index.asciidoc

+7-3
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition:
2626
<1> The id of the action
2727
<2> An optional <<condition,condition>> to restrict action execution
2828
<3> An optional <<transform,transform>> to transform the payload and prepare the data that should be indexed
29-
<4> The elasticsearch index to store the data to
30-
<5> An optional `_id` for the document, if it should always be the same document.
29+
<4> The index, alias, or data stream to which the data will be written
30+
<5> An optional `_id` for the document
3131

3232

3333
[[index-action-attributes]]
@@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition:
3737
|======
3838
|Name |Required | Default | Description
3939

40-
| `index` | yes | - | The Elasticsearch index to index into.
40+
| `index` | yes | - | The index, alias, or data stream to index into.
4141

4242

4343
| `doc_id` | no | - | The optional `_id` of the document.
4444

45+
| `op_type` | no | `index` | The <<docs-index-api-op_type,op_type>> for the index operation.
46+
Must be one of either `index` or `create`. Must be `create` if
47+
`index` is a data stream.
48+
4549
| `execution_time_field` | no | - | The field that will store/index the watch execution
4650
time.
4751

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java

+6
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public Action.Result execute(String actionId, WatchExecutionContext ctx, Payload
8383

8484
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
8585
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
86+
if (action.opType != null) {
87+
indexRequest.opType(action.opType);
88+
}
8689

8790
data = addTimestampToDocument(data, ctx.executionTime());
8891
BytesReference bytesReference;
@@ -128,6 +131,9 @@ Action.Result indexBulk(Iterable list, String actionId, WatchExecutionContext ct
128131
IndexRequest indexRequest = new IndexRequest();
129132
indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
130133
indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
134+
if (action.opType != null) {
135+
indexRequest.opType(action.opType);
136+
}
131137

132138
doc = addTimestampToDocument(doc, ctx.executionTime());
133139
try (XContentBuilder builder = jsonBuilder()) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java

+39-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.watcher.actions.index;
77

88
import org.elasticsearch.ElasticsearchParseException;
9+
import org.elasticsearch.action.DocWriteRequest;
910
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
1011
import org.elasticsearch.common.Nullable;
1112
import org.elasticsearch.common.ParseField;
@@ -19,6 +20,7 @@
1920

2021
import java.io.IOException;
2122
import java.time.ZoneId;
23+
import java.util.List;
2224
import java.util.Objects;
2325

2426
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -29,16 +31,18 @@ public class IndexAction implements Action {
2931

3032
@Nullable final String index;
3133
@Nullable final String docId;
34+
@Nullable final DocWriteRequest.OpType opType;
3235
@Nullable final String executionTimeField;
3336
@Nullable final TimeValue timeout;
3437
@Nullable final ZoneId dynamicNameTimeZone;
3538
@Nullable final RefreshPolicy refreshPolicy;
3639

37-
public IndexAction(@Nullable String index, @Nullable String docId,
38-
@Nullable String executionTimeField,
39-
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
40+
public IndexAction(@Nullable String index, @Nullable String docId, @Nullable DocWriteRequest.OpType opType,
41+
@Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone,
42+
@Nullable RefreshPolicy refreshPolicy) {
4043
this.index = index;
4144
this.docId = docId;
45+
this.opType = opType;
4246
this.executionTimeField = executionTimeField;
4347
this.timeout = timeout;
4448
this.dynamicNameTimeZone = dynamicNameTimeZone;
@@ -58,6 +62,10 @@ public String getDocId() {
5862
return docId;
5963
}
6064

65+
public DocWriteRequest.OpType getOpType() {
66+
return opType;
67+
}
68+
6169
public String getExecutionTimeField() {
6270
return executionTimeField;
6371
}
@@ -77,7 +85,9 @@ public boolean equals(Object o) {
7785

7886
IndexAction that = (IndexAction) o;
7987

80-
return Objects.equals(index, that.index) && Objects.equals(docId, that.docId)
88+
return Objects.equals(index, that.index)
89+
&& Objects.equals(docId, that.docId)
90+
&& Objects.equals(opType, that.opType)
8191
&& Objects.equals(executionTimeField, that.executionTimeField)
8292
&& Objects.equals(timeout, that.timeout)
8393
&& Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
@@ -86,7 +96,7 @@ public boolean equals(Object o) {
8696

8797
@Override
8898
public int hashCode() {
89-
return Objects.hash(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
99+
return Objects.hash(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
90100
}
91101

92102
@Override
@@ -98,6 +108,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
98108
if (docId != null) {
99109
builder.field(Field.DOC_ID.getPreferredName(), docId);
100110
}
111+
if (opType != null) {
112+
builder.field(Field.OP_TYPE.getPreferredName(), opType);
113+
}
101114
if (executionTimeField != null) {
102115
builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
103116
}
@@ -116,6 +129,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
116129
public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
117130
String index = null;
118131
String docId = null;
132+
DocWriteRequest.OpType opType = null;
119133
String executionTimeField = null;
120134
TimeValue timeout = null;
121135
ZoneId dynamicNameTimeZone = null;
@@ -143,6 +157,17 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
143157
} else if (token == XContentParser.Token.VALUE_STRING) {
144158
if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) {
145159
docId = parser.text();
160+
} else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
161+
try {
162+
opType = DocWriteRequest.OpType.fromString(parser.text());
163+
if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) {
164+
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " +
165+
"must be [index] or [create]", TYPE, watchId, actionId, currentFieldName);
166+
}
167+
} catch (IllegalArgumentException e) {
168+
throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " +
169+
"field [{}]", TYPE, watchId, actionId, currentFieldName);
170+
}
146171
} else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
147172
executionTimeField = parser.text();
148173
} else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -167,7 +192,7 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
167192
}
168193
}
169194

170-
return new IndexAction(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
195+
return new IndexAction(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
171196
}
172197

173198
public static Builder builder(String index) {
@@ -247,6 +272,7 @@ public static class Builder implements Action.Builder<IndexAction> {
247272

248273
final String index;
249274
String docId;
275+
DocWriteRequest.OpType opType;
250276
String executionTimeField;
251277
TimeValue timeout;
252278
ZoneId dynamicNameTimeZone;
@@ -261,6 +287,11 @@ public Builder setDocId(String docId) {
261287
return this;
262288
}
263289

290+
public Builder setOpType(DocWriteRequest.OpType opType) {
291+
this.opType = opType;
292+
return this;
293+
}
294+
264295
public Builder setExecutionTimeField(String executionTimeField) {
265296
this.executionTimeField = executionTimeField;
266297
return this;
@@ -283,13 +314,14 @@ public Builder setRefreshPolicy(RefreshPolicy refreshPolicy) {
283314

284315
@Override
285316
public IndexAction build() {
286-
return new IndexAction(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
317+
return new IndexAction(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
287318
}
288319
}
289320

290321
interface Field {
291322
ParseField INDEX = new ParseField("index");
292323
ParseField DOC_ID = new ParseField("doc_id");
324+
ParseField OP_TYPE = new ParseField("op_type");
293325
ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
294326
ParseField SOURCE = new ParseField("source");
295327
ParseField RESPONSE = new ParseField("response");

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java

+43-9
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
5252
import static org.elasticsearch.common.util.set.Sets.newHashSet;
5353
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
54+
import static org.hamcrest.Matchers.containsString;
5455
import static org.hamcrest.Matchers.equalTo;
5556
import static org.hamcrest.Matchers.hasEntry;
5657
import static org.hamcrest.Matchers.hasSize;
@@ -89,6 +90,10 @@ public void testParser() throws Exception {
8990
if (writeTimeout != null) {
9091
builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
9192
}
93+
DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null;
94+
if (opType != null) {
95+
builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase());
96+
}
9297
builder.endObject();
9398
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
9499
XContentParser parser = createParser(builder);
@@ -102,6 +107,9 @@ public void testParser() throws Exception {
102107
if (timestampField != null) {
103108
assertThat(executable.action().executionTimeField, equalTo(timestampField));
104109
}
110+
if (opType != null) {
111+
assertThat(executable.action().opType, equalTo(opType));
112+
}
105113
assertThat(executable.action().timeout, equalTo(writeTimeout));
106114
}
107115

@@ -130,20 +138,47 @@ public void testParserFailure() throws Exception {
130138
.endObject());
131139
}
132140

141+
public void testOpTypeThatCannotBeParsed() throws Exception {
142+
expectParseFailure(jsonBuilder()
143+
.startObject()
144+
.field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10))
145+
.endObject(),
146+
"failed to parse op_type value for field [op_type]");
147+
}
148+
149+
public void testUnsupportedOpType() throws Exception {
150+
expectParseFailure(jsonBuilder()
151+
.startObject()
152+
.field(IndexAction.Field.OP_TYPE.getPreferredName(),
153+
randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name()))
154+
.endObject(),
155+
"op_type value for field [op_type] must be [index] or [create]");
156+
}
157+
158+
private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception {
159+
expectFailure(ElasticsearchParseException.class, builder, expectedMessage);
160+
}
161+
133162
private void expectParseFailure(XContentBuilder builder) throws Exception {
134163
expectFailure(ElasticsearchParseException.class, builder);
135164
}
136165

137166
private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
167+
expectFailure(clazz, builder, null);
168+
}
169+
170+
private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception {
138171
IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
139172
XContentParser parser = createParser(builder);
140173
parser.nextToken();
141-
expectThrows(clazz, () ->
142-
actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
174+
Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
175+
if (expectedMessage != null) {
176+
assertThat(t.getMessage(), containsString(expectedMessage));
177+
}
143178
}
144179

145180
public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
146-
final IndexAction action = new IndexAction("test-index", "123", null, null, null, refreshPolicy);
181+
final IndexAction action = new IndexAction("test-index", "123", null, null, null, null, refreshPolicy);
147182
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
148183
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
149184
final Map<String, Object> docWithId = Map.of(
@@ -191,7 +226,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
191226

192227
final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
193228
configureIdDynamically ? null : "my_id",
194-
null, null, null, refreshPolicy);
229+
null, null, null, null, refreshPolicy);
195230
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
196231
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
197232

@@ -211,7 +246,7 @@ public void testThatIndexTypeIdDynamically() throws Exception {
211246
}
212247

213248
public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
214-
final IndexAction action = new IndexAction(null, null, null, null, null, refreshPolicy);
249+
final IndexAction action = new IndexAction(null, null, null, null, null, null, refreshPolicy);
215250
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
216251
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
217252

@@ -239,7 +274,7 @@ public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() thro
239274
public void testConfigureIndexInMapAndAction() {
240275
String fieldName = "_index";
241276
final IndexAction action = new IndexAction("my_index",
242-
null,null, null, null, refreshPolicy);
277+
null, null,null, null, null, refreshPolicy);
243278
final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
244279
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
245280

@@ -258,8 +293,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
258293
String docId = randomAlphaOfLength(5);
259294
String timestampField = randomFrom("@timestamp", null);
260295

261-
IndexAction action = new IndexAction("test-index", docIdAsParam ? docId : null, timestampField, null, null,
262-
refreshPolicy);
296+
IndexAction action = new IndexAction("test-index", docIdAsParam ? docId : null, null, timestampField, null, null, refreshPolicy);
263297
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
264298
TimeValue.timeValueSeconds(30));
265299
ZonedDateTime executionTime = DateUtils.nowWithMillisResolution();
@@ -308,7 +342,7 @@ public void testIndexActionExecuteSingleDoc() throws Exception {
308342
}
309343

310344
public void testFailureResult() throws Exception {
311-
IndexAction action = new IndexAction("test-index", null, "@timestamp", null, null, refreshPolicy);
345+
IndexAction action = new IndexAction("test-index", null, null, "@timestamp", null, null, refreshPolicy);
312346
ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
313347
TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
314348

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ElasticsearchParseException;
11+
import org.elasticsearch.action.DocWriteRequest;
1112
import org.elasticsearch.action.search.SearchRequest;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.client.Client;
@@ -592,8 +593,9 @@ private List<ActionWrapper> randomActions() {
592593
ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
593594
TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
594595
WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
595-
IndexAction action = new IndexAction("_index", randomBoolean() ? "123" : null, null, timeout, timeZone,
596-
refreshPolicy);
596+
IndexAction action = new IndexAction("_index", randomBoolean() ? "123" : null,
597+
randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone,
598+
refreshPolicy);
597599
list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
598600
AlwaysConditionTests.randomCondition(scriptService), randomTransform(),
599601
new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),

0 commit comments

Comments
 (0)