Skip to content

Introducing "took" time (in ms) for _msearch #23767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 2, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand Down Expand Up @@ -112,11 +114,14 @@ public Exception getFailure() {

private Item[] items;

private long tookInMillis;

MultiSearchResponse() {
}

public MultiSearchResponse(Item[] items) {
public MultiSearchResponse(Item[] items, long tookInMillis) {
this.items = items;
this.tookInMillis = tookInMillis;
}

@Override
Expand All @@ -131,13 +136,23 @@ public Item[] getResponses() {
return this.items;
}

/**
* How long the msearch took.
*/
public TimeValue getTook() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is only used in tests, and only to get the milliseconds, so I think that we can remove it and just use getTookInMillis in those places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method exists in both SearchResponse and BulkResponse.
I would prefer to keep the consistency unless there is a good reason no to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a good reason, the method is unneeded.

Copy link
Contributor Author

@olcbean olcbean Apr 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So shall I remove also the corresponding methods for Bulk and Search?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not as part of this change.

return new TimeValue(tookInMillis);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new Item[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = Item.readItem(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
tookInMillis = in.readVLong();
}
}

@Override
Expand All @@ -147,11 +162,15 @@ public void writeTo(StreamOutput out) throws IOException {
for (Item item : items) {
item.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(tookInMillis);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.search;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand All @@ -34,16 +35,19 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
private SetOnce<Long> startTimeInNanos;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think SetOnce<Long> here is desirable, that's two extra allocations on every multi-search request.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The immutability is a positive though, we can obtain this by pushing the start time all the way down via the execute method (i.e., add it as a parameter and chain it through all the way down to finish).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasontedor good point! Thanks.


@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -53,19 +57,23 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
}

TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver resolver, int availableProcessors) {
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
}

@Override
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
startTimeInNanos = new SetOnce<>(relativeTime());

ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

Expand Down Expand Up @@ -111,7 +119,7 @@ static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState st
* @param responseCounter incremented on each response
* @param listener the listener attached to the multi-search request
*/
private void executeSearch(
void executeSearch(
final Queue<SearchRequestSlot> requests,
final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter,
Expand Down Expand Up @@ -164,11 +172,23 @@ private void handleResponse(final int responseSlot, final MultiSearchResponse.It
}

private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]),
buildTookInMillis()));
}

/**
* Builds how long it took to execute the msearch.
*/
private long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeInNanos.get());
}
});
}

private long relativeTime() {
return relativeTimeProvider.getAsLong();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method is needed.

}

static final class SearchRequestSlot {

final SearchRequest request;
Expand All @@ -178,7 +198,5 @@ static final class SearchRequestSlot {
this.request = request;
this.responseSlot = responseSlot;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change.

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
mSearchResponses.add(new MultiSearchResponse.Item(response, null));
}

listener.onResponse(new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0])));
listener.onResponse(
new MultiSearchResponse(mSearchResponses.toArray(new MultiSearchResponse.Item[0]), randomIntBetween(1, 10000)));
}
};

Expand Down Expand Up @@ -152,10 +153,11 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}));
listener.onResponse(new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
new MultiSearchResponse.Item(response, null)
}, randomIntBetween(1, 10000)));
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* MultiSearch took time tests
*/
public class MultiSearchActionTookTests extends ESTestCase {

private ThreadPool threadPool;
private ClusterService clusterService;

@BeforeClass
public static void beforeClass() {
}

@AfterClass
public static void afterClass() {
}

@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("MultiSearchActionTookTests");
clusterService = createClusterService(threadPool);
}

@After
public void tearDown() throws Exception {
clusterService.close();
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
super.tearDown();
}

// test unit conversion using a controller clock
public void testTookWithControlledClock() throws Exception {
runTestTook(true);
}

// test using System#nanoTime
public void testTookWithRealClock() throws Exception {
runTestTook(false);
}

private void runTestTook(boolean controlledClock) throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest());
AtomicLong expected = new AtomicLong();

TransportMultiSearchAction action = createTransportMultiSearchAction(controlledClock, expected);

action.doExecute(multiSearchRequest, new ActionListener<MultiSearchResponse>() {
@Override
public void onResponse(MultiSearchResponse multiSearchResponse) {
if (controlledClock) {
assertThat(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS),
equalTo(multiSearchResponse.getTook().getMillis()));
} else {
assertThat(multiSearchResponse.getTook().getMillis(),
greaterThanOrEqualTo(TimeUnit.MILLISECONDS.convert(expected.get(), TimeUnit.NANOSECONDS)));
}
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
});
}

private TransportMultiSearchAction createTransportMultiSearchAction(boolean controlledClock, AtomicLong expected) {
Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build();
TaskManager taskManager = mock(TaskManager.class);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};
ActionFilters actionFilters = new ActionFilters(new HashSet<>());
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);

final int availableProcessors = Runtime.getRuntime().availableProcessors();
AtomicInteger counter = new AtomicInteger();
final List<String> threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME);
Randomness.shuffle(threadPoolNames);
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));

TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>(Settings.EMPTY,
"action", threadPool, actionFilters, resolver, taskManager) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
requests.add(request);
commonExecutor.execute(() -> {
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
});
}
};

if (controlledClock) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver,
availableProcessors, expected::get) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener) {
expected.set(1000000);
super.executeSearch(requests, responses, responseCounter, listener);
}
};
} else {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver,
availableProcessors, System::nanoTime) {

@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener) {
long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(0, 10));
expected.set(elapsed);
super.executeSearch(requests, responses, responseCounter, listener);
}
};
}
}

static class Resolver extends IndexNameExpressionResolver {

Resolver(Settings settings) {
super(settings);
}

@Override
public String[] concreteIndexNames(ClusterState state, IndicesRequest request) {
return request.indices();
}
}
}
Loading