|
18 | 18 | import org.elasticsearch.common.io.stream.StreamOutput;
|
19 | 19 | import org.elasticsearch.core.Nullable;
|
20 | 20 | import org.elasticsearch.core.TimeValue;
|
| 21 | +import org.elasticsearch.core.Tuple; |
21 | 22 | import org.elasticsearch.index.Index;
|
22 | 23 | import org.elasticsearch.license.XPackLicenseState;
|
23 | 24 | import org.elasticsearch.xcontent.DeprecationHandler;
|
|
49 | 50 | import java.util.Set;
|
50 | 51 | import java.util.SortedMap;
|
51 | 52 | import java.util.TreeMap;
|
| 53 | +import java.util.concurrent.ConcurrentHashMap; |
52 | 54 | import java.util.stream.Collectors;
|
53 | 55 |
|
54 | 56 | public class PolicyStepsRegistry {
|
55 | 57 | private static final Logger logger = LogManager.getLogger(PolicyStepsRegistry.class);
|
56 | 58 |
|
| 59 | + private final NamedXContentRegistry xContentRegistry; |
57 | 60 | private final Client client;
|
58 | 61 | private final XPackLicenseState licenseState;
|
| 62 | + |
59 | 63 | // keeps track of existing policies in the cluster state
|
60 | 64 | private final SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap;
|
61 | 65 | // keeps track of what the first step in a policy is, the key is policy name
|
62 | 66 | private final Map<String, Step> firstStepMap;
|
63 | 67 | // keeps track of a mapping from policy/step-name to respective Step, the key is policy name
|
64 | 68 | private final Map<String, Map<Step.StepKey, Step>> stepMap;
|
65 |
| - private final NamedXContentRegistry xContentRegistry; |
| 69 | + |
| 70 | + // tracks an index->step cache, where the indexmetadata is also tracked for cache invalidation/eviction purposes. |
| 71 | + // for a given index, the step can be cached as long as the indexmetadata (and the policy!) hasn't changed. since |
| 72 | + // policies change infrequently, the entire cache is cleared on policy change. |
| 73 | + private final Map<Index, Tuple<IndexMetadata, Step>> cachedSteps = new ConcurrentHashMap<>(); |
66 | 74 |
|
67 | 75 | public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client, XPackLicenseState licenseState) {
|
68 | 76 | this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client, licenseState);
|
@@ -99,6 +107,9 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
|
99 | 107 | public void update(IndexLifecycleMetadata meta) {
|
100 | 108 | assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";
|
101 | 109 |
|
| 110 | + // since the policies (may have) changed, the whole steps cache needs to be thrown out |
| 111 | + cachedSteps.clear(); |
| 112 | + |
102 | 113 | DiffableUtils.MapDiff<String, LifecyclePolicyMetadata, Map<String, LifecyclePolicyMetadata>> mapDiff = DiffableUtils.diff(
|
103 | 114 | lifecyclePolicyMap,
|
104 | 115 | meta.getPolicyMetadatas(),
|
@@ -155,6 +166,36 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
|
155 | 166 | }
|
156 | 167 | }
|
157 | 168 |
|
| 169 | + /** |
| 170 | + * Remove the entry for an index from the index->step cache. |
| 171 | + * |
| 172 | + * We clear the map entirely when the master of the cluster changes, and when any |
| 173 | + * policy changes, but in a long-lived cluster that doesn't happen to experience |
| 174 | + * either of those events (and where indices are removed regularly) we still want |
| 175 | + * the cache to trim deleted indices. |
| 176 | + * |
| 177 | + * n.b. even with this, there's still a pretty small chance that a given index |
| 178 | + * could leak, if we're right in the middle of populating the cache for that |
| 179 | + * index (in getStep) when we process the delete here, then we'll end up with an |
| 180 | + * entry that doesn't get deleted until the master changes or a policy changes |
| 181 | + * -- it's harmless enough |
| 182 | + */ |
| 183 | + public void delete(Index deleted) { |
| 184 | + cachedSteps.remove(deleted); |
| 185 | + } |
| 186 | + |
| 187 | + /** |
| 188 | + * Clear internal maps that were populated by update (and others). |
| 189 | + */ |
| 190 | + public void clear() { |
| 191 | + // this is potentially large, so it's important to clear it |
| 192 | + cachedSteps.clear(); |
| 193 | + // these are relatively small, but there's no harm in clearing them |
| 194 | + lifecyclePolicyMap.clear(); |
| 195 | + firstStepMap.clear(); |
| 196 | + stepMap.clear(); |
| 197 | + } |
| 198 | + |
158 | 199 | /**
|
159 | 200 | * Return all ordered steps for the current policy for the index. Does not
|
160 | 201 | * resolve steps using the phase caching, but only for the currently existing policy.
|
@@ -267,6 +308,14 @@ private List<Step> parseStepsFromPhase(String policy, String currentPhase, Strin
|
267 | 308 |
|
268 | 309 | @Nullable
|
269 | 310 | public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKey) {
|
| 311 | + final Tuple<IndexMetadata, Step> cachedStep = cachedSteps.get(indexMetadata.getIndex()); |
| 312 | + // n.b. we're using instance equality here for the IndexMetadata rather than object equality because it's fast, |
| 313 | + // this means that we're erring on the side of cache misses (if the IndexMetadata changed in any way, it'll be |
| 314 | + // a new instance, so we'll miss-and-repopulate the cache for the index in question) |
| 315 | + if (cachedStep != null && cachedStep.v1() == indexMetadata && cachedStep.v2().getKey().equals(stepKey)) { |
| 316 | + return cachedStep.v2(); |
| 317 | + } |
| 318 | + |
270 | 319 | if (ErrorStep.NAME.equals(stepKey.getName())) {
|
271 | 320 | return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
|
272 | 321 | }
|
@@ -305,7 +354,9 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
|
305 | 354 | + phaseSteps;
|
306 | 355 |
|
307 | 356 | // Return the step that matches the given stepKey or else null if we couldn't find it
|
308 |
| - return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); |
| 357 | + final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); |
| 358 | + cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s)); |
| 359 | + return s; |
309 | 360 | }
|
310 | 361 |
|
311 | 362 | /**
|
|
0 commit comments