|
18 | 18 | */
|
19 | 19 | package org.elasticsearch.action.search;
|
20 | 20 |
|
| 21 | +import org.apache.lucene.util.BytesRef; |
21 | 22 | import org.elasticsearch.Version;
|
22 | 23 | import org.elasticsearch.action.ActionListener;
|
23 | 24 | import org.elasticsearch.action.OriginalIndices;
|
|
54 | 55 | import java.util.concurrent.atomic.AtomicReference;
|
55 | 56 | import java.util.stream.IntStream;
|
56 | 57 |
|
| 58 | +import static org.hamcrest.Matchers.equalTo; |
| 59 | + |
57 | 60 | public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
58 | 61 |
|
59 | 62 | public void testFilterShards() throws InterruptedException {
|
@@ -350,4 +353,74 @@ public void run() {
|
350 | 353 | }
|
351 | 354 | }
|
352 | 355 | }
|
| 356 | + |
| 357 | + public void testSortShardsDisabled() throws InterruptedException { |
| 358 | + final TransportSearchAction.SearchTimeProvider timeProvider = |
| 359 | + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); |
| 360 | + |
| 361 | + Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>(); |
| 362 | + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); |
| 363 | + DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); |
| 364 | + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); |
| 365 | + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); |
| 366 | + |
| 367 | + for (SortOrder order : SortOrder.values()) { |
| 368 | + int numShards = randomIntBetween(2, 20); |
| 369 | + List<ShardId> shardIds = new ArrayList<>(); |
| 370 | + Set<ShardId> shardToSkip = new HashSet<>(); |
| 371 | + |
| 372 | + SearchTransportService searchTransportService = new SearchTransportService(null, null) { |
| 373 | + @Override |
| 374 | + public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task, |
| 375 | + ActionListener<SearchService.CanMatchResponse> listener) { |
| 376 | + final MinAndMax<?> minMax; |
| 377 | + if (request.shardId().id() == numShards-1) { |
| 378 | + minMax = new MinAndMax<>(new BytesRef("bar"), new BytesRef("baz")); |
| 379 | + } else { |
| 380 | + Long min = rarely() ? null : randomLong(); |
| 381 | + Long max = min == null ? null : randomLongBetween(min, Long.MAX_VALUE); |
| 382 | + minMax = min == null ? null : new MinAndMax<>(min, max); |
| 383 | + } |
| 384 | + boolean canMatch = frequently(); |
| 385 | + synchronized (shardIds) { |
| 386 | + shardIds.add(request.shardId()); |
| 387 | + if (canMatch == false) { |
| 388 | + shardToSkip.add(request.shardId()); |
| 389 | + } |
| 390 | + } |
| 391 | + new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(canMatch, minMax))).start(); |
| 392 | + } |
| 393 | + }; |
| 394 | + |
| 395 | + AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>(); |
| 396 | + CountDownLatch latch = new CountDownLatch(1); |
| 397 | + GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("logs", |
| 398 | + new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS), |
| 399 | + numShards, randomBoolean(), primaryNode, replicaNode); |
| 400 | + final SearchRequest searchRequest = new SearchRequest(); |
| 401 | + searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order))); |
| 402 | + searchRequest.allowPartialSearchResults(true); |
| 403 | + |
| 404 | + CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger, |
| 405 | + searchTransportService, |
| 406 | + (clusterAlias, node) -> lookup.get(node), |
| 407 | + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), |
| 408 | + Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), |
| 409 | + searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, |
| 410 | + (iter) -> new SearchPhase("test") { |
| 411 | + @Override |
| 412 | + public void run() { |
| 413 | + result.set(iter); |
| 414 | + latch.countDown(); |
| 415 | + } |
| 416 | + }, SearchResponse.Clusters.EMPTY); |
| 417 | + |
| 418 | + canMatchPhase.start(); |
| 419 | + latch.await(); |
| 420 | + for (SearchShardIterator i : result.get()) { |
| 421 | + assertEquals(shardToSkip.contains(i.shardId()), i.skip()); |
| 422 | + } |
| 423 | + assertThat(result.get().size(), equalTo(numShards)); |
| 424 | + } |
| 425 | + } |
353 | 426 | }
|
0 commit comments