Skip to content

Commit 28c529f

Browse files
authored
Enrich store should only update the policies via an update task. (#41944)
1 parent 97d658e commit 28c529f

File tree

2 files changed

+26
-16
lines changed

2 files changed

+26
-16
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.HashMap;
1717
import java.util.Map;
1818
import java.util.function.Consumer;
19+
import java.util.function.Function;
1920

2021
/**
2122
* Helper methods for access and storage of an enrich policy.
@@ -43,9 +44,11 @@ public static void putPolicy(String name, EnrichPolicy policy, ClusterService cl
4344
}
4445
// TODO: add policy validation
4546

46-
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state());
47-
policies.put(name, policy);
48-
updateClusterState(policies, clusterService, handler);
47+
updateClusterState(clusterService, handler, current -> {
48+
final Map<String, EnrichPolicy> policies = getPolicies(current);
49+
policies.put(name, policy);
50+
return policies;
51+
});
4952
}
5053

5154
/**
@@ -62,13 +65,15 @@ public static void deletePolicy(String name, ClusterService clusterService, Cons
6265
throw new IllegalArgumentException("name is missing or empty");
6366
}
6467

65-
final Map<String, EnrichPolicy> policies = getPolicies(clusterService.state());
66-
if (policies.containsKey(name) == false) {
67-
throw new ResourceNotFoundException("policy [{}] not found", name);
68-
}
68+
updateClusterState(clusterService, handler, current -> {
69+
final Map<String, EnrichPolicy> policies = getPolicies(current);
70+
if (policies.containsKey(name) == false) {
71+
throw new ResourceNotFoundException("policy [{}] not found", name);
72+
}
6973

70-
policies.remove(name);
71-
updateClusterState(policies, clusterService, handler);
74+
policies.remove(name);
75+
return policies;
76+
});
7277
}
7378

7479
/**
@@ -103,12 +108,14 @@ public static Map<String, EnrichPolicy> getPolicies(ClusterState state) {
103108
return policies;
104109
}
105110

106-
private static void updateClusterState(Map<String, EnrichPolicy> policies, ClusterService clusterService,
107-
Consumer<Exception> handler) {
108-
clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() {
111+
private static void updateClusterState(ClusterService clusterService,
112+
Consumer<Exception> handler,
113+
Function<ClusterState, Map<String, EnrichPolicy>> function) {
114+
clusterService.submitStateUpdateTask("update-enrich-metadata", new ClusterStateUpdateTask() {
109115

110116
@Override
111117
public ClusterState execute(ClusterState currentState) throws Exception {
118+
Map<String, EnrichPolicy> policies = function.apply(currentState);
112119
MetaData metaData = MetaData.builder(currentState.metaData())
113120
.putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies))
114121
.build();

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ public void testCrud() throws Exception {
3636
assertThat(listPolicies.size(), equalTo(1));
3737
assertThat(listPolicies.get(name), equalTo(policy));
3838

39-
error = deleteEnrichPolicy(name, clusterService);
40-
assertThat(error.get(), nullValue());
39+
deleteEnrichPolicy(name, clusterService);
40+
result = EnrichStore.getPolicy(name, clusterService.state());
41+
assertThat(result, nullValue());
4142
}
4243

4344
public void testPutValidation() throws Exception {
@@ -110,14 +111,16 @@ private AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy po
110111
return error;
111112
}
112113

113-
private AtomicReference<Exception> deleteEnrichPolicy(String name, ClusterService clusterService) throws InterruptedException {
114+
private void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
114115
CountDownLatch latch = new CountDownLatch(1);
115116
AtomicReference<Exception> error = new AtomicReference<>();
116117
EnrichStore.deletePolicy(name, clusterService, e -> {
117118
error.set(e);
118119
latch.countDown();
119120
});
120121
latch.await();
121-
return error;
122+
if (error.get() != null){
123+
throw error.get();
124+
}
122125
}
123126
}

0 commit comments

Comments
 (0)