Skip to content

Commit 0f61ffe

Browse files
author
Hendrik Muhs
committed
[Transform] Improve force stop robustness in case of an error (elastic#51072)
If a transform config got lost (e.g. because the internal index disappeared) tasks could not be stopped using transform API. This change makes it possible to stop transforms without a config, meaning to remove the background task. In order to do so force must be set to true.
1 parent a0bc9ee commit 0f61ffe

File tree

6 files changed

+238
-78
lines changed

6 files changed

+238
-78
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class TransformMessages {
1717
"Interrupted while waiting for transform [{0}] to stop";
1818
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
1919
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
20+
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
21+
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
2022
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION =
2123
"Failed to validate configuration";
2224
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

-8
Original file line numberDiff line numberDiff line change
@@ -918,12 +918,4 @@ public void testContinuousStopWaitForCheckpoint() throws Exception {
918918
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
919919
deleteIndex(indexName);
920920
}
921-
922-
private void assertOnePivotValue(String query, double expected) throws IOException {
923-
Map<String, Object> searchResult = getAsMap(query);
924-
925-
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
926-
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
927-
assertEquals(expected, actual, 0.000001);
928-
}
929921
}

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

+52-50
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
7373
}
7474

7575
protected void createReviewsIndex(String indexName, int numDocs) throws IOException {
76-
int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1};
76+
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };
7777

7878
// create mapping
7979
try (XContentBuilder builder = jsonBuilder()) {
@@ -158,6 +158,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws IOExcept
158158
bulkRequest.setJsonEntity(bulk.toString());
159159
client().performRequest(bulkRequest);
160160
}
161+
161162
/**
162163
* Create a simple dataset for testing with reviewers, ratings and businesses
163164
*/
@@ -182,9 +183,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
182183

183184
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
184185

185-
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
186-
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
187-
//Set frequency high for testing
186+
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
187+
// Set frequency high for testing
188188
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
189189
+ " \"frequency\": \"1s\","
190190
+ " \"pivot\": {"
@@ -206,7 +206,6 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
206206
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
207207
}
208208

209-
210209
protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
211210
throws IOException {
212211
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
@@ -226,30 +225,30 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
226225
}
227226

228227
config += " \"pivot\": {"
229-
+ " \"group_by\": {"
230-
+ " \"reviewer\": {"
231-
+ " \"terms\": {"
232-
+ " \"field\": \"user_id\""
233-
+ " } } },"
234-
+ " \"aggregations\": {"
235-
+ " \"avg_rating\": {"
236-
+ " \"avg\": {"
237-
+ " \"field\": \"stars\""
238-
+ " } } } },"
239-
+ "\"frequency\":\"1s\""
240-
+ "}";
228+
+ " \"group_by\": {"
229+
+ " \"reviewer\": {"
230+
+ " \"terms\": {"
231+
+ " \"field\": \"user_id\""
232+
+ " } } },"
233+
+ " \"aggregations\": {"
234+
+ " \"avg_rating\": {"
235+
+ " \"avg\": {"
236+
+ " \"field\": \"stars\""
237+
+ " } } } },"
238+
+ "\"frequency\":\"1s\""
239+
+ "}";
241240

242241
createDataframeTransformRequest.setJsonEntity(config);
243242

244243
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
245244
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
246245
}
247246

248-
protected void startDataframeTransform(String transformId) throws IOException {
249-
startDataframeTransform(transformId, null);
247+
protected void startTransform(String transformId) throws IOException {
248+
startTransform(transformId, null);
250249
}
251250

252-
protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
251+
protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
253252
// start the transform
254253
final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader);
255254
if (warnings.length > 0) {
@@ -280,10 +279,10 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
280279
startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]);
281280
}
282281

283-
protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
284-
String authHeader, String... warnings) throws Exception {
282+
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings)
283+
throws Exception {
285284
// start the transform
286-
startDataframeTransform(transformId, authHeader, warnings);
285+
startTransform(transformId, authHeader, warnings);
287286
assertTrue(indexExists(dataFrameIndex));
288287
// wait until the dataframe has been created and all data is available
289288
waitForDataFrameCheckpoint(transformId);
@@ -292,18 +291,14 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
292291
refreshIndex(dataFrameIndex);
293292
}
294293

295-
protected void startAndWaitForContinuousTransform(String transformId,
296-
String dataFrameIndex,
297-
String authHeader) throws Exception {
294+
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
298295
startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L);
299296
}
300297

301-
protected void startAndWaitForContinuousTransform(String transformId,
302-
String dataFrameIndex,
303-
String authHeader,
304-
long checkpoint) throws Exception {
298+
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint)
299+
throws Exception {
305300
// start the transform
306-
startDataframeTransform(transformId, authHeader, new String[0]);
301+
startTransform(transformId, authHeader, new String[0]);
307302
assertTrue(indexExists(dataFrameIndex));
308303
// wait until the dataframe has been created and all data is available
309304
waitForTransformCheckpoint(transformId, checkpoint);
@@ -323,9 +318,7 @@ protected Request createRequestWithAuth(final String method, final String endpoi
323318
}
324319

325320
void waitForDataFrameStopped(String transformId) throws Exception {
326-
assertBusy(() -> {
327-
assertEquals("stopped", getDataFrameTransformState(transformId));
328-
}, 15, TimeUnit.SECONDS);
321+
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
329322
}
330323

331324
void waitForDataFrameCheckpoint(String transformId) throws Exception {
@@ -341,20 +334,20 @@ void refreshIndex(String index) throws IOException {
341334
}
342335

343336
@SuppressWarnings("unchecked")
344-
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
337+
protected static List<Map<String, Object>> getTransforms() throws IOException {
345338
Response response = adminClient().performRequest(new Request("GET", getTransformEndpoint() + "_all"));
346339
Map<String, Object> transforms = entityAsMap(response);
347340
List<Map<String, Object>> transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms);
348341

349342
return transformConfigs == null ? Collections.emptyList() : transformConfigs;
350343
}
351344

352-
protected static String getDataFrameTransformState(String transformId) throws IOException {
353-
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
345+
protected static String getTransformState(String transformId) throws IOException {
346+
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
354347
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
355348
}
356349

357-
protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
350+
protected static Map<?, ?> getTransformStateAndStats(String transformId) throws IOException {
358351
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
359352
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
360353
if (transforms.isEmpty()) {
@@ -383,7 +376,7 @@ public static void removeIndices() throws Exception {
383376
}
384377

385378
public void wipeTransforms() throws IOException {
386-
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
379+
List<Map<String, Object>> transformConfigs = getTransforms();
387380
for (Map<String, Object> transformConfig : transformConfigs) {
388381
String transformId = (String) transformConfig.get("id");
389382
Request request = new Request("POST", getTransformEndpoint() + transformId + "/_stop");
@@ -395,7 +388,7 @@ public void wipeTransforms() throws IOException {
395388

396389
for (Map<String, Object> transformConfig : transformConfigs) {
397390
String transformId = (String) transformConfig.get("id");
398-
String state = getDataFrameTransformState(transformId);
391+
String state = getTransformState(transformId);
399392
assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state);
400393
}
401394

@@ -405,7 +398,7 @@ public void wipeTransforms() throws IOException {
405398
}
406399

407400
// transforms should be all gone
408-
transformConfigs = getDataFrameTransforms();
401+
transformConfigs = getTransforms();
409402
assertTrue(transformConfigs.isEmpty());
410403

411404
// the configuration index should be empty
@@ -437,11 +430,15 @@ static int getDataFrameCheckpoint(String transformId) throws IOException {
437430
protected void setupDataAccessRole(String role, String... indices) throws IOException {
438431
String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\""));
439432
Request request = new Request("PUT", "/_security/role/" + role);
440-
request.setJsonEntity("{"
441-
+ " \"indices\" : ["
442-
+ " { \"names\": [" + indicesStr + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
443-
+ " ]"
444-
+ "}");
433+
request.setJsonEntity(
434+
"{"
435+
+ " \"indices\" : ["
436+
+ " { \"names\": ["
437+
+ indicesStr
438+
+ "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
439+
+ " ]"
440+
+ "}"
441+
);
445442
client().performRequest(request);
446443
}
447444

@@ -450,13 +447,18 @@ protected void setupUser(String user, List<String> roles) throws IOException {
450447

451448
String rolesStr = roles.stream().collect(Collectors.joining("\",\"", "\"", "\""));
452449
Request request = new Request("PUT", "/_security/user/" + user);
453-
request.setJsonEntity("{"
454-
+ " \"password\" : \"" + password + "\","
455-
+ " \"roles\" : [ " + rolesStr + " ]"
456-
+ "}");
450+
request.setJsonEntity("{" + " \"password\" : \"" + password + "\"," + " \"roles\" : [ " + rolesStr + " ]" + "}");
457451
client().performRequest(request);
458452
}
459453

454+
protected void assertOnePivotValue(String query, double expected) throws IOException {
455+
Map<String, Object> searchResult = getAsMap(query);
456+
457+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
458+
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
459+
assertEquals(expected, actual, 0.000001);
460+
}
461+
460462
protected static String getTransformEndpoint() {
461463
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
462464
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.integration;
8+
9+
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.client.ResponseException;
11+
import org.elasticsearch.xpack.core.transform.TransformField;
12+
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
13+
14+
import java.io.IOException;
15+
import java.util.Map;
16+
import java.util.Map.Entry;
17+
18+
import static org.hamcrest.Matchers.containsString;
19+
import static org.hamcrest.Matchers.equalTo;
20+
21+
public class TransformRobustnessIT extends TransformRestTestCase {
22+
23+
public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
24+
String indexName = "continuous_reviews";
25+
createReviewsIndex(indexName);
26+
String transformId = "simple_continuous_pivot";
27+
String transformIndex = "pivot_reviews_continuous";
28+
final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
29+
String config = "{"
30+
+ " \"source\": {\"index\":\""
31+
+ indexName
32+
+ "\"},"
33+
+ " \"dest\": {\"index\":\""
34+
+ transformIndex
35+
+ "\"},"
36+
+ " \"frequency\": \"1s\","
37+
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
38+
+ " \"pivot\": {"
39+
+ " \"group_by\": {"
40+
+ " \"reviewer\": {"
41+
+ " \"terms\": {"
42+
+ " \"field\": \"user_id\""
43+
+ " } } },"
44+
+ " \"aggregations\": {"
45+
+ " \"avg_rating\": {"
46+
+ " \"avg\": {"
47+
+ " \"field\": \"stars\""
48+
+ " } } } }"
49+
+ "}";
50+
createTransformRequest.setJsonEntity(config);
51+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
52+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
53+
assertEquals(1, getTransforms().size());
54+
// there shouldn't be a task yet
55+
assertEquals(0, getNumberOfTransformTasks());
56+
startAndWaitForContinuousTransform(transformId, transformIndex, null);
57+
assertTrue(indexExists(transformIndex));
58+
59+
// a task exists
60+
assertEquals(1, getNumberOfTransformTasks());
61+
// get and check some users
62+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
63+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
64+
assertNotNull(getTransformState(transformId));
65+
66+
assertEquals(1, getTransforms().size());
67+
68+
// delete the transform index
69+
beEvilAndDeleteTheTransformIndex();
70+
// transform is gone
71+
assertEquals(0, getTransforms().size());
72+
// but the task is still there
73+
assertEquals(1, getNumberOfTransformTasks());
74+
75+
Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop");
76+
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest));
77+
78+
assertEquals(409, e.getResponse().getStatusLine().getStatusCode());
79+
assertThat(
80+
e.getMessage(),
81+
containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.")
82+
);
83+
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(true));
84+
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
85+
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
86+
87+
// the task is gone
88+
assertEquals(1, getNumberOfTransformTasks());
89+
}
90+
91+
@SuppressWarnings("unchecked")
92+
private int getNumberOfTransformTasks() throws IOException {
93+
final Request tasksRequest = new Request("GET", "/_tasks");
94+
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
95+
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
96+
97+
Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
98+
if (nodes == null) {
99+
return 0;
100+
}
101+
102+
int foundTasks = 0;
103+
for (Entry<String, Object> node : nodes.entrySet()) {
104+
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
105+
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
106+
foundTasks += tasks != null ? tasks.size() : 0;
107+
}
108+
109+
return foundTasks;
110+
}
111+
112+
private void beEvilAndDeleteTheTransformIndex() throws IOException {
113+
adminClient().performRequest(new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME));
114+
}
115+
}

0 commit comments

Comments
 (0)