Skip to content

Commit c3e4405

Browse files
author
Hendrik Muhs
authored
[7.x][Transform] Transform fix force stop race condition (#49249) (#49420)
fix force stopping transform if indexer state hasn't been written and/or is set to STOPPED. In certain situations the transform could not be stopped, which means the task could not be removed. Introduces improved abstraction in order to better test state handling in future.
1 parent 010c3de commit c3e4405

File tree

43 files changed

+3601
-2557
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3601
-2557
lines changed

x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java

Lines changed: 67 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,8 @@ protected StopTransformResponse stopTransform(String id) throws IOException {
8989
return stopTransform(id, true, null, false);
9090
}
9191

92-
protected StopTransformResponse stopTransform(String id,
93-
boolean waitForCompletion,
94-
TimeValue timeout,
95-
boolean waitForCheckpoint) throws IOException {
92+
protected StopTransformResponse stopTransform(String id, boolean waitForCompletion, TimeValue timeout, boolean waitForCheckpoint)
93+
throws IOException {
9694
RestHighLevelClient restClient = new TestRestHighLevelClient();
9795
return restClient.transform()
9896
.stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT);
@@ -105,8 +103,7 @@ protected StartTransformResponse startTransform(String id, RequestOptions option
105103

106104
protected AcknowledgedResponse deleteTransform(String id) throws IOException {
107105
RestHighLevelClient restClient = new TestRestHighLevelClient();
108-
AcknowledgedResponse response =
109-
restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT);
106+
AcknowledgedResponse response = restClient.transform().deleteTransform(new DeleteTransformRequest(id), RequestOptions.DEFAULT);
110107
if (response.isAcknowledged()) {
111108
transformConfigs.remove(id);
112109
}
@@ -118,8 +115,7 @@ protected AcknowledgedResponse putTransform(TransformConfig config, RequestOptio
118115
throw new IllegalArgumentException("transform [" + config.getId() + "] is already registered");
119116
}
120117
RestHighLevelClient restClient = new TestRestHighLevelClient();
121-
AcknowledgedResponse response =
122-
restClient.transform().putTransform(new PutTransformRequest(config), options);
118+
AcknowledgedResponse response = restClient.transform().putTransform(new PutTransformRequest(config), options);
123119
if (response.isAcknowledged()) {
124120
transformConfigs.put(config.getId(), config);
125121
}
@@ -141,30 +137,33 @@ protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception
141137
}
142138

143139
protected void waitUntilCheckpoint(String id, long checkpoint, TimeValue waitTime) throws Exception {
144-
assertBusy(() ->
145-
assertEquals(checkpoint, getTransformStats(id)
146-
.getTransformsStats()
147-
.get(0)
148-
.getCheckpointingInfo()
149-
.getLast()
150-
.getCheckpoint()),
140+
assertBusy(
141+
() -> assertEquals(
142+
checkpoint,
143+
getTransformStats(id).getTransformsStats().get(0).getCheckpointingInfo().getLast().getCheckpoint()
144+
),
151145
waitTime.getMillis(),
152-
TimeUnit.MILLISECONDS);
146+
TimeUnit.MILLISECONDS
147+
);
153148
}
154149

155-
protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval(String field,
156-
DateHistogramInterval interval,
157-
ZoneId zone) {
150+
protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval(
151+
String field,
152+
DateHistogramInterval interval,
153+
ZoneId zone
154+
) {
158155
DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder()
159156
.setField(field)
160157
.setInterval(new DateHistogramGroupSource.FixedInterval(interval))
161158
.setTimeZone(zone);
162159
return builder.build();
163160
}
164161

165-
protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval(String field,
166-
DateHistogramInterval interval,
167-
ZoneId zone) {
162+
protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval(
163+
String field,
164+
DateHistogramInterval interval,
165+
ZoneId zone
166+
) {
168167
DateHistogramGroupSource.Builder builder = DateHistogramGroupSource.builder()
169168
.setField(field)
170169
.setInterval(new DateHistogramGroupSource.CalendarInterval(interval))
@@ -188,35 +187,38 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat
188187
return new AggregationConfig(aggregations);
189188
}
190189

191-
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
192-
AggregatorFactories.Builder aggregations) throws Exception {
190+
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups, AggregatorFactories.Builder aggregations)
191+
throws Exception {
193192
return createPivotConfig(groups, aggregations, null);
194193
}
195194

196-
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
197-
AggregatorFactories.Builder aggregations,
198-
Integer size) throws Exception {
195+
protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups, AggregatorFactories.Builder aggregations, Integer size)
196+
throws Exception {
199197
PivotConfig.Builder builder = PivotConfig.builder()
200198
.setGroups(createGroupConfig(groups))
201199
.setAggregationConfig(createAggConfig(aggregations))
202200
.setMaxPageSearchSize(size);
203201
return builder.build();
204202
}
205203

206-
protected TransformConfig createTransformConfig(String id,
207-
Map<String, SingleGroupSource> groups,
208-
AggregatorFactories.Builder aggregations,
209-
String destinationIndex,
210-
String... sourceIndices) throws Exception {
204+
protected TransformConfig createTransformConfig(
205+
String id,
206+
Map<String, SingleGroupSource> groups,
207+
AggregatorFactories.Builder aggregations,
208+
String destinationIndex,
209+
String... sourceIndices
210+
) throws Exception {
211211
return createTransformConfig(id, groups, aggregations, destinationIndex, QueryBuilders.matchAllQuery(), sourceIndices);
212212
}
213213

214-
protected TransformConfig.Builder createTransformConfigBuilder(String id,
215-
Map<String, SingleGroupSource> groups,
216-
AggregatorFactories.Builder aggregations,
217-
String destinationIndex,
218-
QueryBuilder queryBuilder,
219-
String... sourceIndices) throws Exception {
214+
protected TransformConfig.Builder createTransformConfigBuilder(
215+
String id,
216+
Map<String, SingleGroupSource> groups,
217+
AggregatorFactories.Builder aggregations,
218+
String destinationIndex,
219+
QueryBuilder queryBuilder,
220+
String... sourceIndices
221+
) throws Exception {
220222
return TransformConfig.builder()
221223
.setId(id)
222224
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
@@ -226,12 +228,14 @@ protected TransformConfig.Builder createTransformConfigBuilder(String id,
226228
.setDescription("Test transform config id: " + id);
227229
}
228230

229-
protected TransformConfig createTransformConfig(String id,
230-
Map<String, SingleGroupSource> groups,
231-
AggregatorFactories.Builder aggregations,
232-
String destinationIndex,
233-
QueryBuilder queryBuilder,
234-
String... sourceIndices) throws Exception {
231+
protected TransformConfig createTransformConfig(
232+
String id,
233+
Map<String, SingleGroupSource> groups,
234+
AggregatorFactories.Builder aggregations,
235+
String destinationIndex,
236+
QueryBuilder queryBuilder,
237+
String... sourceIndices
238+
) throws Exception {
235239
return createTransformConfigBuilder(id, groups, aggregations, destinationIndex, queryBuilder, sourceIndices).build();
236240
}
237241

@@ -272,8 +276,8 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio
272276
.endObject();
273277
}
274278
builder.endObject();
275-
CreateIndexResponse response =
276-
restClient.indices().create(new CreateIndexRequest(indexName).mapping(builder), RequestOptions.DEFAULT);
279+
CreateIndexResponse response = restClient.indices()
280+
.create(new CreateIndexRequest(indexName).mapping(builder), RequestOptions.DEFAULT);
277281
assertThat(response.isAcknowledged(), is(true));
278282
}
279283

@@ -320,10 +324,14 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio
320324

321325
protected Map<String, Object> toLazy(ToXContent parsedObject) throws Exception {
322326
BytesReference bytes = XContentHelper.toXContent(parsedObject, XContentType.JSON, false);
323-
try(XContentParser parser = XContentHelper.createParser(xContentRegistry(),
324-
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
325-
bytes,
326-
XContentType.JSON)) {
327+
try (
328+
XContentParser parser = XContentHelper.createParser(
329+
xContentRegistry(),
330+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
331+
bytes,
332+
XContentType.JSON
333+
)
334+
) {
327335
return parser.mapOrdered();
328336
}
329337
}
@@ -349,16 +357,18 @@ protected NamedXContentRegistry xContentRegistry() {
349357

350358
@Override
351359
protected Settings restClientSettings() {
352-
final String token = "Basic " +
353-
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
354-
return Settings.builder()
355-
.put(ThreadContext.PREFIX + ".Authorization", token)
356-
.build();
360+
final String token = "Basic "
361+
+ Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
362+
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
357363
}
358364

359365
protected static class TestRestHighLevelClient extends RestHighLevelClient {
360-
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES =
361-
new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedXContents();
366+
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES = new SearchModule(
367+
Settings.EMPTY,
368+
false,
369+
Collections.emptyList()
370+
).getNamedXContents();
371+
362372
TestRestHighLevelClient() {
363373
super(client(), restClient -> {}, X_CONTENT_ENTRIES);
364374
}

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,11 @@
3838
import static org.hamcrest.Matchers.equalTo;
3939
import static org.hamcrest.Matchers.is;
4040

41-
4241
public class TransformInternalIndexIT extends ESRestTestCase {
4342

44-
4543
private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME;
4644
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";
4745

48-
4946
public void testUpdateDeletesOldTransformConfig() throws Exception {
5047
TestRestHighLevelClient client = new TestRestHighLevelClient();
5148
// The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one
@@ -63,8 +60,12 @@ public void testUpdateDeletesOldTransformConfig() throws Exception {
6360
createSourceIndex(transformIndex);
6461
String transformId = "transform-update-deletes-old-transform-config";
6562
String config = "{\"dest\": {\"index\":\"bar\"},"
66-
+ " \"source\": {\"index\":\"" + transformIndex + "\", \"query\": {\"match_all\":{}}},"
67-
+ " \"id\": \""+transformId+"\","
63+
+ " \"source\": {\"index\":\""
64+
+ transformIndex
65+
+ "\", \"query\": {\"match_all\":{}}},"
66+
+ " \"id\": \""
67+
+ transformId
68+
+ "\","
6869
+ " \"doc_type\": \"data_frame_transform_config\","
6970
+ " \"pivot\": {"
7071
+ " \"group_by\": {"
@@ -79,22 +80,23 @@ public void testUpdateDeletesOldTransformConfig() throws Exception {
7980
+ " } } } },"
8081
+ "\"frequency\":\"1s\""
8182
+ "}";
82-
client.index(new IndexRequest(OLD_INDEX)
83-
.id(TransformConfig.documentId(transformId))
83+
client.index(
84+
new IndexRequest(OLD_INDEX).id(TransformConfig.documentId(transformId))
8485
.source(config, XContentType.JSON)
8586
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
86-
RequestOptions.DEFAULT);
87-
GetResponse getResponse = client.get(new GetRequest(OLD_INDEX, TransformConfig.documentId(transformId)),
88-
RequestOptions.DEFAULT);
87+
RequestOptions.DEFAULT
88+
);
89+
GetResponse getResponse = client.get(new GetRequest(OLD_INDEX, TransformConfig.documentId(transformId)), RequestOptions.DEFAULT);
8990
assertThat(getResponse.isExists(), is(true));
9091

91-
GetTransformResponse response = client.transform()
92-
.getTransform(new GetTransformRequest(transformId), RequestOptions.DEFAULT);
92+
GetTransformResponse response = client.transform().getTransform(new GetTransformRequest(transformId), RequestOptions.DEFAULT);
9393
assertThat(response.getTransformConfigurations().get(0).getId(), equalTo(transformId));
9494

95-
UpdateTransformResponse updated = client.transform().updateTransform(
96-
new UpdateTransformRequest(TransformConfigUpdate.builder().setDescription("updated").build(), transformId),
97-
RequestOptions.DEFAULT);
95+
UpdateTransformResponse updated = client.transform()
96+
.updateTransform(
97+
new UpdateTransformRequest(TransformConfigUpdate.builder().setDescription("updated").build(), transformId),
98+
RequestOptions.DEFAULT
99+
);
98100

99101
assertThat(updated.getTransformConfiguration().getId(), equalTo(transformId));
100102
assertThat(updated.getTransformConfiguration().getDescription(), equalTo("updated"));
@@ -104,19 +106,15 @@ public void testUpdateDeletesOldTransformConfig() throws Exception {
104106
assertThat(getResponse.isExists(), is(false));
105107

106108
// New should be here
107-
getResponse = client.get(new GetRequest(CURRENT_INDEX, TransformConfig.documentId(transformId)),
108-
RequestOptions.DEFAULT);
109+
getResponse = client.get(new GetRequest(CURRENT_INDEX, TransformConfig.documentId(transformId)), RequestOptions.DEFAULT);
109110
assertThat(getResponse.isExists(), is(true));
110111
}
111112

112-
113113
@Override
114114
protected Settings restClientSettings() {
115-
final String token = "Basic " +
116-
Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
117-
return Settings.builder()
118-
.put(ThreadContext.PREFIX + ".Authorization", token)
119-
.build();
115+
final String token = "Basic "
116+
+ Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
117+
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
120118
}
121119

122120
private void createSourceIndex(String index) throws IOException {

0 commit comments

Comments
 (0)