20
20
package org .elasticsearch .action .admin .indices .rollover ;
21
21
22
22
import org .elasticsearch .action .ActionListener ;
23
- import org .elasticsearch .action .admin .indices .alias .IndicesAliasesClusterStateUpdateRequest ;
24
23
import org .elasticsearch .action .admin .indices .create .CreateIndexClusterStateUpdateRequest ;
25
24
import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
26
25
import org .elasticsearch .action .admin .indices .stats .IndicesStatsAction ;
@@ -150,56 +149,44 @@ public void onResponse(IndicesStatsResponse statsResponse) {
150
149
new RolloverResponse (sourceIndexName , rolloverIndexName , conditionResults , true , false , false , false ));
151
150
return ;
152
151
}
153
- List <Condition <?>> metConditions = rolloverRequest .getConditions ().values ().stream ()
152
+ List <Condition <?>> metConditions = rolloverRequest .getConditions ().values ().stream ()
154
153
.filter (condition -> conditionResults .get (condition .toString ())).collect (Collectors .toList ());
155
154
if (conditionResults .size () == 0 || metConditions .size () > 0 ) {
156
- CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest (unresolvedName , rolloverIndexName ,
157
- rolloverRequest );
158
- createIndexService .createIndex (updateRequest , ActionListener .wrap (createIndexClusterStateUpdateResponse -> {
159
- final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest ;
160
- if (explicitWriteIndex ) {
161
- aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest (sourceIndexName ,
162
- rolloverIndexName , rolloverRequest );
163
- } else {
164
- aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest (sourceIndexName ,
165
- rolloverIndexName , rolloverRequest );
155
+ CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest (unresolvedName ,
156
+ rolloverIndexName , rolloverRequest );
157
+ clusterService .submitStateUpdateTask ("rollover_index source [" + sourceIndexName + "] to target ["
158
+ + rolloverIndexName + "]" , new ClusterStateUpdateTask () {
159
+ @ Override
160
+ public ClusterState execute (ClusterState currentState ) throws Exception {
161
+ ClusterState newState = createIndexService .applyCreateIndexRequest (currentState , createIndexRequest );
162
+ newState = indexAliasesService .applyAliasActions (newState ,
163
+ rolloverAliasToNewIndex (sourceIndexName , rolloverIndexName , rolloverRequest , explicitWriteIndex ));
164
+ RolloverInfo rolloverInfo = new RolloverInfo (rolloverRequest .getAlias (), metConditions ,
165
+ threadPool .absoluteTimeInMillis ());
166
+ return ClusterState .builder (newState )
167
+ .metaData (MetaData .builder (newState .metaData ())
168
+ .put (IndexMetaData .builder (newState .metaData ().index (sourceIndexName ))
169
+ .putRolloverInfo (rolloverInfo ))).build ();
166
170
}
167
- indexAliasesService .indicesAliases (aliasesUpdateRequest ,
168
- ActionListener .wrap (aliasClusterStateUpdateResponse -> {
169
- if (aliasClusterStateUpdateResponse .isAcknowledged ()) {
170
- clusterService .submitStateUpdateTask ("update_rollover_info" , new ClusterStateUpdateTask () {
171
- @ Override
172
- public ClusterState execute (ClusterState currentState ) {
173
- RolloverInfo rolloverInfo = new RolloverInfo (rolloverRequest .getAlias (), metConditions ,
174
- threadPool .absoluteTimeInMillis ());
175
- return ClusterState .builder (currentState )
176
- .metaData (MetaData .builder (currentState .metaData ())
177
- .put (IndexMetaData .builder (currentState .metaData ().index (sourceIndexName ))
178
- .putRolloverInfo (rolloverInfo ))).build ();
179
- }
180
171
181
- @ Override
182
- public void onFailure (String source , Exception e ) {
183
- listener .onFailure (e );
184
- }
172
+ @ Override
173
+ public void onFailure (String source , Exception e ) {
174
+ listener .onFailure (e );
175
+ }
185
176
186
- @ Override
187
- public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
188
- activeShardsObserver .waitForActiveShards (new String []{rolloverIndexName },
189
- rolloverRequest .getCreateIndexRequest ().waitForActiveShards (),
190
- rolloverRequest .masterNodeTimeout (),
191
- isShardsAcknowledged -> listener .onResponse (new RolloverResponse (
192
- sourceIndexName , rolloverIndexName , conditionResults , false , true , true ,
193
- isShardsAcknowledged )),
194
- listener ::onFailure );
195
- }
196
- });
197
- } else {
198
- listener .onResponse (new RolloverResponse (sourceIndexName , rolloverIndexName , conditionResults ,
199
- false , true , false , false ));
200
- }
201
- }, listener ::onFailure ));
202
- }, listener ::onFailure ));
177
+ @ Override
178
+ public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
179
+ if (newState .equals (oldState ) == false ) {
180
+ activeShardsObserver .waitForActiveShards (new String []{rolloverIndexName },
181
+ rolloverRequest .getCreateIndexRequest ().waitForActiveShards (),
182
+ rolloverRequest .masterNodeTimeout (),
183
+ isShardsAcknowledged -> listener .onResponse (new RolloverResponse (
184
+ sourceIndexName , rolloverIndexName , conditionResults , false , true , true ,
185
+ isShardsAcknowledged )),
186
+ listener ::onFailure );
187
+ }
188
+ }
189
+ });
203
190
} else {
204
191
// conditions not met
205
192
listener .onResponse (
@@ -216,29 +203,24 @@ public void onFailure(Exception e) {
216
203
);
217
204
}
218
205
219
- static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest (String oldIndex , String newIndex ,
220
- RolloverRequest request ) {
221
- List <AliasAction > actions = unmodifiableList (Arrays .asList (
222
- new AliasAction .Add (newIndex , request .getAlias (), null , null , null , null ),
223
- new AliasAction .Remove (oldIndex , request .getAlias ())));
224
- final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest (actions )
225
- .ackTimeout (request .ackTimeout ())
226
- .masterNodeTimeout (request .masterNodeTimeout ());
227
- return updateRequest ;
228
- }
229
-
230
- static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest (String oldIndex , String newIndex ,
231
- RolloverRequest request ) {
232
- List <AliasAction > actions = unmodifiableList (Arrays .asList (
233
- new AliasAction .Add (newIndex , request .getAlias (), null , null , null , true ),
234
- new AliasAction .Add (oldIndex , request .getAlias (), null , null , null , false )));
235
- final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest (actions )
236
- .ackTimeout (request .ackTimeout ())
237
- .masterNodeTimeout (request .masterNodeTimeout ());
238
- return updateRequest ;
206
+ /**
207
+ * Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
208
+ * alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
209
+ * in which case, after the rollover, the new index will need to be the explicit write index.
210
+ */
211
+ static List <AliasAction > rolloverAliasToNewIndex (String oldIndex , String newIndex , RolloverRequest request ,
212
+ boolean explicitWriteIndex ) {
213
+ if (explicitWriteIndex ) {
214
+ return unmodifiableList (Arrays .asList (
215
+ new AliasAction .Add (newIndex , request .getAlias (), null , null , null , true ),
216
+ new AliasAction .Add (oldIndex , request .getAlias (), null , null , null , false )));
217
+ } else {
218
+ return unmodifiableList (Arrays .asList (
219
+ new AliasAction .Add (newIndex , request .getAlias (), null , null , null , null ),
220
+ new AliasAction .Remove (oldIndex , request .getAlias ())));
221
+ }
239
222
}
240
223
241
-
242
224
static String generateRolloverIndexName (String sourceIndexName , IndexNameExpressionResolver indexNameExpressionResolver ) {
243
225
String resolvedName = indexNameExpressionResolver .resolveDateMathExpression (sourceIndexName );
244
226
final boolean isDateMath = sourceIndexName .equals (resolvedName ) == false ;
0 commit comments