Skip to content

Commit fb53ace

Browse files
committed
Add upper limit for scroll expiry (#26448)
This change adds a dynamic cluster setting named `search.max_keep_alive`. It is used as an upper limit for scroll expiry time in scroll queries and defaults to 24 hours. This change also ensures that the existing setting `search.default_keep_alive` is never larger than `search.max_keep_alive`. Relates #11511 * check style * add skip for bwc * iter * Add a maximum throttle wait time of 1h for reindex * review * remove empty line
1 parent 0b9e94d commit fb53ace

File tree

8 files changed

+258
-15
lines changed

8 files changed

+258
-15
lines changed

core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,21 +212,31 @@ synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
212212
}
213213

214214
/**
215-
* Adds a settings consumer that accepts the values for two settings. The consumer if only notified if one or both settings change.
215+
* Adds a settings consumer that accepts the values for two settings.
216+
* See {@link #addSettingsUpdateConsumer(Setting, Setting, BiConsumer, BiConsumer)} for details.
217+
*/
218+
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
219+
addSettingsUpdateConsumer(a, b, consumer, (i, j) -> {} );
220+
}
221+
222+
/**
223+
* Adds a settings consumer that accepts the values for two settings. The consumer is only notified if one or both settings change
224+
* and if the provided validator succeeded.
216225
* <p>
217226
* Note: Only settings registered in {@link SettingsModule} can be changed dynamically.
218227
* </p>
219-
* This method registers a compound updater that is useful if two settings are depending on each other. The consumer is always provided
220-
* with both values even if only one of the two changes.
228+
* This method registers a compound updater that is useful if two settings are depending on each other.
229+
* The consumer is always provided with both values even if only one of the two changes.
221230
*/
222-
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b, BiConsumer<A, B> consumer) {
231+
public synchronized <A, B> void addSettingsUpdateConsumer(Setting<A> a, Setting<B> b,
232+
BiConsumer<A, B> consumer, BiConsumer<A, B> validator) {
223233
if (a != get(a.getKey())) {
224234
throw new IllegalArgumentException("Setting is not registered for key [" + a.getKey() + "]");
225235
}
226236
if (b != get(b.getKey())) {
227237
throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
228238
}
229-
addSettingsUpdater(Setting.compoundUpdater(consumer, a, b, logger));
239+
addSettingsUpdater(Setting.compoundUpdater(consumer, validator, a, b, logger));
230240
}
231241

232242
/**

core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ public void apply(Settings value, Settings current, Settings previous) {
356356
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
357357
SearchService.DEFAULT_KEEPALIVE_SETTING,
358358
SearchService.KEEPALIVE_INTERVAL_SETTING,
359+
SearchService.MAX_KEEPALIVE_SETTING,
359360
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
360361
Node.WRITE_PORTS_FILE_SETTING,
361362
Node.NODE_NAME_SETTING,

core/src/main/java/org/elasticsearch/common/settings/Setting.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ AbstractScopedSettings.SettingUpdater<T> newUpdater(Consumer<T> consumer, Logger
479479
* See {@link AbstractScopedSettings#addSettingsUpdateConsumer(Setting, Setting, BiConsumer)} and its usage for details.
480480
*/
481481
static <A, B> AbstractScopedSettings.SettingUpdater<Tuple<A, B>> compoundUpdater(final BiConsumer<A, B> consumer,
482-
final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
482+
final BiConsumer<A, B> validator, final Setting<A> aSetting, final Setting<B> bSetting, Logger logger) {
483483
final AbstractScopedSettings.SettingUpdater<A> aSettingUpdater = aSetting.newUpdater(null, logger);
484484
final AbstractScopedSettings.SettingUpdater<B> bSettingUpdater = bSetting.newUpdater(null, logger);
485485
return new AbstractScopedSettings.SettingUpdater<Tuple<A, B>>() {
@@ -490,7 +490,10 @@ public boolean hasChanged(Settings current, Settings previous) {
490490

491491
@Override
492492
public Tuple<A, B> getValue(Settings current, Settings previous) {
493-
return new Tuple<>(aSettingUpdater.getValue(current, previous), bSettingUpdater.getValue(current, previous));
493+
A valueA = aSettingUpdater.getValue(current, previous);
494+
B valueB = bSettingUpdater.getValue(current, previous);
495+
validator.accept(valueA, valueB);
496+
return new Tuple<>(valueA, valueB);
494497
}
495498

496499
@Override

core/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535

3636
import static java.lang.Math.max;
37+
import static java.lang.Math.min;
3738
import static java.lang.Math.round;
3839
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
3940

@@ -44,6 +45,11 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
4445

4546
private static final Logger logger = Loggers.getLogger(WorkerBulkByScrollTaskState.class);
4647

48+
/**
49+
* Maximum wait time allowed for throttling.
50+
*/
51+
private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1);
52+
4753
private final BulkByScrollTask task;
4854

4955
/**
@@ -189,7 +195,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
189195

190196
public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
191197
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
192-
return timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime()));
198+
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
199+
return timeValueNanos(waitTime);
193200
}
194201

195202
/**

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.Index;
4444
import org.elasticsearch.index.IndexService;
4545
import org.elasticsearch.index.IndexSettings;
46+
import org.elasticsearch.index.MergeSchedulerConfig;
4647
import org.elasticsearch.index.engine.Engine;
4748
import org.elasticsearch.index.query.InnerHitContextBuilder;
4849
import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -82,6 +83,7 @@
8283
import org.elasticsearch.search.internal.ShardSearchRequest;
8384
import org.elasticsearch.search.profile.Profilers;
8485
import org.elasticsearch.search.query.QueryPhase;
86+
import org.elasticsearch.search.query.QueryPhaseExecutionException;
8587
import org.elasticsearch.search.query.QuerySearchRequest;
8688
import org.elasticsearch.search.query.QuerySearchResult;
8789
import org.elasticsearch.search.query.ScrollQuerySearchResult;
@@ -106,14 +108,17 @@
106108
import java.util.concurrent.atomic.AtomicLong;
107109
import java.util.function.LongSupplier;
108110

111+
import static org.elasticsearch.common.unit.TimeValue.timeValueHours;
109112
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
110113
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
111114

112115
public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
113116

114117
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
115118
public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING =
116-
Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope);
119+
Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope, Property.Dynamic);
120+
public static final Setting<TimeValue> MAX_KEEPALIVE_SETTING =
121+
Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(24), Property.NodeScope, Property.Dynamic);
117122
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
118123
Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
119124
/**
@@ -147,7 +152,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
147152

148153
private final FetchPhase fetchPhase;
149154

150-
private final long defaultKeepAlive;
155+
private volatile long defaultKeepAlive;
156+
157+
private volatile long maxKeepAlive;
151158

152159
private volatile TimeValue defaultSearchTimeout;
153160

@@ -173,7 +180,10 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
173180
this.fetchPhase = fetchPhase;
174181

175182
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
176-
this.defaultKeepAlive = DEFAULT_KEEPALIVE_SETTING.get(settings).millis();
183+
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
184+
185+
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_KEEPALIVE_SETTING, MAX_KEEPALIVE_SETTING,
186+
this::setKeepAlives, this::validateKeepAlives);
177187

178188
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval, Names.SAME);
179189

@@ -184,6 +194,20 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
184194
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
185195
}
186196

197+
private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
198+
if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
199+
throw new IllegalArgumentException("Default keep alive setting for scroll [" + DEFAULT_KEEPALIVE_SETTING.getKey() + "]" +
200+
" should be smaller than max keep alive [" + MAX_KEEPALIVE_SETTING.getKey() + "], " +
201+
"was (" + defaultKeepAlive.format() + " > " + maxKeepAlive.format() + ")");
202+
}
203+
}
204+
205+
private void setKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
206+
validateKeepAlives(defaultKeepAlive, maxKeepAlive);
207+
this.defaultKeepAlive = defaultKeepAlive.millis();
208+
this.maxKeepAlive = maxKeepAlive.millis();
209+
}
210+
187211
private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
188212
this.defaultSearchTimeout = defaultSearchTimeout;
189213
}
@@ -547,7 +571,7 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S
547571
if (request.scroll() != null && request.scroll().keepAlive() != null) {
548572
keepAlive = request.scroll().keepAlive().millis();
549573
}
550-
context.keepAlive(keepAlive);
574+
contextScrollKeepAlive(context, keepAlive);
551575
context.lowLevelCancellation(lowLevelCancellation);
552576
} catch (Exception e) {
553577
context.close();
@@ -625,6 +649,16 @@ public void freeAllScrollContexts() {
625649
}
626650
}
627651

652+
private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
653+
if (keepAlive > maxKeepAlive) {
654+
throw new QueryPhaseExecutionException(context,
655+
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive).format() + ") is too large. " +
656+
"It must be less than (" + TimeValue.timeValueMillis(maxKeepAlive).format() + "). " +
657+
"This limit can be set by changing the [" + MAX_KEEPALIVE_SETTING.getKey() + "] cluster level setting.");
658+
}
659+
context.keepAlive(keepAlive);
660+
}
661+
628662
private void contextProcessing(SearchContext context) {
629663
// disable timeout while executing a search
630664
context.accessed(-1);
@@ -847,13 +881,13 @@ private void shortcutDocIdsToLoad(SearchContext context) {
847881
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
848882
}
849883

850-
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
884+
private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
851885
// process scroll
852886
context.from(context.from() + context.size());
853887
context.scrollContext().scroll = request.scroll();
854888
// update the context keep alive based on the new scroll value
855889
if (request.scroll() != null && request.scroll().keepAlive() != null) {
856-
context.keepAlive(request.scroll().keepAlive().millis());
890+
contextScrollKeepAlive(context, request.scroll().keepAlive().millis());
857891
}
858892
}
859893

core/src/test/java/org/elasticsearch/common/settings/SettingTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,14 +359,20 @@ public void set(Integer a, Integer b) {
359359
this.a = a;
360360
this.b = b;
361361
}
362+
363+
public void validate(Integer a, Integer b) {
364+
if (Integer.signum(a) != Integer.signum(b)) {
365+
throw new IllegalArgumentException("boom");
366+
}
367+
}
362368
}
363369

364370

365371
public void testComposite() {
366372
Composite c = new Composite();
367373
Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
368374
Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
369-
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, a, b, logger);
375+
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
370376
assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
371377
assertNull(c.a);
372378
assertNull(c.b);
@@ -392,6 +398,40 @@ public void testComposite() {
392398

393399
}
394400

401+
public void testCompositeValidator() {
402+
Composite c = new Composite();
403+
Setting<Integer> a = Setting.intSetting("foo.int.bar.a", 1, Property.Dynamic, Property.NodeScope);
404+
Setting<Integer> b = Setting.intSetting("foo.int.bar.b", 1, Property.Dynamic, Property.NodeScope);
405+
ClusterSettings.SettingUpdater<Tuple<Integer, Integer>> settingUpdater = Setting.compoundUpdater(c::set, c::validate, a, b, logger);
406+
assertFalse(settingUpdater.apply(Settings.EMPTY, Settings.EMPTY));
407+
assertNull(c.a);
408+
assertNull(c.b);
409+
410+
Settings build = Settings.builder().put("foo.int.bar.a", 2).build();
411+
assertTrue(settingUpdater.apply(build, Settings.EMPTY));
412+
assertEquals(2, c.a.intValue());
413+
assertEquals(1, c.b.intValue());
414+
415+
Integer aValue = c.a;
416+
assertFalse(settingUpdater.apply(build, build));
417+
assertSame(aValue, c.a);
418+
Settings previous = build;
419+
build = Settings.builder().put("foo.int.bar.a", 2).put("foo.int.bar.b", 5).build();
420+
assertTrue(settingUpdater.apply(build, previous));
421+
assertEquals(2, c.a.intValue());
422+
assertEquals(5, c.b.intValue());
423+
424+
Settings invalid = Settings.builder().put("foo.int.bar.a", -2).put("foo.int.bar.b", 5).build();
425+
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, () -> settingUpdater.apply(invalid, previous));
426+
assertThat(exc.getMessage(), equalTo("boom"));
427+
428+
// reset to default
429+
assertTrue(settingUpdater.apply(Settings.EMPTY, build));
430+
assertEquals(1, c.a.intValue());
431+
assertEquals(1, c.b.intValue());
432+
433+
}
434+
395435
public void testListSettings() {
396436
Setting<List<String>> listSetting = Setting.listSetting("foo.bar", Arrays.asList("foo,bar"), (s) -> s.toString(),
397437
Property.Dynamic, Property.NodeScope);

0 commit comments

Comments
 (0)