Skip to content

Share thread pools that have similar purposes. #12939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,15 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.indices.IndicesWarmer.WarmerContext;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
Expand Down Expand Up @@ -241,9 +238,9 @@ public int hashCode() {
final class BitDocIdSetFilterWarmer extends IndicesWarmer.Listener {

@Override
public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) {
public void warmNewReaders(final IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context) {
if (!loadRandomAccessFiltersEagerly) {
return TerminationHandle.NO_WAIT;
return;
}

boolean hasNested = false;
Expand All @@ -267,43 +264,25 @@ public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShar
warmUp.add(Queries.newNonNestedFilter());
}

final Executor executor = threadPool.executor(executor());
final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size());
for (final LeafReaderContext ctx : context.searcher().reader().leaves()) {
for (final Filter filterToWarm : warmUp) {
executor.execute(new Runnable() {

@Override
public void run() {
try {
final long start = System.nanoTime();
getAndLoadIfNotPresent(filterToWarm, ctx);
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start));
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm);
} finally {
latch.countDown();
}
try {
final long start = System.nanoTime();
getAndLoadIfNotPresent(filterToWarm, ctx);
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed bitset for [{}], took [{}]", filterToWarm, TimeValue.timeValueNanos(System.nanoTime() - start));
}

});
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to load bitset for [{}]", t, filterToWarm);
}
}
}
return new TerminationHandle() {
@Override
public void awaitTermination() throws InterruptedException {
latch.await();
}
};
}

@Override
public TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, IndicesWarmer.WarmerContext context, ThreadPool threadPool) {
return TerminationHandle.NO_WAIT;
public void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context) {
// no-op
}

}

Cache<Object, Cache<Filter, Value>> getLoadedFilters() {
Expand Down
50 changes: 7 additions & 43 deletions core/src/main/java/org/elasticsearch/indices/IndicesWarmer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.indices;

import com.google.common.collect.Lists;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -31,9 +30,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

Expand All @@ -43,18 +40,15 @@ public final class IndicesWarmer extends AbstractComponent {

public static final String INDEX_WARMER_ENABLED = "index.warmer.enabled";

private final ThreadPool threadPool;

private final ClusterService clusterService;

private final IndicesService indicesService;

private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

@Inject
public IndicesWarmer(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService) {
public IndicesWarmer(Settings settings, ClusterService clusterService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
}
Expand Down Expand Up @@ -100,27 +94,11 @@ private void warmInternal(final WarmerContext context, boolean topReader) {
}
indexShard.warmerService().onPreWarm();
long time = System.nanoTime();
final List<TerminationHandle> terminationHandles = Lists.newArrayList();
// get a handle on pending tasks
for (final Listener listener : listeners) {
if (topReader) {
terminationHandles.add(listener.warmTopReader(indexShard, indexMetaData, context, threadPool));
listener.warmTopReader(indexShard, indexMetaData, context);
} else {
terminationHandles.add(listener.warmNewReaders(indexShard, indexMetaData, context, threadPool));
}
}
// wait for termination
for (TerminationHandle terminationHandle : terminationHandles) {
try {
terminationHandle.awaitTermination();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (topReader) {
logger.warn("top warming has been interrupted", e);
} else {
logger.warn("warming has been interrupted", e);
}
break;
listener.warmNewReaders(indexShard, indexMetaData, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess one sacrifice here is that the warmers will run in series instead of parallel now. That is probably OK unless someone has thousands of the things and they take a long time to run - like on a newly merged segment. But I'm pretty sure the docs advise against having tons and tons of warmers anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, and current changes will hopefully make warming faster, like doc-values by default (ES 2.0) or disk-based norms (Lucene 5.3).

}
}
long took = System.nanoTime() - time;
Expand All @@ -134,27 +112,13 @@ private void warmInternal(final WarmerContext context, boolean topReader) {
}
}

/** A handle on the execution of warm-up action. */
public interface TerminationHandle {

public static TerminationHandle NO_WAIT = new TerminationHandle() {
@Override
public void awaitTermination() {}
};

/** Wait until execution of the warm-up action completes. */
void awaitTermination() throws InterruptedException;
}
public static abstract class Listener {

public String executor() {
return ThreadPool.Names.WARMER;
}

/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */
public abstract TerminationHandle warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);
/** Warm new leaf readers in the current thread. */
public abstract void warmNewReaders(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context);

public abstract TerminationHandle warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context, ThreadPool threadPool);
/** Warm the top reader in the current thread. */
public abstract void warmTopReader(IndexShard indexShard, IndexMetaData indexMetaData, WarmerContext context);
}

public static final class WarmerContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,26 @@
public class RestThreadPoolAction extends AbstractCatAction {

private final static String[] SUPPORTED_NAMES = new String[]{
ThreadPool.Names.BULK,
ThreadPool.Names.FLUSH,
ThreadPool.Names.FETCH_SHARD,
ThreadPool.Names.GENERIC,
ThreadPool.Names.GET,
ThreadPool.Names.INDEX,
ThreadPool.Names.MANAGEMENT,
ThreadPool.Names.OPTIMIZE,
ThreadPool.Names.PERCOLATE,
ThreadPool.Names.REFRESH,
ThreadPool.Names.SEARCH,
ThreadPool.Names.READ,
ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SUGGEST,
ThreadPool.Names.WARMER
ThreadPool.Names.WRITE
};

private final static String[] SUPPORTED_ALIASES = new String[]{
"b",
"f",
"fs",
"ge",
"g",
"i",
"ma",
"o",
"p",
"r",
"s",
"sn",
"su",
"w"
};

Expand All @@ -86,9 +78,8 @@ public class RestThreadPoolAction extends AbstractCatAction {
}

private final static String[] DEFAULT_THREAD_POOLS = new String[]{
ThreadPool.Names.BULK,
ThreadPool.Names.INDEX,
ThreadPool.Names.SEARCH,
ThreadPool.Names.WRITE,
ThreadPool.Names.READ
};

private final static Map<String, String> ALIAS_TO_THREAD_POOL;
Expand Down
Loading