-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Make ILMHistoryStore.putAsync truly async #50403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
d5cc5fa
6878d90
42c9c84
a8e7460
d0cac4e
3e2fc23
9946942
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,11 +31,14 @@ | |
import org.elasticsearch.common.xcontent.ToXContent; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentFactory; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
|
||
|
@@ -54,26 +57,56 @@ public class ILMHistoryStore implements Closeable { | |
public static final String ILM_HISTORY_INDEX_PREFIX = "ilm-history-" + INDEX_TEMPLATE_VERSION + "-"; | ||
public static final String ILM_HISTORY_ALIAS = "ilm-history-" + INDEX_TEMPLATE_VERSION; | ||
|
||
private final Client client; | ||
private final ClusterService clusterService; | ||
private final boolean ilmHistoryEnabled; | ||
private final BulkProcessor processor; | ||
private final ThreadPool threadPool; | ||
|
||
public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService) { | ||
this.client = client; | ||
this.clusterService = clusterService; | ||
ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); | ||
public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) { | ||
this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); | ||
this.threadPool = threadPool; | ||
|
||
this.processor = BulkProcessor.builder( | ||
new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN)::bulk, | ||
new BulkProcessor.Listener() { | ||
@Override | ||
public void beforeBulk(long executionId, BulkRequest request) { } | ||
public void beforeBulk(long executionId, BulkRequest request) { | ||
// Prior to actually performing the bulk, we should ensure the index exists, and | ||
// if we were unable to create it or it was in a bad state, we should not | ||
// attempt to index documents. | ||
try { | ||
final CompletableFuture<Boolean> indexCreated = new CompletableFuture<>(); | ||
ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(indexCreated::complete, | ||
ex -> { | ||
logger.warn("failed to create ILM history store index prior to issuing bulk request", ex); | ||
indexCreated.completeExceptionally(ex); | ||
})); | ||
try { | ||
indexCreated.get(); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException("interrupted waiting for ILM history index to be created prior to indexing", e); | ||
} catch (ExecutionException e) { | ||
throw new RuntimeException("cannot index ILM history items as index could not be created", e); | ||
} | ||
} catch (Exception e) { | ||
logger.warn("unable to index the following ILM history items:\n{}", | ||
request.requests().stream() | ||
.filter(dwr -> (dwr instanceof IndexRequest)) | ||
.map(dwr -> ((IndexRequest) dwr)) | ||
.map(IndexRequest::sourceAsMap) | ||
.map(Object::toString) | ||
.collect(Collectors.joining("\n"))); | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { | ||
long items = request.numberOfActions(); | ||
logger.trace("indexed [{}] items into ILM history index", items); | ||
logger.trace("indexed [{}] items into ILM history index [{}]", items, | ||
dakrone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Arrays.stream(response.getItems()) | ||
.map(BulkItemResponse::getIndex) | ||
.distinct() | ||
.collect(Collectors.joining(","))); | ||
if (response.hasFailures()) { | ||
Map<String, String> failures = Arrays.stream(response.getItems()) | ||
.filter(BulkItemResponse::isFailed) | ||
|
@@ -105,18 +138,24 @@ public void putAsync(ILMHistoryItem item) { | |
LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); | ||
return; | ||
} | ||
logger.trace("about to index ILM history item in index [{}]: [{}]", ILM_HISTORY_ALIAS, item); | ||
ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> { | ||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { | ||
item.toXContent(builder, ToXContent.EMPTY_PARAMS); | ||
IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); | ||
processor.add(request); | ||
} catch (IOException exception) { | ||
logger.error(new ParameterizedMessage("failed to index ILM history item in index [{}]: [{}]", | ||
ILM_HISTORY_ALIAS, item), exception); | ||
} | ||
}, ex -> logger.error(new ParameterizedMessage("failed to ensure ILM history index exists, not indexing history item [{}]", | ||
item), ex))); | ||
logger.trace("queueing ILM history item for indexing [{}]: [{}]", ILM_HISTORY_ALIAS, item); | ||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { | ||
item.toXContent(builder, ToXContent.EMPTY_PARAMS); | ||
IndexRequest request = new IndexRequest(ILM_HISTORY_ALIAS).source(builder); | ||
// TODO: remove the threadpool wrapping when the .add call is non-blocking | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, this is not intuitive/obvious. Maybe in a follow-up PR shall we rename the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a followup we are going to make the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Brill, thanks @dakrone |
||
// (it can currently execute the bulk request occasionally) | ||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { | ||
dakrone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
processor.add(request); | ||
} catch (Exception e) { | ||
logger.error(new ParameterizedMessage("failed add ILM history item to queue for index [{}]: [{}]", | ||
ILM_HISTORY_ALIAS, item), e); | ||
} | ||
}); | ||
} catch (IOException exception) { | ||
logger.error(new ParameterizedMessage("failed to queue ILM history item in index [{}]: [{}]", | ||
ILM_HISTORY_ALIAS, item), exception); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -134,6 +173,7 @@ static void ensureHistoryIndex(Client client, ClusterState state, ActionListener | |
|
||
if (ilmHistory == null && initialHistoryIndex == null) { | ||
// No alias or index exists with the expected names, so create the index with appropriate alias | ||
logger.debug("creating ILM history index [{}]", initialHistoryIndexName); | ||
client.admin().indices().prepareCreate(initialHistoryIndexName) | ||
.setWaitForActiveShards(1) | ||
.addAlias(new Alias(ILM_HISTORY_ALIAS) | ||
|
Uh oh!
There was an error while loading. Please reload this page.