Skip to content

Commit 6ea54e0

Browse files
committed
Merge remote-tracking branch 'elastic/master' into license-to-autoscale
* elastic/master: Autoscaling reactive storage decider (elastic#65520) Fix TranslogTests#testStats (elastic#66227)
2 parents 2daf277 + 5e20c0a commit 6ea54e0

File tree

14 files changed

+1711
-32
lines changed

14 files changed

+1711
-32
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public ReservedSpace getReservedSpace(String nodeId, String dataPath) {
211211
* Method that incorporates the ShardId for the shard into a string that
212212
* includes a 'p' or 'r' depending on whether the shard is a primary.
213213
*/
214-
static String shardIdentifierFromRouting(ShardRouting shardRouting) {
214+
public static String shardIdentifierFromRouting(ShardRouting shardRouting) {
215215
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
216216
}
217217

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,12 +483,13 @@ public void testStats() throws IOException {
483483
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE));
484484
translog.trimUnreferencedReaders();
485485
{
486+
long lastModifiedAge = System.currentTimeMillis() - translog.getCurrent().getLastModifiedTime();
486487
final TranslogStats stats = stats();
487488
assertThat(stats.estimatedNumberOfOperations(), equalTo(0));
488489
assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition));
489490
assertThat(stats.getUncommittedOperations(), equalTo(0));
490491
assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition));
491-
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
492+
assertThat(stats.getEarliestLastModifiedAge(), greaterThanOrEqualTo(lastModifiedAge));
492493
}
493494
}
494495

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.autoscaling.storage;
8+
9+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
11+
import org.elasticsearch.cluster.ClusterInfoService;
12+
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
13+
import org.elasticsearch.cluster.InternalClusterInfoService;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
17+
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
18+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
19+
import org.elasticsearch.common.settings.ClusterSettings;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.elasticsearch.xpack.autoscaling.Autoscaling;
24+
import org.elasticsearch.xpack.autoscaling.LocalStateAutoscaling;
25+
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
26+
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
27+
import org.hamcrest.Matchers;
28+
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.Collection;
32+
import java.util.Locale;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.TreeMap;
36+
import java.util.TreeSet;
37+
import java.util.stream.Collectors;
38+
import java.util.stream.IntStream;
39+
40+
import static org.elasticsearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
41+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
42+
43+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
44+
public class ReactiveStorageIT extends DiskUsageIntegTestCase {
45+
46+
private static final long WATERMARK_BYTES = 10240;
47+
48+
@Override
49+
protected Collection<Class<? extends Plugin>> nodePlugins() {
50+
Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
51+
plugins.add(LocalStateAutoscaling.class);
52+
return plugins;
53+
}
54+
55+
@Override
56+
protected Settings nodeSettings(final int nodeOrdinal) {
57+
final Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
58+
builder.put(Autoscaling.AUTOSCALING_ENABLED_SETTING.getKey(), true)
59+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), (WATERMARK_BYTES * 2) + "b")
60+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), WATERMARK_BYTES + "b")
61+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
62+
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms")
63+
.put(DiskThresholdDecider.ENABLE_FOR_SINGLE_DATA_NODE.getKey(), "true");
64+
return builder.build();
65+
}
66+
67+
public void testScaleUp() throws InterruptedException {
68+
internalCluster().startMasterOnlyNode();
69+
final String dataNodeName = internalCluster().startDataOnlyNode();
70+
final String policyName = "test";
71+
putAutoscalingPolicy(policyName);
72+
73+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
74+
createIndex(
75+
indexName,
76+
Settings.builder()
77+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
78+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
79+
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
80+
.build()
81+
);
82+
indexRandom(
83+
true,
84+
IntStream.range(1, 100)
85+
.mapToObj(i -> client().prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
86+
.toArray(IndexRequestBuilder[]::new)
87+
);
88+
forceMerge();
89+
refresh();
90+
91+
// just check it does not throw when not refreshed.
92+
capacity();
93+
94+
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().setStore(true).get();
95+
long used = stats.getTotal().getStore().getSizeInBytes();
96+
long minShardSize = Arrays.stream(stats.getShards()).mapToLong(s -> s.getStats().getStore().sizeInBytes()).min().orElseThrow();
97+
long maxShardSize = Arrays.stream(stats.getShards()).mapToLong(s -> s.getStats().getStore().sizeInBytes()).max().orElseThrow();
98+
long enoughSpace = used + WATERMARK_BYTES + 1;
99+
100+
setTotalSpace(dataNodeName, enoughSpace);
101+
GetAutoscalingCapacityAction.Response response = capacity();
102+
assertThat(response.results().keySet(), Matchers.equalTo(Set.of(policyName)));
103+
assertThat(response.results().get(policyName).currentCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
104+
assertThat(response.results().get(policyName).requiredCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace));
105+
assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
106+
107+
setTotalSpace(dataNodeName, enoughSpace - 2);
108+
response = capacity();
109+
assertThat(response.results().keySet(), Matchers.equalTo(Set.of(policyName)));
110+
assertThat(response.results().get(policyName).currentCapacity().total().storage().getBytes(), Matchers.equalTo(enoughSpace - 2));
111+
assertThat(
112+
response.results().get(policyName).requiredCapacity().total().storage().getBytes(),
113+
Matchers.greaterThan(enoughSpace - 2)
114+
);
115+
assertThat(
116+
response.results().get(policyName).requiredCapacity().total().storage().getBytes(),
117+
Matchers.lessThanOrEqualTo(enoughSpace + minShardSize)
118+
);
119+
assertThat(response.results().get(policyName).requiredCapacity().node().storage().getBytes(), Matchers.equalTo(maxShardSize));
120+
}
121+
122+
/**
123+
* Verify that the list of roles includes all data roles to ensure we consider adding future data roles.
124+
*/
125+
public void testRoles() {
126+
// this has to be an integration test to ensure roles are available.
127+
internalCluster().startMasterOnlyNode();
128+
ReactiveStorageDeciderService service = new ReactiveStorageDeciderService(
129+
Settings.EMPTY,
130+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
131+
null
132+
);
133+
assertThat(
134+
service.roles().stream().sorted().collect(Collectors.toList()),
135+
Matchers.equalTo(
136+
DiscoveryNode.getPossibleRoles().stream().filter(DiscoveryNodeRole::canContainData).sorted().collect(Collectors.toList())
137+
)
138+
);
139+
}
140+
141+
public void setTotalSpace(String dataNodeName, long totalSpace) {
142+
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
143+
final ClusterInfoService clusterInfoService = internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
144+
((InternalClusterInfoService) clusterInfoService).refresh();
145+
}
146+
147+
public GetAutoscalingCapacityAction.Response capacity() {
148+
GetAutoscalingCapacityAction.Request request = new GetAutoscalingCapacityAction.Request();
149+
GetAutoscalingCapacityAction.Response response = client().execute(GetAutoscalingCapacityAction.INSTANCE, request).actionGet();
150+
return response;
151+
}
152+
153+
private void putAutoscalingPolicy(String policyName) {
154+
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
155+
policyName,
156+
new TreeSet<>(Set.of("data")),
157+
new TreeMap<>(Map.of("reactive_storage", Settings.EMPTY))
158+
);
159+
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());
160+
}
161+
}

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/Autoscaling.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.autoscaling;
88

9+
import org.apache.lucene.util.SetOnce;
910
import org.elasticsearch.Build;
1011
import org.elasticsearch.action.ActionRequest;
1112
import org.elasticsearch.action.ActionResponse;
@@ -14,6 +15,7 @@
1415
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1516
import org.elasticsearch.cluster.metadata.Metadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNodes;
18+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
1719
import org.elasticsearch.cluster.service.ClusterService;
1820
import org.elasticsearch.common.ParseField;
1921
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -50,6 +52,7 @@
5052
import org.elasticsearch.xpack.autoscaling.rest.RestGetAutoscalingCapacityHandler;
5153
import org.elasticsearch.xpack.autoscaling.rest.RestGetAutoscalingPolicyHandler;
5254
import org.elasticsearch.xpack.autoscaling.rest.RestPutAutoscalingPolicyHandler;
55+
import org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageDeciderService;
5356

5457
import java.util.ArrayList;
5558
import java.util.Collection;
@@ -92,6 +95,8 @@ public class Autoscaling extends Plugin implements ActionPlugin, ExtensiblePlugi
9295

9396
private final boolean enabled;
9497
private final List<AutoscalingExtension> autoscalingExtensions;
98+
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
99+
private final SetOnce<AllocationDeciders> allocationDeciders = new SetOnce<>();
95100
private final AutoscalingLicenseChecker autoscalingLicenseChecker;
96101

97102
public Autoscaling(final Settings settings) {
@@ -136,6 +141,7 @@ public Collection<Object> createComponents(
136141
IndexNameExpressionResolver indexNameExpressionResolver,
137142
Supplier<RepositoriesService> repositoriesServiceSupplier
138143
) {
144+
this.clusterService.set(clusterService);
139145
return List.of(new AutoscalingCalculateCapacityService.Holder(this), autoscalingLicenseChecker);
140146
}
141147

@@ -184,6 +190,11 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
184190
AutoscalingDeciderResult.Reason.class,
185191
FixedAutoscalingDeciderService.NAME,
186192
FixedAutoscalingDeciderService.FixedReason::new
193+
),
194+
new NamedWriteableRegistry.Entry(
195+
AutoscalingDeciderResult.Reason.class,
196+
ReactiveStorageDeciderService.NAME,
197+
ReactiveStorageDeciderService.ReactiveReason::new
187198
)
188199
);
189200
}
@@ -202,10 +213,19 @@ public void loadExtensions(ExtensionLoader loader) {
202213

203214
@Override
204215
public Collection<AutoscalingDeciderService> deciders() {
205-
return List.of(new FixedAutoscalingDeciderService());
216+
assert allocationDeciders.get() != null;
217+
return List.of(
218+
new FixedAutoscalingDeciderService(),
219+
new ReactiveStorageDeciderService(
220+
clusterService.get().getSettings(),
221+
clusterService.get().getClusterSettings(),
222+
allocationDeciders.get()
223+
)
224+
);
206225
}
207226

208-
public Set<AutoscalingDeciderService> createDeciderServices() {
227+
public Set<AutoscalingDeciderService> createDeciderServices(AllocationDeciders allocationDeciders) {
228+
this.allocationDeciders.set(allocationDeciders);
209229
return autoscalingExtensions.stream().flatMap(p -> p.deciders().stream()).collect(Collectors.toSet());
210230
}
211231

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/GetAutoscalingCapacityAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public void writeTo(final StreamOutput out) throws IOException {
8383
out.writeMap(results, StreamOutput::writeString, (o, decision) -> decision.writeTo(o));
8484
}
8585

86+
public SortedMap<String, AutoscalingDeciderResults> results() {
87+
return results;
88+
}
89+
8690
@Override
8791
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
8892
builder.startObject();

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.block.ClusterBlockException;
1515
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
16+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.inject.Inject;
1819
import org.elasticsearch.license.LicenseUtils;
20+
import org.elasticsearch.snapshots.SnapshotsInfoService;
1921
import org.elasticsearch.tasks.Task;
2022
import org.elasticsearch.threadpool.ThreadPool;
2123
import org.elasticsearch.transport.TransportService;
@@ -30,6 +32,7 @@ public class TransportGetAutoscalingCapacityAction extends TransportMasterNodeAc
3032

3133
private final AutoscalingCalculateCapacityService capacityService;
3234
private final ClusterInfoService clusterInfoService;
35+
private final SnapshotsInfoService snapshotsInfoService;
3336
private final AutoscalingLicenseChecker autoscalingLicenseChecker;
3437

3538
@Inject
@@ -41,6 +44,8 @@ public TransportGetAutoscalingCapacityAction(
4144
final IndexNameExpressionResolver indexNameExpressionResolver,
4245
final AutoscalingCalculateCapacityService.Holder capacityServiceHolder,
4346
final ClusterInfoService clusterInfoService,
47+
final SnapshotsInfoService snapshotsInfoService,
48+
final AllocationDeciders allocationDeciders,
4449
final AutoscalingLicenseChecker autoscalingLicenseChecker
4550
) {
4651
super(
@@ -54,7 +59,8 @@ public TransportGetAutoscalingCapacityAction(
5459
GetAutoscalingCapacityAction.Response::new,
5560
ThreadPool.Names.SAME
5661
);
57-
this.capacityService = capacityServiceHolder.get();
62+
this.snapshotsInfoService = snapshotsInfoService;
63+
this.capacityService = capacityServiceHolder.get(allocationDeciders);
5864
this.clusterInfoService = clusterInfoService;
5965
this.autoscalingLicenseChecker = Objects.requireNonNull(autoscalingLicenseChecker);
6066
assert this.capacityService != null;
@@ -73,7 +79,9 @@ protected void masterOperation(
7379
}
7480

7581
listener.onResponse(
76-
new GetAutoscalingCapacityAction.Response(capacityService.calculate(state, clusterInfoService.getClusterInfo()))
82+
new GetAutoscalingCapacityAction.Response(
83+
capacityService.calculate(state, clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes())
84+
)
7785
);
7886
}
7987

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1919
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2020
import org.elasticsearch.cluster.metadata.Metadata;
21+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.inject.Inject;
2324
import org.elasticsearch.license.LicenseUtils;
@@ -49,6 +50,7 @@ public TransportPutAutoscalingPolicyAction(
4950
final ThreadPool threadPool,
5051
final ActionFilters actionFilters,
5152
final IndexNameExpressionResolver indexNameExpressionResolver,
53+
final AllocationDeciders allocationDeciders,
5254
final AutoscalingCalculateCapacityService.Holder policyValidatorHolder,
5355
final AutoscalingLicenseChecker autoscalingLicenseChecker
5456
) {
@@ -58,7 +60,7 @@ public TransportPutAutoscalingPolicyAction(
5860
threadPool,
5961
actionFilters,
6062
indexNameExpressionResolver,
61-
policyValidatorHolder.get(),
63+
policyValidatorHolder.get(allocationDeciders),
6264
autoscalingLicenseChecker
6365
);
6466
}

0 commit comments

Comments
 (0)