Skip to content

Fix enrich coordinator to reject documents instead of deadlocking #56247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -65,7 +66,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
// Write tp is expected when executing enrich processor from index / bulk api
// Management tp is expected when executing enrich processor from ingest simulate api
// Search tp is allowed for now - After enriching, the remaining parts of the pipeline are processed on the
// search thread, which could end up here again if there is more than one enrich processor in a pipeline.
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE)
|| Thread.currentThread().getName().contains(ThreadPool.Names.SEARCH)
|| Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT);
coordinator.schedule(request, listener);
}
Expand All @@ -76,6 +80,7 @@ public static class Coordinator {
final BiConsumer<MultiSearchRequest, BiConsumer<MultiSearchResponse, Exception>> lookupFunction;
final int maxLookupsPerRequest;
final int maxNumberOfConcurrentRequests;
final int queueCapacity;
final BlockingQueue<Slot> queue;
final AtomicInteger remoteRequestsCurrent = new AtomicInteger(0);
volatile long remoteRequestsTotal = 0;
Expand All @@ -99,21 +104,30 @@ public Coordinator(Client client, Settings settings) {
this.lookupFunction = lookupFunction;
this.maxLookupsPerRequest = maxLookupsPerRequest;
this.maxNumberOfConcurrentRequests = maxNumberOfConcurrentRequests;
this.queueCapacity = queueCapacity;
this.queue = new ArrayBlockingQueue<>(queueCapacity);
}

void schedule(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// Use put(...), because if queue is full then this method will wait until a free slot becomes available
// The calling thread here is a write thread (write tp is used by ingest) and
// this will create natural back pressure from the enrich processor.
// If there are no write threads available then write requests with ingestion will fail with 429 error code.
try {
queue.put(new Slot(searchRequest, listener));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("unable to add item to queue", e);
}
// Use offer(...) instead of put(...). We are on a write thread and blocking here can be dangerous,
// especially since the logic to kick off draining the queue is located right after this section. If we
// cannot insert a request to the queue, we should reject the document with a 429 error code.
boolean accepted = queue.offer(new Slot(searchRequest, listener));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

int queueSize = queue.size();

// Coordinate lookups no matter what, even if queues were full. Search threads should be draining the queue,
// but they may be busy with processing the remaining work for enrich results. If there is more than one
// enrich processor in a pipeline, those search threads may find themselves here again before they can
// coordinate the next set of lookups.
coordinateLookups();

if (accepted == false) {
listener.onFailure(
new EsRejectedExecutionException(
"Could not perform enrichment, " + "enrich coordination queue at capacity [" + queueSize + "/" + queueCapacity + "]"
)
);
}
}

CoordinatorStats getStats(String nodeId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineAction;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class EnrichResiliencyTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(ReindexPlugin.class, IngestCommonPlugin.class, LocalStateEnrich.class);
}

@Override
protected Settings nodeSettings() {
// Severely throttle the processing throughput to reach max capacity easier
return Settings.builder()
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
.put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
.put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)
.build();
}

public void testWriteThreadLivenessBackToBack() throws Exception {
ensureGreen();

long testSuffix = System.currentTimeMillis();
String enrichIndexName = "enrich_lookup_" + testSuffix;
String enrichPolicyName = "enrich_policy_" + testSuffix;
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
String enrichedIndexName = "enrich_results_" + testSuffix;

client().index(
new IndexRequest(enrichIndexName).source(
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
)
).actionGet();

client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

client().execute(
PutEnrichPolicyAction.INSTANCE,
new PutEnrichPolicyAction.Request(
enrichPolicyName,
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
)
).actionGet();

client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
).actionGet();

XContentBuilder pipe1 = JsonXContent.contentBuilder();
pipe1.startObject();
{
pipe1.startArray("processors");
{
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_1");
}
pipe1.endObject();
}
pipe1.endObject();
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_2");
}
pipe1.endObject();
}
pipe1.endObject();
}
pipe1.endArray();
}
pipe1.endObject();

client().execute(
PutPipelineAction.INSTANCE,
new PutPipelineRequest(enrichPipelineName, BytesReference.bytes(pipe1), XContentType.JSON)
).actionGet();

client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

BulkRequest bulk = new BulkRequest(enrichedIndexName);
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
for (int idx = 0; idx < 50; idx++) {
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName));
}

BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

assertTrue(bulkItemResponses.hasFailures());
BulkItemResponse.Failure firstFailure = null;
int successfulItems = 0;
for (BulkItemResponse item : bulkItemResponses.getItems()) {
if (item.isFailed() && firstFailure == null) {
firstFailure = item.getFailure();
} else if (item.isFailed() == false) {
successfulItems++;
}
}
assertNotNull(firstFailure);
assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
}

public void testWriteThreadLivenessWithPipeline() throws Exception {
ensureGreen();

long testSuffix = System.currentTimeMillis();
String enrichIndexName = "enrich_lookup_" + testSuffix;
String enrichPolicyName = "enrich_policy_" + testSuffix;
String enrichPipelineName = "enrich_pipeline_" + testSuffix;
String enrichedIndexName = "enrich_results_" + testSuffix;
String enrichPipelineName1 = enrichPipelineName + "_1";
String enrichPipelineName2 = enrichPipelineName + "_2";

client().index(
new IndexRequest(enrichIndexName).source(
JsonXContent.contentBuilder().startObject().field("my_key", "key").field("my_value", "data").endObject()
)
).actionGet();

client().admin().indices().refresh(new RefreshRequest(enrichIndexName)).actionGet();

client().execute(
PutEnrichPolicyAction.INSTANCE,
new PutEnrichPolicyAction.Request(
enrichPolicyName,
new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"))
)
).actionGet();

client().execute(
ExecuteEnrichPolicyAction.INSTANCE,
new ExecuteEnrichPolicyAction.Request(enrichPolicyName).setWaitForCompletion(true)
).actionGet();

XContentBuilder pipe1 = JsonXContent.contentBuilder();
pipe1.startObject();
{
pipe1.startArray("processors");
{
pipe1.startObject();
{
pipe1.startObject("enrich");
{
pipe1.field("policy_name", enrichPolicyName);
pipe1.field("field", "custom_id");
pipe1.field("target_field", "enrich_value_1");
}
pipe1.endObject();
}
pipe1.endObject();
pipe1.startObject();
{
pipe1.startObject("pipeline");
{
pipe1.field("name", enrichPipelineName2);
}
pipe1.endObject();
}
pipe1.endObject();
}
pipe1.endArray();
}
pipe1.endObject();

XContentBuilder pipe2 = JsonXContent.contentBuilder();
pipe2.startObject();
{
pipe2.startArray("processors");
{
pipe2.startObject();
{
pipe2.startObject("enrich");
{
pipe2.field("policy_name", enrichPolicyName);
pipe2.field("field", "custom_id");
pipe2.field("target_field", "enrich_value_2");
}
pipe2.endObject();
}
pipe2.endObject();
}
pipe2.endArray();
}
pipe2.endObject();

client().execute(
PutPipelineAction.INSTANCE,
new PutPipelineRequest(enrichPipelineName1, BytesReference.bytes(pipe1), XContentType.JSON)
).actionGet();

client().execute(
PutPipelineAction.INSTANCE,
new PutPipelineRequest(enrichPipelineName2, BytesReference.bytes(pipe2), XContentType.JSON)
).actionGet();

client().admin().indices().create(new CreateIndexRequest(enrichedIndexName)).actionGet();

XContentBuilder doc = JsonXContent.contentBuilder().startObject().field("custom_id", "key").endObject();

BulkRequest bulk = new BulkRequest(enrichedIndexName);
bulk.timeout(new TimeValue(10, TimeUnit.SECONDS));
for (int idx = 0; idx < 50; idx++) {
bulk.add(new IndexRequest().source(doc).setPipeline(enrichPipelineName1));
}

BulkResponse bulkItemResponses = client().bulk(bulk).actionGet(new TimeValue(30, TimeUnit.SECONDS));

assertTrue(bulkItemResponses.hasFailures());
BulkItemResponse.Failure firstFailure = null;
int successfulItems = 0;
for (BulkItemResponse item : bulkItemResponses.getItems()) {
if (item.isFailed() && firstFailure == null) {
firstFailure = item.getFailure();
} else if (item.isFailed() == false) {
successfulItems++;
}
}
assertNotNull(firstFailure);
assertThat(firstFailure.getStatus().getStatus(), is(equalTo(429)));
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
}
}
Loading