Skip to content

Commit 1391288

Browse files
authored
Watcher: Use Bulkprocessor in HistoryStore/TriggeredWatchStore (#32490)
Currently a watch execution results in one bulk request, when the triggered watches are written into the that index, that need to be executed. However the update of the watch status, the creation of the watch history entry as well as the deletion of the triggered watches index are all single document operations. This can have quite a negative impact, once you are executing a lot of watches, as each execution results in 4 documents writes, three of them being single document actions. This commit switches to a bulk processor instead of a single document action for writing watch history entries and deleting triggered watch entries. However the defaults are to run synchronous as before because the number of concurrent requests is set to 0. This also fixes a bug, where the deletion of the triggered watch entry was done asynchronously. However if you have a high number of watches being executed, you can configure watcher to delete the triggered watches entries as well as writing the watch history entries via bulk requests. The triggered watches deletions should still happen in a timely manner, where as the history entries might actually be bound by size as one entry can easily have 20kb. The following settings have been added: - xpack.watcher.bulk.actions (default 1) - xpack.watcher.bulk.concurrent_requests (default 0) - xpack.watcher.bulk.flush_interval (default 1s) - xpack.watcher.bulk.size (default 1mb) The drawback of this is of course, that on a node outage you might end up with watch history entries not being written or watches needing to be executing again because they have not been deleted from the triggered watches index. The window of these two cases increases configuring the bulk processor to wait to reach certain thresholds.
1 parent e075b87 commit 1391288

File tree

7 files changed

+291
-163
lines changed

7 files changed

+291
-163
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
*/
66
package org.elasticsearch.xpack.watcher;
77

8+
import org.apache.logging.log4j.LogManager;
89
import org.apache.logging.log4j.Logger;
910
import org.elasticsearch.action.ActionRequest;
1011
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.bulk.BulkItemResponse;
13+
import org.elasticsearch.action.bulk.BulkProcessor;
14+
import org.elasticsearch.action.bulk.BulkRequest;
15+
import org.elasticsearch.action.bulk.BulkResponse;
1116
import org.elasticsearch.bootstrap.BootstrapCheck;
1217
import org.elasticsearch.client.Client;
1318
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -20,13 +25,14 @@
2025
import org.elasticsearch.common.inject.util.Providers;
2126
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2227
import org.elasticsearch.common.logging.LoggerMessageFormat;
23-
import org.elasticsearch.common.logging.Loggers;
2428
import org.elasticsearch.common.regex.Regex;
2529
import org.elasticsearch.common.settings.ClusterSettings;
2630
import org.elasticsearch.common.settings.IndexScopedSettings;
2731
import org.elasticsearch.common.settings.Setting;
2832
import org.elasticsearch.common.settings.Settings;
2933
import org.elasticsearch.common.settings.SettingsFilter;
34+
import org.elasticsearch.common.unit.ByteSizeUnit;
35+
import org.elasticsearch.common.unit.ByteSizeValue;
3036
import org.elasticsearch.common.unit.TimeValue;
3137
import org.elasticsearch.common.util.concurrent.EsExecutors;
3238
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -51,6 +57,7 @@
5157
import org.elasticsearch.threadpool.FixedExecutorBuilder;
5258
import org.elasticsearch.threadpool.ThreadPool;
5359
import org.elasticsearch.watcher.ResourceWatcherService;
60+
import org.elasticsearch.xpack.core.ClientHelper;
5461
import org.elasticsearch.xpack.core.XPackPlugin;
5562
import org.elasticsearch.xpack.core.XPackSettings;
5663
import org.elasticsearch.xpack.core.ssl.SSLService;
@@ -184,12 +191,16 @@
184191
import java.util.List;
185192
import java.util.Map;
186193
import java.util.Set;
194+
import java.util.concurrent.TimeUnit;
187195
import java.util.function.Consumer;
188196
import java.util.function.Function;
189197
import java.util.function.Supplier;
190198
import java.util.function.UnaryOperator;
199+
import java.util.stream.Collectors;
191200

192201
import static java.util.Collections.emptyList;
202+
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;
203+
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
193204

194205
public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, ReloadablePlugin {
195206

@@ -201,6 +212,16 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
201212
Setting.boolSetting("xpack.watcher.encrypt_sensitive_data", false, Setting.Property.NodeScope);
202213
public static final Setting<TimeValue> MAX_STOP_TIMEOUT_SETTING =
203214
Setting.timeSetting("xpack.watcher.stop.timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
215+
private static final Setting<Integer> SETTING_BULK_ACTIONS =
216+
Setting.intSetting("xpack.watcher.bulk.actions", 1, 1, 10000, NodeScope);
217+
private static final Setting<Integer> SETTING_BULK_CONCURRENT_REQUESTS =
218+
Setting.intSetting("xpack.watcher.bulk.concurrent_requests", 0, 0, 20, NodeScope);
219+
private static final Setting<TimeValue> SETTING_BULK_FLUSH_INTERVAL =
220+
Setting.timeSetting("xpack.watcher.bulk.flush_interval", TimeValue.timeValueSeconds(1), NodeScope);
221+
private static final Setting<ByteSizeValue> SETTING_BULK_SIZE =
222+
Setting.byteSizeSetting("xpack.watcher.bulk.size", new ByteSizeValue(1, ByteSizeUnit.MB),
223+
new ByteSizeValue(1, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), NodeScope);
224+
204225

205226
public static final ScriptContext<SearchScript.Factory> SCRIPT_SEARCH_CONTEXT =
206227
new ScriptContext<>("xpack", SearchScript.Factory.class);
@@ -210,9 +231,10 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
210231
public static final ScriptContext<TemplateScript.Factory> SCRIPT_TEMPLATE_CONTEXT
211232
= new ScriptContext<>("xpack_template", TemplateScript.Factory.class);
212233

213-
private static final Logger logger = Loggers.getLogger(Watcher.class);
234+
private static final Logger logger = LogManager.getLogger(Watcher.class);
214235
private WatcherIndexingListener listener;
215236
private HttpClient httpClient;
237+
private BulkProcessor bulkProcessor;
216238

217239
protected final Settings settings;
218240
protected final boolean transportClient;
@@ -318,7 +340,49 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
318340
final InputRegistry inputRegistry = new InputRegistry(settings, inputFactories);
319341
inputFactories.put(ChainInput.TYPE, new ChainInputFactory(settings, inputRegistry));
320342

321-
final HistoryStore historyStore = new HistoryStore(settings, client);
343+
bulkProcessor = BulkProcessor.builder(ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN), new BulkProcessor.Listener() {
344+
@Override
345+
public void beforeBulk(long executionId, BulkRequest request) {
346+
}
347+
348+
@Override
349+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
350+
if (response.hasFailures()) {
351+
Map<String, String> triggeredWatches = Arrays.stream(response.getItems())
352+
.filter(BulkItemResponse::isFailed)
353+
.filter(r -> r.getIndex().startsWith(TriggeredWatchStoreField.INDEX_NAME))
354+
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
355+
if (triggeredWatches.isEmpty() == false) {
356+
String failure = triggeredWatches.values().stream().collect(Collectors.joining(", "));
357+
logger.error("triggered watches could not be deleted {}, failure [{}]",
358+
triggeredWatches.keySet(), Strings.substring(failure, 0, 2000));
359+
}
360+
361+
Map<String, String> overwrittenIds = Arrays.stream(response.getItems())
362+
.filter(BulkItemResponse::isFailed)
363+
.filter(r -> r.getIndex().startsWith(HistoryStoreField.INDEX_PREFIX))
364+
.filter(r -> r.getVersion() > 1)
365+
.collect(Collectors.toMap(BulkItemResponse::getId, BulkItemResponse::getFailureMessage));
366+
if (overwrittenIds.isEmpty() == false) {
367+
String failure = overwrittenIds.values().stream().collect(Collectors.joining(", "));
368+
logger.info("overwrote watch history entries {}, possible second execution of a triggered watch, failure [{}]",
369+
overwrittenIds.keySet(), Strings.substring(failure, 0, 2000));
370+
}
371+
}
372+
}
373+
374+
@Override
375+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
376+
logger.error("error executing bulk", failure);
377+
}
378+
})
379+
.setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings))
380+
.setBulkActions(SETTING_BULK_ACTIONS.get(settings))
381+
.setBulkSize(SETTING_BULK_SIZE.get(settings))
382+
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
383+
.build();
384+
385+
HistoryStore historyStore = new HistoryStore(settings, bulkProcessor);
322386

323387
// schedulers
324388
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
@@ -340,7 +404,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
340404
final TriggerService triggerService = new TriggerService(settings, triggerEngines);
341405

342406
final TriggeredWatch.Parser triggeredWatchParser = new TriggeredWatch.Parser(settings, triggerService);
343-
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser);
407+
final TriggeredWatchStore triggeredWatchStore = new TriggeredWatchStore(settings, client, triggeredWatchParser, bulkProcessor);
344408

345409
final WatcherSearchTemplateService watcherSearchTemplateService =
346410
new WatcherSearchTemplateService(settings, scriptService, xContentRegistry);
@@ -416,6 +480,12 @@ public List<Setting<?>> getSettings() {
416480
settings.add(Setting.simpleString("xpack.watcher.execution.scroll.timeout", Setting.Property.NodeScope));
417481
settings.add(WatcherLifeCycleService.SETTING_REQUIRE_MANUAL_START);
418482

483+
// bulk processor configuration
484+
settings.add(SETTING_BULK_ACTIONS);
485+
settings.add(SETTING_BULK_CONCURRENT_REQUESTS);
486+
settings.add(SETTING_BULK_FLUSH_INTERVAL);
487+
settings.add(SETTING_BULK_SIZE);
488+
419489
// notification services
420490
settings.addAll(SlackService.getSettings());
421491
settings.addAll(EmailService.getSettings());
@@ -608,7 +678,15 @@ public List<ScriptContext<?>> getContexts() {
608678

609679
@Override
610680
public void close() throws IOException {
681+
bulkProcessor.flush();
611682
IOUtils.closeWhileHandlingException(httpClient);
683+
try {
684+
if (bulkProcessor.awaitClose(10, TimeUnit.SECONDS) == false) {
685+
logger.warn("failed to properly close watcher bulk processor");
686+
}
687+
} catch (InterruptedException e) {
688+
Thread.currentThread().interrupt();
689+
}
612690
}
613691

614692
/**

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -320,11 +320,8 @@ record = createWatchRecord(record, ctx, e);
320320
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
321321
}
322322
}
323-
try {
324-
triggeredWatchStore.delete(ctx.id());
325-
} catch (Exception e) {
326-
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
327-
}
323+
324+
triggeredWatchStore.delete(ctx.id());
328325
}
329326
currentExecutions.get().remove(watchId);
330327
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
@@ -412,14 +409,8 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge
412409
triggeredWatch.id()), exc);
413410
}
414411

415-
try {
416-
triggeredWatchStore.delete(triggeredWatch.id());
417-
} catch (Exception exc) {
418-
logger.error((Supplier<?>) () ->
419-
new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " +
420-
"rejection", triggeredWatch.id()), exc);
421-
}
422-
};
412+
triggeredWatchStore.delete(triggeredWatch.id());
413+
}
423414
}
424415

425416
WatchRecord executeInner(WatchExecutionContext ctx) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1010
import org.elasticsearch.action.bulk.BulkItemResponse;
11+
import org.elasticsearch.action.bulk.BulkProcessor;
1112
import org.elasticsearch.action.bulk.BulkRequest;
1213
import org.elasticsearch.action.bulk.BulkResponse;
1314
import org.elasticsearch.action.delete.DeleteRequest;
@@ -24,14 +25,14 @@
2425
import org.elasticsearch.common.component.AbstractComponent;
2526
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.common.unit.TimeValue;
27-
import org.elasticsearch.common.util.concurrent.ThreadContext;
2828
import org.elasticsearch.common.xcontent.ToXContent;
2929
import org.elasticsearch.common.xcontent.XContentBuilder;
3030
import org.elasticsearch.common.xcontent.XContentFactory;
3131
import org.elasticsearch.index.IndexNotFoundException;
3232
import org.elasticsearch.search.SearchHit;
3333
import org.elasticsearch.search.builder.SearchSourceBuilder;
3434
import org.elasticsearch.search.sort.SortBuilders;
35+
import org.elasticsearch.xpack.core.ClientHelper;
3536
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
3637
import org.elasticsearch.xpack.core.watcher.execution.Wid;
3738
import org.elasticsearch.xpack.core.watcher.watch.Watch;
@@ -46,8 +47,6 @@
4647
import java.util.stream.Collectors;
4748

4849
import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
49-
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
50-
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
5150

5251
public class TriggeredWatchStore extends AbstractComponent {
5352

@@ -58,21 +57,17 @@ public class TriggeredWatchStore extends AbstractComponent {
5857

5958
private final TimeValue defaultBulkTimeout;
6059
private final TimeValue defaultSearchTimeout;
60+
private final BulkProcessor bulkProcessor;
6161

62-
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser) {
62+
public TriggeredWatchStore(Settings settings, Client client, TriggeredWatch.Parser triggeredWatchParser, BulkProcessor bulkProcessor) {
6363
super(settings);
6464
this.scrollSize = settings.getAsInt("xpack.watcher.execution.scroll.size", 1000);
65-
this.client = client;
65+
this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN);
6666
this.scrollTimeout = settings.getAsTime("xpack.watcher.execution.scroll.timeout", TimeValue.timeValueMinutes(5));
6767
this.defaultBulkTimeout = settings.getAsTime("xpack.watcher.internal.ops.bulk.default_timeout", TimeValue.timeValueSeconds(120));
6868
this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30));
6969
this.triggeredWatchParser = triggeredWatchParser;
70-
}
71-
72-
public static boolean validate(ClusterState state) {
73-
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
74-
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
75-
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
70+
this.bulkProcessor = bulkProcessor;
7671
}
7772

7873
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BulkResponse> listener) throws IOException {
@@ -81,8 +76,7 @@ public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionList
8176
return;
8277
}
8378

84-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, createBulkRequest(triggeredWatches,
85-
TriggeredWatchStoreField.DOC_TYPE), listener, client::bulk);
79+
client.bulk(createBulkRequest(triggeredWatches), listener);
8680
}
8781

8882
public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws IOException {
@@ -94,14 +88,14 @@ public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws I
9488
/**
9589
* Create a bulk request from the triggered watches with a specified document type
9690
* @param triggeredWatches The list of triggered watches
97-
* @param docType The document type to use, either the current one or legacy
9891
* @return The bulk request for the triggered watches
9992
* @throws IOException If a triggered watch could not be parsed to JSON, this exception is thrown
10093
*/
101-
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches, String docType) throws IOException {
94+
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
10295
BulkRequest request = new BulkRequest();
10396
for (TriggeredWatch triggeredWatch : triggeredWatches) {
104-
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, docType, triggeredWatch.id().value());
97+
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
98+
triggeredWatch.id().value());
10599
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
106100
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
107101
indexRequest.source(builder);
@@ -112,12 +106,15 @@ private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatche
112106
return request;
113107
}
114108

109+
/**
110+
* Delete a triggered watch entry.
111+
* Note that this happens asynchronously, as these kind of requests are batched together to reduce the amount of concurrent requests.
112+
*
113+
* @param wid The ID os the triggered watch id
114+
*/
115115
public void delete(Wid wid) {
116116
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
117-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
118-
client.delete(request); // FIXME shouldn't we wait before saying the delete was successful
119-
}
120-
logger.trace("successfully deleted triggered watch with id [{}]", wid);
117+
bulkProcessor.add(request);
121118
}
122119

123120
/**
@@ -140,9 +137,9 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
140137
return Collections.emptyList();
141138
}
142139

143-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
144-
client.admin().indices().refresh(new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME))
145-
.actionGet(TimeValue.timeValueSeconds(5));
140+
try {
141+
RefreshRequest request = new RefreshRequest(TriggeredWatchStoreField.INDEX_NAME);
142+
client.admin().indices().refresh(request).actionGet(TimeValue.timeValueSeconds(5));
146143
} catch (IndexNotFoundException e) {
147144
return Collections.emptyList();
148145
}
@@ -159,7 +156,7 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
159156
.version(true));
160157

161158
SearchResponse response = null;
162-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
159+
try {
163160
response = client.search(searchRequest).actionGet(defaultSearchTimeout);
164161
logger.debug("trying to find triggered watches for ids {}: found [{}] docs", ids, response.getHits().getTotalHits());
165162
while (response.getHits().getHits().length != 0) {
@@ -176,14 +173,18 @@ public Collection<TriggeredWatch> findTriggeredWatches(Collection<Watch> watches
176173
}
177174
} finally {
178175
if (response != null) {
179-
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
180-
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
181-
clearScrollRequest.addScrollId(response.getScrollId());
182-
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
183-
}
176+
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
177+
clearScrollRequest.addScrollId(response.getScrollId());
178+
client.clearScroll(clearScrollRequest).actionGet(scrollTimeout);
184179
}
185180
}
186181

187182
return triggeredWatches;
188183
}
184+
185+
public static boolean validate(ClusterState state) {
186+
IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME, state.metaData());
187+
return indexMetaData == null || (indexMetaData.getState() == IndexMetaData.State.OPEN &&
188+
state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive());
189+
}
189190
}

0 commit comments

Comments
 (0)