Skip to content

Commit fb5ef50

Browse files
author
Hendrik Muhs
committed
[Transform] add throttling (elastic#56007)
add throttling to transform, throttling will slow down search requests by delaying the execution based on a documents per second metric. fixes elastic#54862
1 parent cc21468 commit fb5ef50

File tree

41 files changed

+2075
-552
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2075
-552
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.transform.transforms;
21+
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.ToXContentObject;
25+
import org.elasticsearch.common.xcontent.XContentBuilder;
26+
import org.elasticsearch.common.xcontent.XContentParser;
27+
28+
import java.io.IOException;
29+
import java.util.Objects;
30+
31+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
32+
33+
public class SettingsConfig implements ToXContentObject {
34+
35+
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
36+
private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
37+
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
38+
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
39+
40+
private final Integer maxPageSearchSize;
41+
private final Float docsPerSecond;
42+
43+
private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
44+
"settings_config",
45+
true,
46+
args -> new SettingsConfig((Integer) args[0], (Float) args[1])
47+
);
48+
49+
static {
50+
PARSER.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, MAX_PAGE_SEARCH_SIZE);
51+
PARSER.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, DOCS_PER_SECOND);
52+
}
53+
54+
public static SettingsConfig fromXContent(final XContentParser parser) {
55+
return PARSER.apply(parser, null);
56+
}
57+
58+
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond) {
59+
this.maxPageSearchSize = maxPageSearchSize;
60+
this.docsPerSecond = docsPerSecond;
61+
}
62+
63+
@Override
64+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
65+
builder.startObject();
66+
if (maxPageSearchSize != null) {
67+
if (maxPageSearchSize.equals(DEFAULT_MAX_PAGE_SEARCH_SIZE)) {
68+
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), (Integer) null);
69+
} else {
70+
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
71+
}
72+
}
73+
if (docsPerSecond != null) {
74+
if (docsPerSecond.equals(DEFAULT_DOCS_PER_SECOND)) {
75+
builder.field(DOCS_PER_SECOND.getPreferredName(), (Float) null);
76+
} else {
77+
builder.field(DOCS_PER_SECOND.getPreferredName(), docsPerSecond);
78+
}
79+
}
80+
builder.endObject();
81+
return builder;
82+
}
83+
84+
public Integer getMaxPageSearchSize() {
85+
return maxPageSearchSize;
86+
}
87+
88+
public Float getDocsPerSecond() {
89+
return docsPerSecond;
90+
}
91+
92+
@Override
93+
public boolean equals(Object other) {
94+
if (other == this) {
95+
return true;
96+
}
97+
if (other == null || other.getClass() != getClass()) {
98+
return false;
99+
}
100+
101+
SettingsConfig that = (SettingsConfig) other;
102+
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond);
103+
}
104+
105+
@Override
106+
public int hashCode() {
107+
return Objects.hash(maxPageSearchSize, docsPerSecond);
108+
}
109+
110+
public static Builder builder() {
111+
return new Builder();
112+
}
113+
114+
public static class Builder {
115+
private Integer maxPageSearchSize;
116+
private Float docsPerSecond;
117+
118+
/**
119+
* Sets the paging maximum paging maxPageSearchSize that transform can use when
120+
* pulling the data from the source index.
121+
*
122+
* If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
123+
*
124+
* @param maxPageSearchSize Integer value between 10 and 10_000
125+
* @return the {@link Builder} with the paging maxPageSearchSize set.
126+
*/
127+
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
128+
this.maxPageSearchSize = maxPageSearchSize == null ? DEFAULT_MAX_PAGE_SEARCH_SIZE : maxPageSearchSize;
129+
return this;
130+
}
131+
132+
/**
133+
* Sets the docs per second that transform can use when pulling the data from the source index.
134+
*
135+
* This setting throttles transform by issuing queries less often, however processing still happens in
136+
* batches. A value of 0 disables throttling (default).
137+
*
138+
* @param docsPerSecond Integer value
139+
* @return the {@link Builder} with requestsPerSecond set.
140+
*/
141+
public Builder setRequestsPerSecond(Float docsPerSecond) {
142+
this.docsPerSecond = docsPerSecond == null ? DEFAULT_DOCS_PER_SECOND : docsPerSecond;
143+
return this;
144+
}
145+
146+
public SettingsConfig build() {
147+
return new SettingsConfig(maxPageSearchSize, docsPerSecond);
148+
}
149+
}
150+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformConfig.java

Lines changed: 74 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class TransformConfig implements ToXContentObject {
4848
public static final ParseField FREQUENCY = new ParseField("frequency");
4949
public static final ParseField DESCRIPTION = new ParseField("description");
5050
public static final ParseField SYNC = new ParseField("sync");
51+
public static final ParseField SETTINGS = new ParseField("settings");
5152
public static final ParseField VERSION = new ParseField("version");
5253
public static final ParseField CREATE_TIME = new ParseField("create_time");
5354
// types of transforms
@@ -58,45 +59,61 @@ public class TransformConfig implements ToXContentObject {
5859
private final DestConfig dest;
5960
private final TimeValue frequency;
6061
private final SyncConfig syncConfig;
62+
private final SettingsConfig settings;
6163
private final PivotConfig pivotConfig;
6264
private final String description;
6365
private final Version transformVersion;
6466
private final Instant createTime;
6567

66-
public static final ConstructingObjectParser<TransformConfig, Void> PARSER =
67-
new ConstructingObjectParser<>("transform", true,
68-
(args) -> {
69-
String id = (String) args[0];
70-
SourceConfig source = (SourceConfig) args[1];
71-
DestConfig dest = (DestConfig) args[2];
72-
TimeValue frequency = (TimeValue) args[3];
73-
SyncConfig syncConfig = (SyncConfig) args[4];
74-
PivotConfig pivotConfig = (PivotConfig) args[5];
75-
String description = (String)args[6];
76-
Instant createTime = (Instant)args[7];
77-
String transformVersion = (String)args[8];
78-
return new TransformConfig(id,
79-
source,
80-
dest,
81-
frequency,
82-
syncConfig,
83-
pivotConfig,
84-
description,
85-
createTime,
86-
transformVersion);
87-
});
68+
public static final ConstructingObjectParser<TransformConfig, Void> PARSER = new ConstructingObjectParser<>(
69+
"transform",
70+
true,
71+
(args) -> {
72+
String id = (String) args[0];
73+
SourceConfig source = (SourceConfig) args[1];
74+
DestConfig dest = (DestConfig) args[2];
75+
TimeValue frequency = (TimeValue) args[3];
76+
SyncConfig syncConfig = (SyncConfig) args[4];
77+
PivotConfig pivotConfig = (PivotConfig) args[5];
78+
String description = (String) args[6];
79+
SettingsConfig settings = (SettingsConfig) args[7];
80+
Instant createTime = (Instant) args[8];
81+
String transformVersion = (String) args[9];
82+
return new TransformConfig(
83+
id,
84+
source,
85+
dest,
86+
frequency,
87+
syncConfig,
88+
pivotConfig,
89+
description,
90+
settings,
91+
createTime,
92+
transformVersion
93+
);
94+
}
95+
);
8896

8997
static {
9098
PARSER.declareString(constructorArg(), ID);
9199
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
92100
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
93-
PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
94-
FREQUENCY, ObjectParser.ValueType.STRING);
101+
PARSER.declareField(
102+
optionalConstructorArg(),
103+
p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
104+
FREQUENCY,
105+
ObjectParser.ValueType.STRING
106+
);
95107
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
96108
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
97109
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
98-
PARSER.declareField(optionalConstructorArg(),
99-
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE);
110+
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
111+
PARSER.declareField(
112+
optionalConstructorArg(),
113+
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()),
114+
CREATE_TIME,
115+
ObjectParser.ValueType.VALUE
116+
);
100117
PARSER.declareString(optionalConstructorArg(), VERSION);
101118
}
102119

@@ -108,7 +125,6 @@ private static SyncConfig parseSyncConfig(XContentParser parser) throws IOExcept
108125
return syncConfig;
109126
}
110127

111-
112128
public static TransformConfig fromXContent(final XContentParser parser) {
113129
return PARSER.apply(parser, null);
114130
}
@@ -125,25 +141,29 @@ public static TransformConfig fromXContent(final XContentParser parser) {
125141
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
126142
*/
127143
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
128-
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null);
144+
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null);
129145
}
130146

131-
TransformConfig(final String id,
132-
final SourceConfig source,
133-
final DestConfig dest,
134-
final TimeValue frequency,
135-
final SyncConfig syncConfig,
136-
final PivotConfig pivotConfig,
137-
final String description,
138-
final Instant createTime,
139-
final String version) {
147+
TransformConfig(
148+
final String id,
149+
final SourceConfig source,
150+
final DestConfig dest,
151+
final TimeValue frequency,
152+
final SyncConfig syncConfig,
153+
final PivotConfig pivotConfig,
154+
final String description,
155+
final SettingsConfig settings,
156+
final Instant createTime,
157+
final String version
158+
) {
140159
this.id = id;
141160
this.source = source;
142161
this.dest = dest;
143162
this.frequency = frequency;
144163
this.syncConfig = syncConfig;
145164
this.pivotConfig = pivotConfig;
146165
this.description = description;
166+
this.settings = settings;
147167
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
148168
this.transformVersion = version == null ? null : Version.fromString(version);
149169
}
@@ -185,6 +205,11 @@ public String getDescription() {
185205
return description;
186206
}
187207

208+
@Nullable
209+
public SettingsConfig getSettings() {
210+
return settings;
211+
}
212+
188213
@Override
189214
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
190215
builder.startObject();
@@ -211,6 +236,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
211236
if (description != null) {
212237
builder.field(DESCRIPTION.getPreferredName(), description);
213238
}
239+
if (settings != null) {
240+
builder.field(SETTINGS.getPreferredName(), settings);
241+
}
214242
if (createTime != null) {
215243
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
216244
}
@@ -240,13 +268,14 @@ public boolean equals(Object other) {
240268
&& Objects.equals(this.description, that.description)
241269
&& Objects.equals(this.syncConfig, that.syncConfig)
242270
&& Objects.equals(this.transformVersion, that.transformVersion)
271+
&& Objects.equals(this.settings, that.settings)
243272
&& Objects.equals(this.createTime, that.createTime)
244273
&& Objects.equals(this.pivotConfig, that.pivotConfig);
245274
}
246275

247276
@Override
248277
public int hashCode() {
249-
return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description);
278+
return Objects.hash(id, source, dest, frequency, syncConfig, settings, createTime, transformVersion, pivotConfig, description);
250279
}
251280

252281
@Override
@@ -266,6 +295,7 @@ public static class Builder {
266295
private TimeValue frequency;
267296
private SyncConfig syncConfig;
268297
private PivotConfig pivotConfig;
298+
private SettingsConfig settings;
269299
private String description;
270300

271301
public Builder setId(String id) {
@@ -303,8 +333,13 @@ public Builder setDescription(String description) {
303333
return this;
304334
}
305335

336+
public Builder setSettings(SettingsConfig settings) {
337+
this.settings = settings;
338+
return this;
339+
}
340+
306341
public TransformConfig build() {
307-
return new TransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null);
342+
return new TransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, settings, null, null);
308343
}
309344
}
310345
}

0 commit comments

Comments
 (0)