|
8 | 8 |
|
9 | 9 | import org.apache.logging.log4j.LogManager;
|
10 | 10 | import org.apache.logging.log4j.Logger;
|
| 11 | +import org.apache.logging.log4j.message.ParameterizedMessage; |
| 12 | +import org.elasticsearch.action.ActionListener; |
11 | 13 | import org.elasticsearch.cluster.ClusterChangedEvent;
|
12 | 14 | import org.elasticsearch.cluster.ClusterState;
|
13 | 15 | import org.elasticsearch.cluster.ClusterStateListener;
|
14 | 16 | import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
15 | 17 | import org.elasticsearch.cluster.metadata.IndexMetadata;
|
| 18 | +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; |
| 19 | +import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; |
16 | 20 | import org.elasticsearch.cluster.node.DiscoveryNode;
|
17 | 21 | import org.elasticsearch.cluster.routing.RoutingNode;
|
18 | 22 | import org.elasticsearch.cluster.routing.ShardRouting;
|
19 | 23 | import org.elasticsearch.cluster.service.ClusterService;
|
20 | 24 | import org.elasticsearch.common.Strings;
|
| 25 | +import org.elasticsearch.common.collect.ImmutableOpenMap; |
21 | 26 | import org.elasticsearch.common.component.LifecycleListener;
|
22 | 27 | import org.elasticsearch.gateway.GatewayService;
|
23 | 28 | import org.elasticsearch.index.shard.ShardId;
|
|
26 | 31 | import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
27 | 32 | import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
|
28 | 33 |
|
| 34 | +import java.util.ArrayList; |
29 | 35 | import java.util.Collections;
|
30 | 36 | import java.util.Comparator;
|
31 | 37 | import java.util.EnumSet;
|
32 | 38 | import java.util.List;
|
33 | 39 | import java.util.Set;
|
| 40 | +import java.util.concurrent.atomic.AtomicBoolean; |
34 | 41 | import java.util.concurrent.atomic.AtomicReference;
|
35 | 42 | import java.util.function.Supplier;
|
36 | 43 | import java.util.stream.Collectors;
|
|
41 | 48 | public class WatcherLifeCycleService implements ClusterStateListener {
|
42 | 49 |
|
43 | 50 | private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);
|
| 51 | + |
| 52 | + public static final Set<String> LEGACY_WATCHER_INDEX_TEMPLATES = Set.of( |
| 53 | + ".watches", |
| 54 | + ".triggered_watches" |
| 55 | + ); |
| 56 | + |
44 | 57 | private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
|
45 | 58 | private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
|
46 | 59 | private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
|
| 60 | + private final ClusterService clusterService; |
47 | 61 | private volatile WatcherService watcherService;
|
48 | 62 | private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING);
|
| 63 | + private final AtomicBoolean legacyTemplatesDeleteInProgress = new AtomicBoolean(false); |
| 64 | + private final AtomicBoolean checkForLegacyTemplates = new AtomicBoolean(true); |
49 | 65 |
|
50 | 66 | WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) {
|
51 | 67 | this.watcherService = watcherService;
|
| 68 | + this.clusterService = clusterService; |
52 | 69 | clusterService.addListener(this);
|
53 | 70 | // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
|
54 | 71 | // happen because we're shutting down and an watch is scheduled.
|
@@ -96,6 +113,36 @@ public void clusterChanged(ClusterChangedEvent event) {
|
96 | 113 | return;
|
97 | 114 | }
|
98 | 115 |
|
| 116 | + if (event.localNodeMaster() && checkForLegacyTemplates.get()) { |
| 117 | + List<String> existingTemplatesToDelete = new ArrayList<>(LEGACY_WATCHER_INDEX_TEMPLATES.size()); |
| 118 | + ImmutableOpenMap<String, IndexTemplateMetadata> clusterLegacyTemplates = event.state().getMetadata().templates(); |
| 119 | + for (String legacyWatcherIndexTemplate : LEGACY_WATCHER_INDEX_TEMPLATES) { |
| 120 | + if (clusterLegacyTemplates.containsKey(legacyWatcherIndexTemplate)) { |
| 121 | + existingTemplatesToDelete.add(legacyWatcherIndexTemplate); |
| 122 | + } |
| 123 | + } |
| 124 | + |
| 125 | + if (existingTemplatesToDelete.isEmpty() == false) { |
| 126 | + // if someone else is executing the deletion of templates (due to fast successive cluster updates), we'll skip doing so |
| 127 | + if (legacyTemplatesDeleteInProgress.compareAndSet(false, true) == false) { |
| 128 | + return; |
| 129 | + } |
| 130 | + |
| 131 | + MetadataIndexTemplateService.removeTemplates(clusterService, LEGACY_WATCHER_INDEX_TEMPLATES, |
| 132 | + ActionListener.wrap(r -> { |
| 133 | + legacyTemplatesDeleteInProgress.set(false); |
| 134 | + // we've done it so we shouldn't check anymore |
| 135 | + checkForLegacyTemplates.set(false); |
| 136 | + logger.debug("deleted legacy Watcher index templates [{}]", String.join(",", LEGACY_WATCHER_INDEX_TEMPLATES)); |
| 137 | + }, e -> { |
| 138 | + legacyTemplatesDeleteInProgress.set(false); |
| 139 | + logger.debug(new ParameterizedMessage("unable to delete legacy Watcher index templates [{}]", |
| 140 | + String.join(",", LEGACY_WATCHER_INDEX_TEMPLATES)), e); |
| 141 | + }) |
| 142 | + ); |
| 143 | + } |
| 144 | + } |
| 145 | + |
99 | 146 | boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
|
100 | 147 | boolean isStoppedOrStopping = stopStates.contains(this.state.get());
|
101 | 148 | // if this is not a data node, we need to start it ourselves possibly
|
|
0 commit comments