Skip to content

Commit 6ff8b7b

Browse files
authored
Filter enrich policy index deletes to just the policy's associated indices (#82568)
1 parent 979e17a commit 6ff8b7b

File tree

5 files changed

+114
-21
lines changed

5 files changed

+114
-21
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,32 @@ public static String getBaseName(String policyName) {
169169
return ENRICH_INDEX_NAME_BASE + policyName;
170170
}
171171

172+
/**
173+
* Given a policy name and a timestamp, return the enrich index name that should be used.
174+
*
175+
* @param policyName the name of the policy
176+
* @param nowTimestamp the current time
177+
* @return an enrich index name
178+
*/
179+
public static String getIndexName(String policyName, long nowTimestamp) {
180+
Objects.nonNull(policyName);
181+
return EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
182+
}
183+
184+
/**
185+
* Tests whether the named policy is associated with the named index according to the naming
186+
* pattern that exists between policy names and index names.
187+
*
188+
* @param policyName the policy name
189+
* @param indexName the index name
190+
* @return true if and only if the named policy is associated with the named index
191+
*/
192+
public static boolean isPolicyForIndex(String policyName, String indexName) {
193+
Objects.nonNull(policyName);
194+
Objects.nonNull(indexName);
195+
return indexName.matches(EnrichPolicy.getBaseName(policyName) + "-" + "\\d+");
196+
}
197+
172198
@Override
173199
public void writeTo(StreamOutput out) throws IOException {
174200
out.writeString(type);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuild
364364

365365
private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
366366
long nowTimestamp = nowSupplier.getAsLong();
367-
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
367+
String enrichIndexName = EnrichPolicy.getIndexName(policyName, nowTimestamp);
368368
Settings enrichIndexSettings = Settings.builder()
369369
.put("index.number_of_shards", 1)
370370
.put("index.number_of_replicas", 0)

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import java.util.ArrayList;
3939
import java.util.List;
40+
import java.util.stream.Stream;
4041

4142
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
4243

@@ -82,23 +83,24 @@ protected void masterOperation(
8283
ClusterState state,
8384
ActionListener<AcknowledgedResponse> listener
8485
) throws Exception {
85-
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
86+
final String policyName = request.getName();
87+
final EnrichPolicy policy = EnrichStore.getPolicy(policyName, state); // ensure the policy exists first
8688
if (policy == null) {
87-
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
89+
throw new ResourceNotFoundException("policy [{}] not found", policyName);
8890
}
8991

90-
enrichPolicyLocks.lockPolicy(request.getName());
92+
enrichPolicyLocks.lockPolicy(policyName);
9193
try {
92-
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
93-
List<String> pipelinesWithProcessors = new ArrayList<>();
94+
final List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
95+
final List<String> pipelinesWithProcessors = new ArrayList<>();
9496

9597
for (PipelineConfiguration pipelineConfiguration : pipelines) {
9698
List<AbstractEnrichProcessor> enrichProcessors = ingestService.getProcessorsInPipeline(
9799
pipelineConfiguration.getId(),
98100
AbstractEnrichProcessor.class
99101
);
100102
for (AbstractEnrichProcessor processor : enrichProcessors) {
101-
if (processor.getPolicyName().equals(request.getName())) {
103+
if (processor.getPolicyName().equals(policyName)) {
102104
pipelinesWithProcessors.add(pipelineConfiguration.getId());
103105
}
104106
}
@@ -108,26 +110,30 @@ protected void masterOperation(
108110
throw new ElasticsearchStatusException(
109111
"Could not delete policy [{}] because a pipeline is referencing it {}",
110112
RestStatus.CONFLICT,
111-
request.getName(),
113+
policyName,
112114
pipelinesWithProcessors
113115
);
114116
}
115117
} catch (Exception e) {
116-
enrichPolicyLocks.releasePolicy(request.getName());
118+
enrichPolicyLocks.releasePolicy(policyName);
117119
listener.onFailure(e);
118120
return;
119121
}
120122

121-
GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(request.getName()) + "-*")
123+
final GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(policyName) + "-*")
122124
.indicesOptions(IndicesOptions.lenientExpand());
123125

124126
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, indices);
125127

126-
deleteIndicesAndPolicy(concreteIndices, request.getName(), ActionListener.wrap((response) -> {
127-
enrichPolicyLocks.releasePolicy(request.getName());
128+
// the wildcard expansion could be too wide (e.g. in the case of a policy named policy-1 and another named policy-10),
129+
// so we need to filter down to just the concrete indices that are actually indices for this policy
130+
concreteIndices = Stream.of(concreteIndices).filter(i -> EnrichPolicy.isPolicyForIndex(policyName, i)).toArray(String[]::new);
131+
132+
deleteIndicesAndPolicy(concreteIndices, policyName, ActionListener.wrap((response) -> {
133+
enrichPolicyLocks.releasePolicy(policyName);
128134
listener.onResponse(response);
129135
}, (exc) -> {
130-
enrichPolicyLocks.releasePolicy(request.getName());
136+
enrichPolicyLocks.releasePolicy(policyName);
131137
listener.onFailure(exc);
132138
}));
133139
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,19 @@ public static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPoli
9797
assertThat(newInstance.getMatchField(), equalTo(expectedInstance.getMatchField()));
9898
assertThat(newInstance.getEnrichFields(), equalTo(expectedInstance.getEnrichFields()));
9999
}
100+
101+
public void testIsPolicyForIndex() {
102+
String policy1 = "policy-1";
103+
String policy2 = "policy-10"; // the first policy is a prefix of the second policy!
104+
105+
String index1 = EnrichPolicy.getIndexName(policy1, 1000);
106+
String index2 = EnrichPolicy.getIndexName(policy2, 2000);
107+
108+
assertTrue(EnrichPolicy.isPolicyForIndex(policy1, index1));
109+
assertTrue(EnrichPolicy.isPolicyForIndex(policy2, index2));
110+
111+
assertFalse(EnrichPolicy.isPolicyForIndex(policy1, index2));
112+
assertFalse(EnrichPolicy.isPolicyForIndex(policy2, index1));
113+
}
114+
100115
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public void cleanupPolicy() {
5353

5454
public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
5555
String fakeId = "fake-id";
56-
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
57-
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
56+
createIndex(EnrichPolicy.getIndexName(fakeId, 1001));
57+
createIndex(EnrichPolicy.getIndexName(fakeId, 1002));
5858

5959
final CountDownLatch latch = new CountDownLatch(1);
6060
final AtomicReference<Exception> reference = new AtomicReference<>();
@@ -128,13 +128,13 @@ public void testDeleteIsNotLocked() throws Exception {
128128
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
129129
}
130130

131-
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
132-
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
131+
createIndex(EnrichPolicy.getIndexName(name, 1001));
132+
createIndex(EnrichPolicy.getIndexName(name, 1002));
133133

134134
client().admin()
135135
.indices()
136136
.prepareGetIndex()
137-
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
137+
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1002))
138138
.get();
139139

140140
final CountDownLatch latch = new CountDownLatch(1);
@@ -160,7 +160,7 @@ public void onFailure(final Exception e) {
160160
() -> client().admin()
161161
.indices()
162162
.prepareGetIndex()
163-
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
163+
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1001))
164164
.get()
165165
);
166166

@@ -183,8 +183,8 @@ public void testDeleteLocked() throws InterruptedException {
183183
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
184184
assertThat(error.get(), nullValue());
185185

186-
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
187-
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
186+
createIndex(EnrichPolicy.getIndexName(name, 1001));
187+
createIndex(EnrichPolicy.getIndexName(name, 1002));
188188

189189
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
190190
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
@@ -241,4 +241,50 @@ public void onFailure(final Exception e) {
241241
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
242242
}
243243
}
244+
245+
public void testDeletePolicyPrefixes() throws InterruptedException {
246+
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
247+
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
248+
249+
String name = "my-policy";
250+
String otherName = "my-policy-two"; // the first policy is a prefix of this one
251+
252+
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
253+
AtomicReference<Exception> error;
254+
error = saveEnrichPolicy(name, policy, clusterService);
255+
assertThat(error.get(), nullValue());
256+
error = saveEnrichPolicy(otherName, policy, clusterService);
257+
assertThat(error.get(), nullValue());
258+
259+
// create an index for the *other* policy
260+
createIndex(EnrichPolicy.getIndexName(otherName, 1001));
261+
262+
{
263+
final CountDownLatch latch = new CountDownLatch(1);
264+
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
265+
266+
ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
267+
@Override
268+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
269+
reference.set(acknowledgedResponse);
270+
latch.countDown();
271+
}
272+
273+
public void onFailure(final Exception e) {
274+
fail();
275+
}
276+
});
277+
latch.await();
278+
assertNotNull(reference.get());
279+
assertTrue(reference.get().isAcknowledged());
280+
281+
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
282+
283+
// deleting name policy should have no effect on the other policy
284+
assertNotNull(EnrichStore.getPolicy(otherName, clusterService.state()));
285+
286+
// and the index associated with the other index should be unaffected
287+
client().admin().indices().prepareGetIndex().setIndices(EnrichPolicy.getIndexName(otherName, 1001)).get();
288+
}
289+
}
244290
}

0 commit comments

Comments
 (0)