Skip to content

Commit e5ebcc1

Browse files
authored
[Transform] Fix a bug where destination index aliases are not set up for an unattended transform (elastic#105499) (elastic#105534)
1 parent a820a9e commit e5ebcc1

File tree

16 files changed

+266
-44
lines changed

16 files changed

+266
-44
lines changed

docs/changelog/105499.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 105499
2+
summary: Fix a bug where destination index aliases are not set up for an unattended transform
3+
area: Transform
4+
type: bug
5+
issues: []

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformChainIT.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ public class TransformChainIT extends TransformRestTestCase {
3838
},
3939
"dest": {
4040
"index": "%s",
41-
"pipeline": "%s"
41+
"pipeline": "%s",
42+
"aliases": [
43+
{
44+
"alias": "%s"
45+
},
46+
{
47+
"alias": "%s",
48+
"move_on_creation": true
49+
}
50+
]
4251
},
4352
"sync": {
4453
"time": {
@@ -172,9 +181,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
172181
// The number of documents is expected to be the same in all the indices.
173182
String sourceIndex = i == 0 ? reviewsIndexName : transformIds.get(i - 1) + "-dest";
174183
String destIndex = transformId + "-dest";
184+
String destReadAlias = destIndex + "-read";
185+
String destWriteAlias = destIndex + "-write";
175186
assertFalse(indexExists(destIndex));
187+
assertFalse(aliasExists(destReadAlias));
188+
assertFalse(aliasExists(destWriteAlias));
176189

177-
assertAcknowledged(putTransform(transformId, createTransformConfig(sourceIndex, destIndex), true, RequestOptions.DEFAULT));
190+
String transformConfig = createTransformConfig(sourceIndex, destIndex, destReadAlias, destWriteAlias);
191+
assertAcknowledged(putTransform(transformId, transformConfig, true, RequestOptions.DEFAULT));
178192
}
179193

180194
List<String> transformIdsShuffled = new ArrayList<>(transformIds);
@@ -198,6 +212,17 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
198212
}
199213
}, 60, TimeUnit.SECONDS);
200214

215+
for (String transformId : transformIds) {
216+
String destIndex = transformId + "-dest";
217+
String destReadAlias = destIndex + "-read";
218+
String destWriteAlias = destIndex + "-write";
219+
// Verify that the destination index is created.
220+
assertTrue(indexExists(destIndex));
221+
// Verify that the destination index aliases are set up.
222+
assertTrue(aliasExists(destReadAlias));
223+
assertTrue(aliasExists(destWriteAlias));
224+
}
225+
201226
// Stop all the transforms.
202227
for (String transformId : transformIds) {
203228
stopTransform(transformId);
@@ -208,12 +233,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
208233
}
209234
}
210235

211-
private static String createTransformConfig(String sourceIndex, String destIndex) {
236+
private static String createTransformConfig(String sourceIndex, String destIndex, String destReadAlias, String destWriteAlias) {
212237
return Strings.format(
213238
TRANSFORM_CONFIG_TEMPLATE,
214239
sourceIndex,
215240
destIndex,
216241
SET_INGEST_TIME_PIPELINE,
242+
destReadAlias,
243+
destWriteAlias,
217244
"1s",
218245
"1s",
219246
randomBoolean(),

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception {
420420

421421
startTransform(config.getId(), RequestOptions.DEFAULT);
422422

423-
// transform is red due to lacking permissions
424-
assertRed(transformId, authIssue);
423+
// Give the transform indexer enough time to try creating destination index
424+
Thread.sleep(5_000);
425+
426+
String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId);
427+
// transform's auth state status is still RED due to:
428+
// - lacking permissions
429+
// - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions)
430+
assertRed(transformId, authIssue, destIndexIssue);
425431

426432
// update transform's credentials so that the transform has permission to access source/dest indices
427433
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
@@ -576,6 +582,7 @@ private void assertGreen(String transformId) throws IOException {
576582
assertThat("Stats were: " + stats, extractValue(stats, "health", "issues"), is(nullValue()));
577583
}
578584

585+
// We expect exactly the issues passed as "expectedHealthIssueDetails". Not more, not less.
579586
@SuppressWarnings("unchecked")
580587
private void assertRed(String transformId, String... expectedHealthIssueDetails) throws IOException {
581588
Map<String, Object> stats = getBasicTransformStats(transformId);

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDestIndexIT.java

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.stream.Collectors;
2525

2626
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.hasEntry;
2728
import static org.hamcrest.Matchers.is;
2829

2930
public class TransformDestIndexIT extends TransformRestTestCase {
@@ -114,32 +115,26 @@ public void testTransformDestIndexAliases() throws Exception {
114115
assertAliases(destIndex2, destAliasAll, destAliasLatest);
115116
}
116117

117-
public void testTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception {
118-
testTransformDestIndexCreatedDuringUpdate(false);
118+
public void testUnattendedTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception {
119+
testUnattendedTransformDestIndexCreatedDuringUpdate(false);
119120
}
120121

121-
public void testTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception {
122-
testTransformDestIndexCreatedDuringUpdate(true);
122+
public void testUnattendedTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception {
123+
testUnattendedTransformDestIndexCreatedDuringUpdate(true);
123124
}
124125

125-
private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception {
126+
private void testUnattendedTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception {
126127
String transformId = "test_dest_index_on_update" + (deferValidation ? "-defer" : "");
127128
String destIndex = transformId + "-dest";
129+
String destAliasAll = transformId + ".all";
130+
String destAliasLatest = transformId + ".latest";
131+
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));
128132

133+
// Create and start the unattended transform
134+
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
135+
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME);
129136
assertFalse(indexExists(destIndex));
130137

131-
// Create and start the unattended transform
132-
createPivotReviewsTransform(
133-
transformId,
134-
destIndex,
135-
null,
136-
null,
137-
null,
138-
new SettingsConfig.Builder().setUnattended(true).build(),
139-
null,
140-
null,
141-
REVIEWS_INDEX_NAME
142-
);
143138
startTransform(transformId);
144139

145140
// Update the unattended transform. This will trigger destination index creation.
@@ -151,6 +146,66 @@ private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation)
151146

152147
// Verify that the destination index now exists
153148
assertTrue(indexExists(destIndex));
149+
// Verify that both aliases are configured on the dest index
150+
assertAliases(destIndex, destAliasAll, destAliasLatest);
151+
}
152+
153+
public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_NoDeferValidation() throws Exception {
154+
testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(false);
155+
}
156+
157+
public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_DeferValidation() throws Exception {
158+
testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(true);
159+
}
160+
161+
private void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(boolean deferValidation) throws Exception {
162+
String transformId = "test_dest_index_on_update-empty" + (deferValidation ? "-defer" : "");
163+
String sourceIndexIndex = transformId + "-src";
164+
String destIndex = transformId + "-dest";
165+
String destAliasAll = transformId + ".all";
166+
String destAliasLatest = transformId + ".latest";
167+
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));
168+
169+
// We want to use an empty source index to make sure transform will not write to the destination index
170+
putReviewsIndex(sourceIndexIndex, "date", false);
171+
assertFalse(indexExists(destIndex));
172+
173+
// Create and start the unattended transform
174+
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
175+
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, sourceIndexIndex);
176+
startTransform(transformId);
177+
178+
// Verify that the destination index creation got skipped
179+
assertFalse(indexExists(destIndex));
180+
181+
// Update the unattended transform. This will trigger destination index creation.
182+
// The update has to change something in the config (here, max_page_search_size). Otherwise it would have been optimized away.
183+
updateTransform(transformId, """
184+
{ "settings": { "max_page_search_size": 123 } }""", deferValidation);
185+
186+
// Verify that the destination index now exists
187+
assertTrue(indexExists(destIndex));
188+
// Verify that both aliases are configured on the dest index
189+
assertAliases(destIndex, destAliasAll, destAliasLatest);
190+
}
191+
192+
public void testUnattendedTransformDestIndexCreatedByIndexer() throws Exception {
193+
String transformId = "test_dest_index_in_indexer";
194+
String destIndex = transformId + "-dest";
195+
String destAliasAll = transformId + ".all";
196+
String destAliasLatest = transformId + ".latest";
197+
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));
198+
199+
// Create and start the unattended transform
200+
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
201+
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME);
202+
startTransform(transformId);
203+
waitForTransformCheckpoint(transformId, 1);
204+
205+
// Verify that the destination index exists
206+
assertTrue(indexExists(destIndex));
207+
// Verify that both aliases are configured on the dest index
208+
assertAliases(destIndex, destAliasAll, destAliasLatest);
154209
}
155210

156211
public void testTransformDestIndexMappings_DeduceMappings() throws Exception {
@@ -245,20 +300,9 @@ private void testTransformDestIndexMappings(String transformId, boolean deduceMa
245300
assertTrue(indexExists(destIndex));
246301
assertThat(
247302
getIndexMappingAsMap(destIndex),
248-
is(
249-
equalTo(
250-
Map.of(
251-
"properties",
252-
Map.of(
253-
"avg_rating",
254-
Map.of("type", "double"),
255-
"reviewer",
256-
Map.of("type", "keyword"),
257-
"timestamp",
258-
Map.of("type", "date")
259-
)
260-
)
261-
)
303+
hasEntry(
304+
"properties",
305+
Map.of("avg_rating", Map.of("type", "double"), "reviewer", Map.of("type", "keyword"), "timestamp", Map.of("type", "date"))
262306
)
263307
);
264308
Map<String, Object> searchResult = getAsMap(destIndex + "/_search?q=reviewer:user_0");

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
283283
threadPool,
284284
clusterService,
285285
settingsModule.getSettings(),
286-
getTransformExtension().getTransformInternalIndexAdditionalSettings(),
286+
getTransformExtension(),
287287
expressionResolver
288288
)
289289
);

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import org.elasticsearch.action.search.TransportSearchAction;
2828
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2929
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
30+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31+
import org.elasticsearch.cluster.service.ClusterService;
3032
import org.elasticsearch.common.logging.LoggerMessageFormat;
33+
import org.elasticsearch.common.settings.Settings;
3134
import org.elasticsearch.core.Nullable;
3235
import org.elasticsearch.core.TimeValue;
3336
import org.elasticsearch.core.Tuple;
@@ -54,9 +57,11 @@
5457
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
5558
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
5659
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
60+
import org.elasticsearch.xpack.transform.TransformExtension;
5761
import org.elasticsearch.xpack.transform.TransformServices;
5862
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
5963
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
64+
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
6065
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
6166
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
6267

@@ -75,6 +80,9 @@ class ClientTransformIndexer extends TransformIndexer {
7580
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);
7681

7782
private final ParentTaskAssigningClient client;
83+
private final ClusterService clusterService;
84+
private final IndexNameExpressionResolver indexNameExpressionResolver;
85+
private final Settings destIndexSettings;
7886
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
7987

8088
private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndexHolder;
@@ -84,6 +92,9 @@ class ClientTransformIndexer extends TransformIndexer {
8492

8593
ClientTransformIndexer(
8694
ThreadPool threadPool,
95+
ClusterService clusterService,
96+
IndexNameExpressionResolver indexNameExpressionResolver,
97+
TransformExtension transformExtension,
8798
TransformServices transformServices,
8899
CheckpointProvider checkpointProvider,
89100
AtomicReference<IndexerState> initialState,
@@ -112,6 +123,9 @@ class ClientTransformIndexer extends TransformIndexer {
112123
context
113124
);
114125
this.client = ExceptionsHelper.requireNonNull(client, "client");
126+
this.clusterService = clusterService;
127+
this.indexNameExpressionResolver = indexNameExpressionResolver;
128+
this.destIndexSettings = transformExtension.getTransformDestinationIndexSettings();
115129
this.seqNoPrimaryTermAndIndexHolder = new AtomicReference<>(seqNoPrimaryTermAndIndex);
116130

117131
// TODO: move into context constructor
@@ -288,6 +302,20 @@ void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListene
288302
SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
289303
}
290304

305+
@Override
306+
void doMaybeCreateDestIndex(Map<String, String> deducedDestIndexMappings, ActionListener<Boolean> listener) {
307+
TransformIndex.createDestinationIndex(
308+
client,
309+
auditor,
310+
indexNameExpressionResolver,
311+
clusterService.state(),
312+
transformConfig,
313+
destIndexSettings,
314+
deducedDestIndexMappings,
315+
listener
316+
);
317+
}
318+
291319
void validate(ActionListener<ValidateTransformAction.Response> listener) {
292320
ClientHelper.executeAsyncWithOrigin(
293321
client,

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
package org.elasticsearch.xpack.transform.transforms;
99

1010
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
11+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
12+
import org.elasticsearch.cluster.service.ClusterService;
1113
import org.elasticsearch.threadpool.ThreadPool;
1214
import org.elasticsearch.xpack.core.indexing.IndexerState;
1315
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
1416
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
1517
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
1618
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
1719
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
20+
import org.elasticsearch.xpack.transform.TransformExtension;
1821
import org.elasticsearch.xpack.transform.TransformServices;
1922
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
2023
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
@@ -23,6 +26,9 @@
2326

2427
class ClientTransformIndexerBuilder {
2528
private ParentTaskAssigningClient parentTaskClient;
29+
private ClusterService clusterService;
30+
private IndexNameExpressionResolver indexNameExpressionResolver;
31+
private TransformExtension transformExtension;
2632
private TransformServices transformServices;
2733
private TransformConfig transformConfig;
2834
private TransformIndexerStats initialStats;
@@ -44,6 +50,9 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
4450

4551
return new ClientTransformIndexer(
4652
threadPool,
53+
clusterService,
54+
indexNameExpressionResolver,
55+
transformExtension,
4756
transformServices,
4857
checkpointProvider,
4958
new AtomicReference<>(this.indexerState),
@@ -73,6 +82,21 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie
7382
return this;
7483
}
7584

85+
ClientTransformIndexerBuilder setIndexNameExpressionResolver(IndexNameExpressionResolver indexNameExpressionResolver) {
86+
this.indexNameExpressionResolver = indexNameExpressionResolver;
87+
return this;
88+
}
89+
90+
ClientTransformIndexerBuilder setClusterService(ClusterService clusterService) {
91+
this.clusterService = clusterService;
92+
return this;
93+
}
94+
95+
ClientTransformIndexerBuilder setTransformExtension(TransformExtension transformExtension) {
96+
this.transformExtension = transformExtension;
97+
return this;
98+
}
99+
76100
ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) {
77101
this.transformServices = transformServices;
78102
return this;

0 commit comments

Comments
 (0)