13
13
import org .elasticsearch .client .Client ;
14
14
import org .elasticsearch .cluster .ClusterState ;
15
15
import org .elasticsearch .cluster .ClusterStateObserver ;
16
+ import org .elasticsearch .cluster .metadata .IndexAbstraction ;
16
17
import org .elasticsearch .cluster .metadata .IndexMetadata ;
17
18
import org .elasticsearch .common .Strings ;
18
19
@@ -39,38 +40,47 @@ public boolean isRetryable() {
39
40
@ Override
40
41
public void performAction (IndexMetadata indexMetadata , ClusterState currentClusterState ,
41
42
ClusterStateObserver observer , Listener listener ) {
43
+ String indexName = indexMetadata .getIndex ().getName ();
42
44
boolean indexingComplete = LifecycleSettings .LIFECYCLE_INDEXING_COMPLETE_SETTING .get (indexMetadata .getSettings ());
43
45
if (indexingComplete ) {
44
46
logger .trace (indexMetadata .getIndex () + " has lifecycle complete set, skipping " + RolloverStep .NAME );
45
47
listener .onResponse (true );
46
48
return ;
47
49
}
50
+ IndexAbstraction indexAbstraction = currentClusterState .metadata ().getIndicesLookup ().get (indexName );
51
+ assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't" ;
52
+ final String rolloverTarget ;
53
+ if (indexAbstraction .getParentDataStream () != null ) {
54
+ rolloverTarget = indexAbstraction .getParentDataStream ().getName ();
55
+ } else {
56
+ String rolloverAlias = RolloverAction .LIFECYCLE_ROLLOVER_ALIAS_SETTING .get (indexMetadata .getSettings ());
48
57
49
- String rolloverAlias = RolloverAction .LIFECYCLE_ROLLOVER_ALIAS_SETTING .get (indexMetadata .getSettings ());
58
+ if (Strings .isNullOrEmpty (rolloverAlias )) {
59
+ listener .onFailure (new IllegalArgumentException (String .format (Locale .ROOT ,
60
+ "setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group " +
61
+ "of indices being rolled over" , RolloverAction .LIFECYCLE_ROLLOVER_ALIAS , indexName )));
62
+ return ;
63
+ }
50
64
51
- if (Strings . isNullOrEmpty ( rolloverAlias )) {
52
- listener . onFailure ( new IllegalArgumentException ( String . format ( Locale . ROOT ,
53
- "setting [%s] for index [%s] is empty or not defined" , RolloverAction . LIFECYCLE_ROLLOVER_ALIAS ,
54
- indexMetadata . getIndex (). getName ())) );
55
- return ;
56
- }
65
+ if (indexMetadata . getRolloverInfos (). get ( rolloverAlias ) != null ) {
66
+ logger . info ( "index [{}] was already rolled over for alias [{}], not attempting to roll over again" ,
67
+ indexName , rolloverAlias );
68
+ listener . onResponse ( true );
69
+ return ;
70
+ }
57
71
58
- if (indexMetadata .getRolloverInfos ().get (rolloverAlias ) != null ) {
59
- logger . info ( "index [{}] was already rolled over for alias [{}], not attempting to roll over again" ,
60
- indexMetadata . getIndex (). getName () , rolloverAlias );
61
- listener . onResponse ( true );
62
- return ;
63
- }
72
+ if (indexMetadata .getAliases ().containsKey (rolloverAlias ) == false ) {
73
+ listener . onFailure ( new IllegalArgumentException ( String . format ( Locale . ROOT ,
74
+ "%s [%s] does not point to index [%s]" , RolloverAction . LIFECYCLE_ROLLOVER_ALIAS , rolloverAlias ,
75
+ indexName )) );
76
+ return ;
77
+ }
64
78
65
- if (indexMetadata .getAliases ().containsKey (rolloverAlias ) == false ) {
66
- listener .onFailure (new IllegalArgumentException (String .format (Locale .ROOT ,
67
- "%s [%s] does not point to index [%s]" , RolloverAction .LIFECYCLE_ROLLOVER_ALIAS , rolloverAlias ,
68
- indexMetadata .getIndex ().getName ())));
69
- return ;
79
+ rolloverTarget = rolloverAlias ;
70
80
}
71
81
72
82
// Calling rollover with no conditions will always roll over the index
73
- RolloverRequest rolloverRequest = new RolloverRequest (rolloverAlias , null )
83
+ RolloverRequest rolloverRequest = new RolloverRequest (rolloverTarget , null )
74
84
.masterNodeTimeout (getMasterTimeout (currentClusterState ));
75
85
// We don't wait for active shards when we perform the rollover because the
76
86
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
0 commit comments