Skip to content

Commit a5631b4

Browse files
authored
Fix enrich coordinator to reject documents instead of deadlocking (#56247) (#57188)
This PR removes the blocking call that inserts ingest documents into a queue in the coordinator. It replaces it with an offer call, which results in a rejection exception if the queue is full. This prevents write threads from deadlocking when the queue fills to capacity and there is more than one enrich processor in a pipeline.
1 parent f1ee186 commit a5631b4

File tree

3 files changed

+354
-26
lines changed

3 files changed

+354
-26
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.collect.Tuple;
2020
import org.elasticsearch.common.inject.Inject;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2223
import org.elasticsearch.tasks.Task;
2324
import org.elasticsearch.threadpool.ThreadPool;
2425
import org.elasticsearch.transport.TransportService;
@@ -65,7 +66,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
6566
/**
 * Hands an enrich lookup over to the coordinator.
 *
 * Before delegating, an assertion (only evaluated when assertions are enabled) checks that the name of the
 * calling thread contains the write, search or management thread pool name.
 *
 * @param task     not referenced in this method body
 * @param request  the search request, passed unchanged to {@code coordinator.schedule}
 * @param listener the response listener, passed unchanged to {@code coordinator.schedule}
 */
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
6667
// Write tp is expected when executing enrich processor from index / bulk api
6768
// Management tp is expected when executing enrich processor from ingest simulate api
69+
// Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the
70+
// search thread, which could end up here again if there is more than one enrich processor in a pipeline.
6871
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
72+
|| Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
6973
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
7074
// All further work (queueing and lookup coordination) is delegated to the coordinator.
coordinator.schedule(request, listener);
7175
}
@@ -76,6 +80,7 @@ public static class Coordinator {
7680
final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
7781
final int maxLookupsPerRequest;
7882
final int maxNumberOfConcurrentRequests;
83+
final int queueCapacity;
7984
final BlockingQueue<Slot> queue;
8085
final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
8186
volatile long remoteRequestsTotal = 0;
@@ -99,21 +104,30 @@ public Coordinator(Client client, Settings settings) {
99104
this.lookupFunction = lookupFunction;
100105
this.maxLookupsPerRequest = maxLookupsPerRequest;
101106
this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
107+
this.queueCapacity = queueCapacity;
102108
this.queue = new ArrayBlockingQueue<>(queueCapacity);
103109
}
104110

105111
/**
 * Queues a lookup for coordination without blocking the calling thread.
 *
 * NOTE: this block is shown as a diff. Lines whose gutter marker ends in {@code -} are the removed
 * blocking {@code queue.put(...)} implementation; lines whose marker ends in {@code +} are the
 * replacement based on {@code queue.offer(...)}.
 *
 * In the replacement, {@code coordinateLookups()} is always invoked, and when the offer was not accepted
 * the listener is failed with an {@link EsRejectedExecutionException} whose message reports the observed
 * queue size and the configured queue capacity.
 *
 * @param searchRequest the lookup request to wrap in a {@code Slot} and enqueue
 * @param listener      the listener stored in the {@code Slot}; failed directly here if the queue is full
 */
void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
106-
// Use put(...), because if queue is full then this method will wait until a free slot becomes available
107-
// The calling thread here is a write thread (write tp is used by ingest) and
108-
// this will create natural back pressure from the enrich processor.
109-
// If there are no write threads available then write requests with ingestion will fail with 429 error code.
110-
try {
111-
queue.put(new Slot(searchRequest, listener));
112-
} catch (InterruptedException e) {
113-
Thread.currentThread().interrupt();
114-
throw new RuntimeException("unable to add item to queue", e);
115-
}
112+
// Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
113+
// especially since the logic to kick off draining the queue is located right after this section. If we
114+
// cannot insert a request to the queue, we should reject the document with a 429 error code.
115+
boolean accepted = queue.offer(new Slot(searchRequest, listener));
116+
// Read the queue size right after the offer (and before coordinateLookups() runs); it is only used
// for the rejection message below.
int queueSize = queue.size();
117+
118+
// Coordinate lookups no matter what, even if queues were full. Search threads should be draining the queue,
119+
// but they may be busy with processing the remaining work for enrich results. If there is more than one
120+
// enrich processor in a pipeline, those search threads may find themselves here again before they can
121+
// coordinate the next set of lookups.
116122
coordinateLookups();
123+
124+
// The slot never entered the queue, so the listener has to be completed here with the rejection.
if (accepted == false) {
125+
listener.onFailure(
126+
new EsRejectedExecutionException(
127+
"Could not perform enrichment, " + "enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"
128+
)
129+
);
130+
}
117131
}
118132

119133
CoordinatorStats getStats(String nodeId) {
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
9+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
10+
import org.elasticsearch.action.bulk.BulkItemResponse;
11+
import org.elasticsearch.action.bulk.BulkRequest;
12+
import org.elasticsearch.action.bulk.BulkResponse;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.ingest.PutPipelineAction;
15+
import org.elasticsearch.action.ingest.PutPipelineRequest;
16+
import org.elasticsearch.action.search.SearchRequest;
17+
import org.elasticsearch.common.bytes.BytesReference;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.unit.TimeValue;
20+
import org.elasticsearch.common.xcontent.XContentBuilder;
21+
import org.elasticsearch.common.xcontent.XContentType;
22+
import org.elasticsearch.common.xcontent.json.JsonXContent;
23+
import org.elasticsearch.index.reindex.ReindexPlugin;
24+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
25+
import org.elasticsearch.plugins.Plugin;
26+
import org.elasticsearch.test.ESSingleNodeTestCase;
27+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
28+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
29+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
30+
31+
import java.util.Arrays;
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.hamcrest.Matchers.containsString;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.Matchers.is;
39+
40+
public class EnrichResiliencyTests extends ESSingleNodeTestCase {
41+
42+
/**
 * Returns the plugin classes used by this test class: {@code ReindexPlugin}, {@code IngestCommonPlugin}
 * and {@code LocalStateEnrich}.
 */
@Override
43+
protected Collection<Class<? extends Plugin>> getPlugins() {
44+
return Arrays.asList(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
45+
}
46+
47+
/**
 * Builds node settings that restrict the enrich coordinator proxy to 1 concurrent request,
 * 1 lookup per request and a queue capacity of 10.
 */
@Override
48+
protected Settings nodeSettings() {
49+
// Severely throttle the processing throughput so that max capacity is reached more easily
50+
return Settings.builder()
51+
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
52+
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
53+
.put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
54+
.build();
55+
}
56+
57+
/**
 * Sends a 50-document bulk request through a single pipeline that contains two enrich processors
 * back to back, and expects the bulk call to return with some items rejected instead of hanging.
 *
 * Verifies that at least one item failed with status 429 and the coordinator's "queue at capacity"
 * message, and that the number of documents found in the target index equals the number of bulk items
 * that did not fail.
 */
public void testWriteThreadLivenessBackToBack() throws Exception {
58+
ensureGreen();
59+
60+
// Suffix all resource names with the current time in milliseconds.
long testSuffix = System.currentTimeMillis();
61+
String enrichIndexName = "enrich_lookup_" + testSuffix;
62+
String enrichPolicyName = "enrich_policy_" + testSuffix;
63+
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
64+
String enrichedIndexName = "enrich_results_" + testSuffix;
65+
66+
// Index a single lookup document: my_key = "key", my_value = "data".
client().index(
67+
new IndexRequest(enrichIndexName).source(
68+
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
69+
)
70+
).actionGet();
71+
72+
// Refresh the lookup index before the policy is created and executed.
client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();
73+
74+
// Register a match-type enrich policy over the lookup index, passing "my_key" and ["my_value"]
// (presumably the match field and the enrich fields - confirm against the EnrichPolicy constructor).
client().execute(
75+
PutEnrichPolicyAction.INSTANCE,
76+
new PutEnrichPolicyAction.Request(
77+
enrichPolicyName,
78+
new EnrichPolicy(
79+
EnrichPolicy.MATCH_TYPE,
80+
null,
81+
Collections.singletonList(enrichIndexName),
82+
"my_key",
83+
Collections.singletonList("my_value")
84+
)
85+
)
86+
).actionGet();
87+
88+
// Execute the policy, requesting to wait for completion.
client().execute(
89+
ExecuteEnrichPolicyAction.INSTANCE,
90+
new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
91+
).actionGet();
92+
93+
// Build the pipeline definition: a "processors" array holding two enrich processors.
XContentBuilder pipe1 = JsonXContent.contentBuilder();
94+
pipe1.startObject();
95+
{
96+
pipe1.startArray("processors");
97+
{
98+
// First enrich processor: reads "custom_id", writes to "enrich_value_1".
pipe1.startObject();
99+
{
100+
pipe1.startObject("enrich");
101+
{
102+
pipe1.field("policy_name", enrichPolicyName);
103+
pipe1.field("field", "custom_id");
104+
pipe1.field("target_field", "enrich_value_1");
105+
}
106+
pipe1.endObject();
107+
}
108+
pipe1.endObject();
109+
// Second enrich processor: same policy and field, writes to "enrich_value_2".
pipe1.startObject();
110+
{
111+
pipe1.startObject("enrich");
112+
{
113+
pipe1.field("policy_name", enrichPolicyName);
114+
pipe1.field("field", "custom_id");
115+
pipe1.field("target_field", "enrich_value_2");
116+
}
117+
pipe1.endObject();
118+
}
119+
pipe1.endObject();
120+
}
121+
pipe1.endArray();
122+
}
123+
pipe1.endObject();
124+
125+
// Store the pipeline.
client().execute(
126+
PutPipelineAction.INSTANCE,
127+
new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON)
128+
).actionGet();
129+
130+
// Create the index that receives the enriched documents.
client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();
131+
132+
// Document body whose custom_id ("key") equals the my_key value of the lookup document.
XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();
133+
134+
// Bulk request with 50 index operations, all routed through the pipeline; the node settings of this
// test class set the coordinator queue capacity to 10.
BulkRequest bulk = new BulkRequest(enrichedIndexName);
135+
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
136+
for (int idx = 0; idx < 50; idx++) {
137+
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
138+
}
139+
140+
// Wait at most 30 seconds for the bulk response.
BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));
141+
142+
assertTrue(bulkItemResponses.hasFailures());
143+
// Remember the first failure and count the items that did not fail; later failures are not inspected.
BulkItemResponse.Failure firstFailure = null;
144+
int successfulItems = 0;
145+
for (BulkItemResponse item : bulkItemResponses.getItems()) {
146+
if (item.isFailed() && firstFailure == null) {
147+
firstFailure = item.getFailure();
148+
} else if (item.isFailed() == false) {
149+
successfulItems++;
150+
}
151+
}
152+
// The first failure must be a 429 carrying the coordinator's rejection message.
assertNotNull(firstFailure);
153+
assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
154+
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
155+
156+
// Refresh the target index, then check that its total hit count equals the number of successful bulk items.
client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
157+
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
158+
}
159+
160+
/**
 * Sends a 50-document bulk request through a pipeline that runs one enrich processor and then a
 * pipeline processor pointing at a second pipeline, which contains another enrich processor. Expects the
 * bulk call to return with some items rejected instead of hanging.
 *
 * Verifies that at least one item failed with status 429 and the coordinator's "queue at capacity"
 * message, and that the number of documents found in the target index equals the number of bulk items
 * that did not fail.
 */
public void testWriteThreadLivenessWithPipeline() throws Exception {
161+
ensureGreen();
162+
163+
// Suffix all resource names with the current time in milliseconds.
long testSuffix = System.currentTimeMillis();
164+
String enrichIndexName = "enrich_lookup_" + testSuffix;
165+
String enrichPolicyName = "enrich_policy_" + testSuffix;
166+
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
167+
String enrichedIndexName = "enrich_results_" + testSuffix;
168+
String enrichPipelineName1 = enrichPipelineName + "_1";
169+
String enrichPipelineName2 = enrichPipelineName + "_2";
170+
171+
// Index a single lookup document: my_key = "key", my_value = "data".
client().index(
172+
new IndexRequest(enrichIndexName).source(
173+
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
174+
)
175+
).actionGet();
176+
177+
// Refresh the lookup index before the policy is created and executed.
client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();
178+
179+
// Register a match-type enrich policy over the lookup index, passing "my_key" and ["my_value"]
// (presumably the match field and the enrich fields - confirm against the EnrichPolicy constructor).
client().execute(
180+
PutEnrichPolicyAction.INSTANCE,
181+
new PutEnrichPolicyAction.Request(
182+
enrichPolicyName,
183+
new EnrichPolicy(
184+
EnrichPolicy.MATCH_TYPE,
185+
null,
186+
Collections.singletonList(enrichIndexName),
187+
"my_key",
188+
Collections.singletonList("my_value")
189+
)
190+
)
191+
).actionGet();
192+
193+
// Execute the policy, requesting to wait for completion.
client().execute(
194+
ExecuteEnrichPolicyAction.INSTANCE,
195+
new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
196+
).actionGet();
197+
198+
// First pipeline definition: an enrich processor followed by a pipeline processor.
XContentBuilder pipe1 = JsonXContent.contentBuilder();
199+
pipe1.startObject();
200+
{
201+
pipe1.startArray("processors");
202+
{
203+
// Enrich processor: reads "custom_id", writes to "enrich_value_1".
pipe1.startObject();
204+
{
205+
pipe1.startObject("enrich");
206+
{
207+
pipe1.field("policy_name", enrichPolicyName);
208+
pipe1.field("field", "custom_id");
209+
pipe1.field("target_field", "enrich_value_1");
210+
}
211+
pipe1.endObject();
212+
}
213+
pipe1.endObject();
214+
// Pipeline processor: references the second pipeline by name.
pipe1.startObject();
215+
{
216+
pipe1.startObject("pipeline");
217+
{
218+
pipe1.field("name", enrichPipelineName2);
219+
}
220+
pipe1.endObject();
221+
}
222+
pipe1.endObject();
223+
}
224+
pipe1.endArray();
225+
}
226+
pipe1.endObject();
227+
228+
// Second pipeline definition: a single enrich processor writing to "enrich_value_2".
XContentBuilder pipe2 = JsonXContent.contentBuilder();
229+
pipe2.startObject();
230+
{
231+
pipe2.startArray("processors");
232+
{
233+
pipe2.startObject();
234+
{
235+
pipe2.startObject("enrich");
236+
{
237+
pipe2.field("policy_name", enrichPolicyName);
238+
pipe2.field("field", "custom_id");
239+
pipe2.field("target_field", "enrich_value_2");
240+
}
241+
pipe2.endObject();
242+
}
243+
pipe2.endObject();
244+
}
245+
pipe2.endArray();
246+
}
247+
pipe2.endObject();
248+
249+
// Store both pipelines.
client().execute(
250+
PutPipelineAction.INSTANCE,
251+
new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON)
252+
).actionGet();
253+
254+
client().execute(
255+
PutPipelineAction.INSTANCE,
256+
new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON)
257+
).actionGet();
258+
259+
// Create the index that receives the enriched documents.
client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();
260+
261+
// Document body whose custom_id ("key") equals the my_key value of the lookup document.
XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();
262+
263+
// Bulk request with 50 index operations, all routed through the first pipeline; the node settings of
// this test class set the coordinator queue capacity to 10.
BulkRequest bulk = new BulkRequest(enrichedIndexName);
264+
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
265+
for (int idx = 0; idx < 50; idx++) {
266+
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
267+
}
268+
269+
// Wait at most 30 seconds for the bulk response.
BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));
270+
271+
assertTrue(bulkItemResponses.hasFailures());
272+
// Remember the first failure and count the items that did not fail; later failures are not inspected.
BulkItemResponse.Failure firstFailure = null;
273+
int successfulItems = 0;
274+
for (BulkItemResponse item : bulkItemResponses.getItems()) {
275+
if (item.isFailed() && firstFailure == null) {
276+
firstFailure = item.getFailure();
277+
} else if (item.isFailed() == false) {
278+
successfulItems++;
279+
}
280+
}
281+
// The first failure must be a 429 carrying the coordinator's rejection message.
assertNotNull(firstFailure);
282+
assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
283+
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
284+
285+
// Refresh the target index, then check that its total hit count equals the number of successful bulk items.
client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
286+
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
287+
}
288+
}

0 commit comments

Comments
 (0)