8
8
9
9
import org .apache .logging .log4j .LogManager ;
10
10
import org .apache .logging .log4j .Logger ;
11
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
11
12
import org .elasticsearch .action .ActionListener ;
12
13
import org .elasticsearch .action .support .ActionFilters ;
13
14
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
15
+ import org .elasticsearch .client .Client ;
14
16
import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
15
17
import org .elasticsearch .cluster .ClusterState ;
16
18
import org .elasticsearch .cluster .block .ClusterBlockException ;
17
19
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
20
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
18
21
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
19
22
import org .elasticsearch .cluster .metadata .MetaData ;
20
23
import org .elasticsearch .cluster .service .ClusterService ;
24
+ import org .elasticsearch .common .Nullable ;
25
+ import org .elasticsearch .common .Strings ;
21
26
import org .elasticsearch .common .inject .Inject ;
22
27
import org .elasticsearch .common .io .stream .StreamInput ;
28
+ import org .elasticsearch .common .xcontent .DeprecationHandler ;
29
+ import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
30
+ import org .elasticsearch .common .xcontent .XContentParser ;
31
+ import org .elasticsearch .common .xcontent .json .JsonXContent ;
23
32
import org .elasticsearch .threadpool .ThreadPool ;
24
33
import org .elasticsearch .transport .TransportService ;
25
34
import org .elasticsearch .xpack .core .ClientHelper ;
35
+ import org .elasticsearch .xpack .core .ilm .ErrorStep ;
26
36
import org .elasticsearch .xpack .core .ilm .IndexLifecycleMetadata ;
37
+ import org .elasticsearch .xpack .core .ilm .LifecycleExecutionState ;
27
38
import org .elasticsearch .xpack .core .ilm .LifecyclePolicy ;
28
39
import org .elasticsearch .xpack .core .ilm .LifecyclePolicyMetadata ;
40
+ import org .elasticsearch .xpack .core .ilm .LifecycleSettings ;
41
+ import org .elasticsearch .xpack .core .ilm .PhaseExecutionInfo ;
42
+ import org .elasticsearch .xpack .core .ilm .Step ;
29
43
import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction ;
30
44
import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction .Request ;
31
45
import org .elasticsearch .xpack .core .ilm .action .PutLifecycleAction .Response ;
46
+ import org .elasticsearch .xpack .ilm .IndexLifecycleTransition ;
32
47
33
48
import java .io .IOException ;
34
49
import java .time .Instant ;
50
+ import java .util .LinkedHashSet ;
51
+ import java .util .List ;
35
52
import java .util .Map ;
53
+ import java .util .Set ;
36
54
import java .util .SortedMap ;
55
+ import java .util .Spliterators ;
37
56
import java .util .TreeMap ;
38
57
import java .util .stream .Collectors ;
58
+ import java .util .stream .StreamSupport ;
39
59
40
60
/**
41
61
* This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well
44
64
public class TransportPutLifecycleAction extends TransportMasterNodeAction <Request , Response > {
45
65
46
66
private static final Logger logger = LogManager .getLogger (TransportPutLifecycleAction .class );
67
+ private final NamedXContentRegistry xContentRegistry ;
68
+ private final Client client ;
47
69
48
70
@ Inject
49
71
public TransportPutLifecycleAction (TransportService transportService , ClusterService clusterService , ThreadPool threadPool ,
50
- ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ) {
72
+ ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ,
73
+ NamedXContentRegistry namedXContentRegistry , Client client ) {
51
74
super (PutLifecycleAction .NAME , transportService , clusterService , threadPool , actionFilters , Request ::new ,
52
75
indexNameExpressionResolver );
76
+ this .xContentRegistry = namedXContentRegistry ;
77
+ this .client = client ;
53
78
}
54
79
55
80
@ Override
@@ -81,7 +106,7 @@ protected Response newResponse(boolean acknowledged) {
81
106
82
107
@ Override
83
108
public ClusterState execute (ClusterState currentState ) throws Exception {
84
- ClusterState .Builder newState = ClusterState .builder (currentState );
109
+ ClusterState .Builder stateBuilder = ClusterState .builder (currentState );
85
110
IndexLifecycleMetadata currentMetadata = currentState .metaData ().custom (IndexLifecycleMetadata .TYPE );
86
111
if (currentMetadata == null ) { // first time using index-lifecycle feature, bootstrap metadata
87
112
currentMetadata = IndexLifecycleMetadata .EMPTY ;
@@ -99,13 +124,195 @@ public ClusterState execute(ClusterState currentState) throws Exception {
99
124
logger .info ("updating index lifecycle policy [{}]" , request .getPolicy ().getName ());
100
125
}
101
126
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata (newPolicies , currentMetadata .getOperationMode ());
102
- newState .metaData (MetaData .builder (currentState .getMetaData ())
127
+ stateBuilder .metaData (MetaData .builder (currentState .getMetaData ())
103
128
.putCustom (IndexLifecycleMetadata .TYPE , newMetadata ).build ());
104
- return newState .build ();
129
+ ClusterState nonRefreshedState = stateBuilder .build ();
130
+ if (oldPolicy == null ) {
131
+ return nonRefreshedState ;
132
+ } else {
133
+ try {
134
+ return updateIndicesForPolicy (nonRefreshedState , xContentRegistry , client ,
135
+ oldPolicy .getPolicy (), lifecyclePolicyMetadata );
136
+ } catch (Exception e ) {
137
+ logger .warn (new ParameterizedMessage ("unable to refresh indices phase JSON for updated policy [{}]" ,
138
+ oldPolicy .getName ()), e );
139
+ // Revert to the non-refreshed state
140
+ return nonRefreshedState ;
141
+ }
142
+ }
105
143
}
106
144
});
107
145
}
108
146
147
+ /**
148
+ * Ensure that we have the minimum amount of metadata necessary to check for cache phase
149
+ * refresh. This includes:
150
+ * - An execution state
151
+ * - Existing phase definition JSON
152
+ * - A current step key
153
+ * - A current phase in the step key
154
+ * - Not currently in the ERROR step
155
+ */
156
+ static boolean eligibleToCheckForRefresh (final IndexMetaData metaData ) {
157
+ LifecycleExecutionState executionState = LifecycleExecutionState .fromIndexMetadata (metaData );
158
+ if (executionState == null || executionState .getPhaseDefinition () == null ) {
159
+ return false ;
160
+ }
161
+
162
+ Step .StepKey currentStepKey = LifecycleExecutionState .getCurrentStepKey (executionState );
163
+ if (currentStepKey == null || currentStepKey .getPhase () == null ) {
164
+ return false ;
165
+ }
166
+
167
+ return ErrorStep .NAME .equals (currentStepKey .getName ()) == false ;
168
+ }
169
+
170
+ /**
171
+ * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
172
+ * If there is an error parsing or if the phase definition is missing the required
173
+ * information, returns null.
174
+ */
175
+ @ Nullable
176
+ static Set <Step .StepKey > readStepKeys (final NamedXContentRegistry xContentRegistry , final Client client ,
177
+ final String phaseDef , final String currentPhase ) {
178
+ final PhaseExecutionInfo phaseExecutionInfo ;
179
+ try (XContentParser parser = JsonXContent .jsonXContent .createParser (xContentRegistry ,
180
+ DeprecationHandler .THROW_UNSUPPORTED_OPERATION , phaseDef )) {
181
+ phaseExecutionInfo = PhaseExecutionInfo .parse (parser , currentPhase );
182
+ } catch (Exception e ) {
183
+ logger .trace (new ParameterizedMessage ("exception reading step keys checking for refreshability, phase definition: {}" ,
184
+ phaseDef ), e );
185
+ return null ;
186
+ }
187
+
188
+ if (phaseExecutionInfo == null || phaseExecutionInfo .getPhase () == null ) {
189
+ return null ;
190
+ }
191
+
192
+ return phaseExecutionInfo .getPhase ().getActions ().values ().stream ()
193
+ .flatMap (a -> a .toSteps (client , phaseExecutionInfo .getPhase ().getName (), null ).stream ())
194
+ .map (Step ::getKey )
195
+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
196
+ }
197
+
198
+ /**
199
+ * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
200
+ */
201
+ static boolean isIndexPhaseDefinitionUpdatable (final NamedXContentRegistry xContentRegistry , final Client client ,
202
+ final IndexMetaData metaData , final LifecyclePolicy newPolicy ) {
203
+ final String index = metaData .getIndex ().getName ();
204
+ if (eligibleToCheckForRefresh (metaData ) == false ) {
205
+ logger .debug ("[{}] does not contain enough information to check for eligibility of refreshing phase" , index );
206
+ return false ;
207
+ }
208
+ final String policyId = newPolicy .getName ();
209
+
210
+ final LifecycleExecutionState executionState = LifecycleExecutionState .fromIndexMetadata (metaData );
211
+ final Step .StepKey currentStepKey = LifecycleExecutionState .getCurrentStepKey (executionState );
212
+ final String currentPhase = currentStepKey .getPhase ();
213
+
214
+ final Set <Step .StepKey > newStepKeys = newPolicy .toSteps (client ).stream ()
215
+ .map (Step ::getKey )
216
+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
217
+
218
+ if (newStepKeys .contains (currentStepKey ) == false ) {
219
+ // The index is on a step that doesn't exist in the new policy, we
220
+ // can't safely re-read the JSON
221
+ logger .debug ("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed" ,
222
+ index , policyId , currentStepKey );
223
+ return false ;
224
+ }
225
+
226
+ final String phaseDef = executionState .getPhaseDefinition ();
227
+ final Set <Step .StepKey > oldStepKeys = readStepKeys (xContentRegistry , client , phaseDef , currentPhase );
228
+ if (oldStepKeys == null ) {
229
+ logger .debug ("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed" ,
230
+ index , policyId );
231
+ return false ;
232
+ }
233
+
234
+ final Set <Step .StepKey > oldPhaseStepKeys = oldStepKeys .stream ()
235
+ .filter (sk -> currentPhase .equals (sk .getPhase ()))
236
+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
237
+
238
+ final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo (policyId , newPolicy .getPhases ().get (currentPhase ), 1L , 1L );
239
+ final String peiJson = Strings .toString (phaseExecutionInfo );
240
+
241
+ final Set <Step .StepKey > newPhaseStepKeys = readStepKeys (xContentRegistry , client , peiJson , currentPhase );
242
+ if (newPhaseStepKeys == null ) {
243
+ logger .debug (new ParameterizedMessage ("[{}] unable to parse phase definition for policy [{}] " +
244
+ "to determine if it could be refreshed" , index , policyId ));
245
+ return false ;
246
+ }
247
+
248
+ if (newPhaseStepKeys .equals (oldPhaseStepKeys )) {
249
+ // The new and old phase have the same stepkeys for this current phase, so we can
250
+ // refresh the definition because we know it won't change the execution flow.
251
+ logger .debug ("[{}] updated policy [{}] contains the same phase step keys and can be refreshed" , index , policyId );
252
+ return true ;
253
+ } else {
254
+ logger .debug ("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
255
+ "definition as it differs too greatly. old: {}, new: {}" ,
256
+ index , policyId , oldPhaseStepKeys , newPhaseStepKeys );
257
+ return false ;
258
+ }
259
+ }
260
+
261
+ /**
262
+ * Rereads the phase JSON for the given index, returning a new cluster state.
263
+ */
264
+ static ClusterState refreshPhaseDefinition (final ClusterState state , final String index , final LifecyclePolicyMetadata updatedPolicy ) {
265
+ final IndexMetaData idxMeta = state .metaData ().index (index );
266
+ assert eligibleToCheckForRefresh (idxMeta ) : "index " + index + " is missing crucial information needed to refresh phase definition" ;
267
+
268
+ logger .trace ("[{}] updating cached phase definition for policy [{}]" , index , updatedPolicy .getName ());
269
+ LifecycleExecutionState currentExState = LifecycleExecutionState .fromIndexMetadata (idxMeta );
270
+
271
+ String currentPhase = currentExState .getPhase ();
272
+ PhaseExecutionInfo pei = new PhaseExecutionInfo (updatedPolicy .getName (),
273
+ updatedPolicy .getPolicy ().getPhases ().get (currentPhase ), updatedPolicy .getVersion (), updatedPolicy .getModifiedDate ());
274
+
275
+ LifecycleExecutionState newExState = LifecycleExecutionState .builder (currentExState )
276
+ .setPhaseDefinition (Strings .toString (pei , false , false ))
277
+ .build ();
278
+
279
+ return IndexLifecycleTransition .newClusterStateWithLifecycleState (idxMeta .getIndex (), state , newExState ).build ();
280
+ }
281
+
282
+ /**
283
+ * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
284
+ */
285
+ static ClusterState updateIndicesForPolicy (final ClusterState state , final NamedXContentRegistry xContentRegistry , final Client client ,
286
+ final LifecyclePolicy oldPolicy , final LifecyclePolicyMetadata newPolicy ) {
287
+ assert oldPolicy .getName ().equals (newPolicy .getName ()) : "expected both policies to have the same id but they were: [" +
288
+ oldPolicy .getName () + "] vs. [" + newPolicy .getName () + "]" ;
289
+
290
+ // No need to update anything if the policies are identical in contents
291
+ if (oldPolicy .equals (newPolicy .getPolicy ())) {
292
+ logger .debug ("policy [{}] is unchanged and no phase definition refresh is needed" , oldPolicy .getName ());
293
+ return state ;
294
+ }
295
+
296
+ final List <String > indicesThatCanBeUpdated =
297
+ StreamSupport .stream (Spliterators .spliteratorUnknownSize (state .metaData ().indices ().valuesIt (), 0 ), false )
298
+ .filter (meta -> newPolicy .getName ().equals (LifecycleSettings .LIFECYCLE_NAME_SETTING .get (meta .getSettings ())))
299
+ .filter (meta -> isIndexPhaseDefinitionUpdatable (xContentRegistry , client , meta , newPolicy .getPolicy ()))
300
+ .map (meta -> meta .getIndex ().getName ())
301
+ .collect (Collectors .toList ());
302
+
303
+ ClusterState updatedState = state ;
304
+ for (String index : indicesThatCanBeUpdated ) {
305
+ try {
306
+ updatedState = refreshPhaseDefinition (updatedState , index , newPolicy );
307
+ } catch (Exception e ) {
308
+ logger .warn (new ParameterizedMessage ("[{}] unable to refresh phase definition for updated policy [{}]" ,
309
+ index , newPolicy .getName ()), e );
310
+ }
311
+ }
312
+
313
+ return updatedState ;
314
+ }
315
+
109
316
@ Override
110
317
protected ClusterBlockException checkBlock (Request request , ClusterState state ) {
111
318
return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
0 commit comments