Skip to content

Commit 9636493

Browse files
committed
Add Rollup ILM Action (elastic#65633)
this commit introduces a new Rollup ILM Action that allows indices to be rolled up according to a specific rollup config. The action also allows for the new rolled up index to be associated with a different policy than the original/source index. Relates elastic#42720. Closes elastic#48003.
1 parent f02300f commit 9636493

21 files changed

+879
-42
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
[role="xpack"]
2+
[[ilm-rollup]]
3+
=== Rollup
4+
5+
Phases allowed: hot, cold.
6+
7+
Aggregates an index's time series data and stores the results in a new read-only
8+
index. For example, you can roll up hourly data into daily or weekly summaries.
9+
10+
For more information about rollup, see the <<rollup-api, rollup action documentation>>
11+
12+
The name of the rolled up index will be the original index name of the managed index prefixed
13+
with `rollup-`.
14+
15+
[[ilm-rollup-options]]
16+
==== Rollup options
17+
`config`::
18+
(Required, integer)
19+
The rollup configuration, a more detailed description of the
20+
rollup configuration specification can be found <<rollup-api-request-body,here>>.
21+
22+
`rollup_policy`::
23+
(Optional, string)
24+
The name of an <<index-lifecycle-management, {ilm}>> ({ilm-init}) policy to associate
25+
with the newly created rollup index.
26+
27+
[[ilm-rollup-ex]]
28+
==== Example
29+
30+
[source,console]
31+
--------------------------------------------------
32+
PUT _ilm/policy/my_policy
33+
{
34+
"policy": {
35+
"phases": {
36+
"cold": {
37+
"actions": {
38+
"rollup" : {
39+
"config": {
40+
"groups": {
41+
"date_histogram": {
42+
"field": "@timestamp",
43+
"calendar_interval": "1y"
44+
}
45+
},
46+
"metrics": [
47+
{ "field": "temperature", "metrics": [ "avg" ] }
48+
]
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}
56+
--------------------------------------------------

docs/reference/ilm/ilm-actions.asciidoc

+19-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<<ilm-allocate,Allocate>>::
77
Move shards to nodes with different performance characteristics
8-
and reduce the number of replicas.
8+
and reduce the number of replicas.
99

1010
<<ilm-delete,Delete>>::
1111
Permanently remove the index.
@@ -22,10 +22,10 @@ Move the index shards to the <<data-tiers, data tier>> that corresponds
2222
to the current {ilm-init} phase.
2323

2424
<<ilm-readonly,Read only>>::
25-
Block write operations to the index.
26-
25+
Block write operations to the index.
26+
2727
<<ilm-rollover,Rollover>>::
28-
Remove the index as the write index for the rollover alias and
28+
Remove the index as the write index for the rollover alias and
2929
start indexing to a new index.
3030

3131
<<ilm-searchable-snapshot, Searchable snapshot>>::
@@ -35,17 +35,25 @@ and mount it as a searchable snapshot.
3535

3636
<<ilm-set-priority,Set priority>>::
3737
Lower the priority of an index as it moves through the lifecycle
38-
to ensure that hot indices are recovered first.
38+
to ensure that hot indices are recovered first.
3939

4040
<<ilm-shrink,Shrink>>::
4141
Reduce the number of primary shards by shrinking the index into a new index.
4242

4343
<<ilm-unfollow,Unfollow>>::
4444
Convert a follower index to a regular index.
45-
Performed automatically before a rollover, shrink, or searchable snapshot action.
45+
Performed automatically before a rollover, shrink, or searchable snapshot action.
4646

4747
<<ilm-wait-for-snapshot,Wait for snapshot>>::
48-
Ensure that a snapshot exists before deleting the index.
48+
Ensure that a snapshot exists before deleting the index.
49+
50+
ifdef::permanently-unreleased-branch[]
51+
52+
<<ilm-rollup,Rollup>>::
53+
Aggregates an index's time series data and stores the results in a new read-only
54+
index. For example, you can roll up hourly data into daily or weekly summaries.
55+
56+
endif::[]
4957

5058
include::actions/ilm-allocate.asciidoc[]
5159
include::actions/ilm-delete.asciidoc[]
@@ -59,3 +67,7 @@ include::actions/ilm-set-priority.asciidoc[]
5967
include::actions/ilm-shrink.asciidoc[]
6068
include::actions/ilm-unfollow.asciidoc[]
6169
include::actions/ilm-wait-for-snapshot.asciidoc[]
70+
71+
ifdef::permanently-unreleased-branch[]
72+
include::actions/ilm-rollup.asciidoc[]
73+
endif::[]

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.elasticsearch.xpack.core.ilm.MigrateAction;
7676
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
7777
import org.elasticsearch.xpack.core.ilm.RolloverAction;
78+
import org.elasticsearch.xpack.core.ilm.RollupILMAction;
7879
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
7980
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
8081
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -345,7 +346,7 @@ static Settings additionalSettings(final Settings settings, final boolean enable
345346

346347
@Override
347348
public List<ActionType<? extends ActionResponse>> getClientActions() {
348-
List<ActionType<? extends ActionResponse>> actions = new ArrayList(Arrays.asList(
349+
List<ActionType<? extends ActionResponse>> actions = new ArrayList<>(Arrays.asList(
349350
// deprecation
350351
DeprecationInfoAction.INSTANCE,
351352
// graph
@@ -521,7 +522,7 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
521522

522523
@Override
523524
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
524-
return Stream.concat(
525+
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(Stream.concat(
525526
Arrays.asList(
526527
// graph
527528
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.GRAPH, GraphFeatureSetUsage::new),
@@ -686,7 +687,13 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
686687
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.RUNTIME_FIELDS, RuntimeFieldsFeatureSetUsage::new)
687688
).stream(),
688689
MlEvaluationNamedXContentProvider.getNamedWriteables().stream()
689-
).collect(toList());
690+
).collect(toList()));
691+
692+
if (RollupV2.isEnabled()) {
693+
namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
694+
}
695+
696+
return namedWriteables;
690697
}
691698

692699
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.common.Nullable;
10+
import org.elasticsearch.common.ParseField;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
15+
import org.elasticsearch.common.xcontent.ObjectParser;
16+
import org.elasticsearch.common.xcontent.XContentBuilder;
17+
import org.elasticsearch.common.xcontent.XContentParser;
18+
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
19+
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.Objects;
24+
25+
/**
26+
* A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.rollup.action.RollupAction} on an index
27+
*/
28+
public class RollupILMAction implements LifecycleAction {
29+
public static final String NAME = "rollup";
30+
31+
private static final ParseField CONFIG_FIELD = new ParseField("config");
32+
private static final ParseField POLICY_FIELD = new ParseField("rollup_policy");
33+
34+
@SuppressWarnings("unchecked")
35+
private static final ConstructingObjectParser<RollupILMAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
36+
a -> new RollupILMAction((RollupActionConfig) a[0], (String) a[1]));
37+
38+
private final RollupActionConfig config;
39+
private final String rollupPolicy;
40+
41+
static {
42+
PARSER.declareField(ConstructingObjectParser.constructorArg(),
43+
(p, c) -> RollupActionConfig.fromXContent(p), CONFIG_FIELD, ObjectParser.ValueType.OBJECT);
44+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), POLICY_FIELD);
45+
}
46+
47+
public static RollupILMAction parse(XContentParser parser) {
48+
return PARSER.apply(parser, null);
49+
}
50+
51+
public RollupILMAction(RollupActionConfig config, @Nullable String rollupPolicy) {
52+
this.config = config;
53+
this.rollupPolicy = rollupPolicy;
54+
}
55+
56+
public RollupILMAction(StreamInput in) throws IOException {
57+
this(new RollupActionConfig(in), in.readOptionalString());
58+
}
59+
60+
@Override
61+
public String getWriteableName() {
62+
return NAME;
63+
}
64+
65+
RollupActionConfig config() {
66+
return config;
67+
}
68+
69+
String rollupPolicy() {
70+
return rollupPolicy;
71+
}
72+
73+
@Override
74+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
75+
builder.startObject();
76+
builder.field(CONFIG_FIELD.getPreferredName(), config);
77+
if (rollupPolicy != null) {
78+
builder.field(POLICY_FIELD.getPreferredName(), rollupPolicy);
79+
}
80+
builder.endObject();
81+
return builder;
82+
}
83+
84+
@Override
85+
public void writeTo(StreamOutput out) throws IOException {
86+
config.writeTo(out);
87+
out.writeOptionalString(rollupPolicy);
88+
}
89+
90+
@Override
91+
public boolean isSafeAction() {
92+
return false;
93+
}
94+
95+
@Override
96+
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
97+
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
98+
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME);
99+
StepKey rollupKey = new StepKey(phase, NAME, NAME);
100+
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
101+
readOnlyKey);
102+
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, rollupKey, client);
103+
if (rollupPolicy == null) {
104+
Step rollupStep = new RollupStep(rollupKey, nextStepKey, client, config);
105+
return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep);
106+
} else {
107+
StepKey updateRollupIndexPolicyStepKey = new StepKey(phase, NAME, UpdateRollupIndexPolicyStep.NAME);
108+
Step rollupStep = new RollupStep(rollupKey, updateRollupIndexPolicyStepKey, client, config);
109+
Step updateRollupIndexPolicyStep = new UpdateRollupIndexPolicyStep(updateRollupIndexPolicyStepKey, nextStepKey,
110+
client, rollupPolicy);
111+
return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep, updateRollupIndexPolicyStep);
112+
}
113+
}
114+
115+
@Override
116+
public boolean equals(Object o) {
117+
if (this == o) return true;
118+
if (o == null || getClass() != o.getClass()) return false;
119+
120+
RollupILMAction that = (RollupILMAction) o;
121+
122+
return Objects.equals(this.config, that.config)
123+
&& Objects.equals(this.rollupPolicy, that.rollupPolicy);
124+
}
125+
126+
@Override
127+
public int hashCode() {
128+
return Objects.hash(config, rollupPolicy);
129+
}
130+
131+
@Override
132+
public String toString() {
133+
return Strings.toString(this);
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.cluster.ClusterState;
11+
import org.elasticsearch.cluster.ClusterStateObserver;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
14+
import org.elasticsearch.xpack.core.rollup.action.RollupAction;
15+
16+
import java.util.Objects;
17+
18+
/**
19+
* Rolls up index using a {@link RollupActionConfig}
20+
*/
21+
public class RollupStep extends AsyncActionStep {
22+
public static final String NAME = "rollup";
23+
public static final String ROLLUP_INDEX_NAME_PREFIX = "rollup-";
24+
25+
private final RollupActionConfig config;
26+
27+
public RollupStep(StepKey key, StepKey nextStepKey, Client client, RollupActionConfig config) {
28+
super(key, nextStepKey, client);
29+
this.config = config;
30+
}
31+
32+
public static String getRollupIndexName(String index) {
33+
return ROLLUP_INDEX_NAME_PREFIX + index;
34+
}
35+
36+
@Override
37+
public boolean isRetryable() {
38+
return true;
39+
}
40+
41+
@Override
42+
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
43+
String originalIndex = indexMetadata.getIndex().getName();
44+
RollupAction.Request request = new RollupAction.Request(originalIndex, getRollupIndexName(originalIndex), config);
45+
// currently RollupAction always acknowledges action was complete when no exceptions are thrown.
46+
getClient().execute(RollupAction.INSTANCE, request,
47+
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
48+
}
49+
50+
public RollupActionConfig getConfig() {
51+
return config;
52+
}
53+
54+
@Override
55+
public int hashCode() {
56+
return Objects.hash(super.hashCode(), config);
57+
}
58+
59+
@Override
60+
public boolean equals(Object obj) {
61+
if (obj == null) {
62+
return false;
63+
}
64+
if (getClass() != obj.getClass()) {
65+
return false;
66+
}
67+
RollupStep other = (RollupStep) obj;
68+
return super.equals(obj)
69+
&& Objects.equals(config, other.config);
70+
}
71+
}

0 commit comments

Comments
 (0)