|
18 | 18 | import org.elasticsearch.common.io.stream.StreamOutput;
|
19 | 19 | import org.elasticsearch.core.Nullable;
|
20 | 20 | import org.elasticsearch.core.TimeValue;
|
| 21 | +import org.elasticsearch.core.Tuple; |
21 | 22 | import org.elasticsearch.index.Index;
|
22 | 23 | import org.elasticsearch.license.XPackLicenseState;
|
23 | 24 | import org.elasticsearch.xcontent.DeprecationHandler;
|
|
48 | 49 | import java.util.Set;
|
49 | 50 | import java.util.SortedMap;
|
50 | 51 | import java.util.TreeMap;
|
| 52 | +import java.util.concurrent.ConcurrentHashMap; |
51 | 53 | import java.util.stream.Collectors;
|
52 | 54 |
|
53 | 55 | public class PolicyStepsRegistry {
|
54 | 56 | private static final Logger logger = LogManager.getLogger(PolicyStepsRegistry.class);
|
55 | 57 |
|
| 58 | + private final NamedXContentRegistry xContentRegistry; |
56 | 59 | private final Client client;
|
57 | 60 | private final XPackLicenseState licenseState;
|
| 61 | + |
58 | 62 | // keeps track of existing policies in the cluster state
|
59 | 63 | private final SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap;
|
60 | 64 | // keeps track of what the first step in a policy is, the key is policy name
|
61 | 65 | private final Map<String, Step> firstStepMap;
|
62 | 66 | // keeps track of a mapping from policy/step-name to respective Step, the key is policy name
|
63 | 67 | private final Map<String, Map<Step.StepKey, Step>> stepMap;
|
64 |
| - private final NamedXContentRegistry xContentRegistry; |
| 68 | + |
| 69 | + // tracks an index->step cache, where the indexmetadata is also tracked for cache invalidation/eviction purposes. |
| 70 | + // for a given index, the step can be cached as long as the indexmetadata (and the policy!) hasn't changed. since |
| 71 | + // policies change infrequently, the entire cache is cleared on policy change. |
| 72 | + private final Map<Index, Tuple<IndexMetadata, Step>> cachedSteps = new ConcurrentHashMap<>(); |
65 | 73 |
|
66 | 74 | public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client, XPackLicenseState licenseState) {
|
67 | 75 | this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client, licenseState);
|
@@ -98,6 +106,9 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
|
98 | 106 | public void update(IndexLifecycleMetadata meta) {
|
99 | 107 | assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";
|
100 | 108 |
|
| 109 | + // since the policies (may have) changed, the whole steps cache needs to be thrown out |
| 110 | + cachedSteps.clear(); |
| 111 | + |
101 | 112 | DiffableUtils.MapDiff<String, LifecyclePolicyMetadata, Map<String, LifecyclePolicyMetadata>> mapDiff = DiffableUtils.diff(
|
102 | 113 | lifecyclePolicyMap,
|
103 | 114 | meta.getPolicyMetadatas(),
|
@@ -154,6 +165,36 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
|
154 | 165 | }
|
155 | 166 | }
|
156 | 167 |
|
| 168 | + /** |
| 169 | + * Remove the entry for an index from the index->step cache. |
| 170 | + * |
| 171 | + * We clear the map entirely when the master of the cluster changes, and when any |
| 172 | + * policy changes, but in a long-lived cluster that doesn't happen to experience |
| 173 | + * either of those events (and where indices are removed regularly) we still want |
| 174 | + * the cache to trim deleted indices. |
| 175 | + * |
| 176 | + * n.b. even with this, there's still a pretty small chance that a given index |
| 177 | + * could leak, if we're right in the middle of populating the cache for that |
| 178 | + * index (in getStep) when we process the delete here, then we'll end up with an |
| 179 | + * entry that doesn't get deleted until the master changes or a policy changes |
| 180 | + * -- it's harmless enough |
| 181 | + */ |
| 182 | + public void delete(Index deleted) { |
| 183 | + cachedSteps.remove(deleted); |
| 184 | + } |
| 185 | + |
| 186 | + /** |
| 187 | + * Clear internal maps that were populated by update (and others). |
| 188 | + */ |
| 189 | + public void clear() { |
| 190 | + // this is potentially large, so it's important to clear it |
| 191 | + cachedSteps.clear(); |
| 192 | + // these are relatively small, but there's no harm in clearing them |
| 193 | + lifecyclePolicyMap.clear(); |
| 194 | + firstStepMap.clear(); |
| 195 | + stepMap.clear(); |
| 196 | + } |
| 197 | + |
157 | 198 | /**
|
158 | 199 | * Return all ordered steps for the current policy for the index. Does not
|
159 | 200 | * resolve steps using the phase caching, but only for the currently existing policy.
|
@@ -266,6 +307,14 @@ private List<Step> parseStepsFromPhase(String policy, String currentPhase, Strin
|
266 | 307 |
|
267 | 308 | @Nullable
|
268 | 309 | public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKey) {
|
| 310 | + final Tuple<IndexMetadata, Step> cachedStep = cachedSteps.get(indexMetadata.getIndex()); |
| 311 | + // n.b. we're using instance equality here for the IndexMetadata rather than object equality because it's fast, |
| 312 | + // this means that we're erring on the side of cache misses (if the IndexMetadata changed in any way, it'll be |
| 313 | + // a new instance, so we'll miss-and-repopulate the cache for the index in question) |
| 314 | + if (cachedStep != null && cachedStep.v1() == indexMetadata && cachedStep.v2().getKey().equals(stepKey)) { |
| 315 | + return cachedStep.v2(); |
| 316 | + } |
| 317 | + |
269 | 318 | if (ErrorStep.NAME.equals(stepKey.getName())) {
|
270 | 319 | return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
|
271 | 320 | }
|
@@ -304,7 +353,9 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
|
304 | 353 | + phaseSteps;
|
305 | 354 |
|
306 | 355 | // Return the step that matches the given stepKey or else null if we couldn't find it
|
307 |
| - return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); |
| 356 | + final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); |
| 357 | + cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s)); |
| 358 | + return s; |
308 | 359 | }
|
309 | 360 |
|
310 | 361 | /**
|
|
0 commit comments