|
| 1 | +/* |
| 2 | + * Licensed to Elasticsearch under one or more contributor |
| 3 | + * license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright |
| 5 | + * ownership. Elasticsearch licenses this file to you under |
| 6 | + * the Apache License, Version 2.0 (the "License"); you may |
| 7 | + * not use this file except in compliance with the License. |
| 8 | + * You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +package org.elasticsearch.search; |
| 21 | + |
| 22 | + |
| 23 | +import org.elasticsearch.action.search.SearchResponse; |
| 24 | +import org.elasticsearch.common.settings.Settings; |
| 25 | +import org.elasticsearch.common.unit.TimeValue; |
| 26 | +import org.elasticsearch.index.query.RangeQueryBuilder; |
| 27 | +import org.elasticsearch.test.ESIntegTestCase; |
| 28 | + |
| 29 | +import java.util.ArrayList; |
| 30 | +import java.util.List; |
| 31 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 32 | +import java.util.function.Supplier; |
| 33 | + |
| 34 | +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; |
| 35 | +import static org.hamcrest.Matchers.equalTo; |
| 36 | + |
| 37 | + |
| 38 | +/** |
| 39 | + * This test is a disruption style test that restarts data nodes to see if search behaves well under extreme conditions. |
| 40 | + */ |
| 41 | +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2) |
| 42 | +public class SearchWithFailingNodesIT extends ESIntegTestCase { |
| 43 | + |
| 44 | + public void testDisallowPartialsWithRedStateRecovering() throws Exception { |
| 45 | + int docCount = scaledRandomIntBetween(10, 1000); |
| 46 | + logger.info("Using docCount [{}]", docCount); |
| 47 | + assertAcked(prepareCreate("test").setSettings(Settings.builder() |
| 48 | + .put("index.number_of_shards", cluster().numDataNodes() + 2).put("index.number_of_replicas", 1))); |
| 49 | + ensureGreen(); |
| 50 | + for (int i = 0; i < docCount; i++) { |
| 51 | + client().prepareIndex("test", "_doc", ""+i).setSource("field1", i).get(); |
| 52 | + } |
| 53 | + refresh("test"); |
| 54 | + |
| 55 | + AtomicBoolean stop = new AtomicBoolean(); |
| 56 | + List<Thread> searchThreads = new ArrayList<>(); |
| 57 | + // this is a little extreme, but necessary to provoke spectacular timings like hitting a recovery on a replica |
| 58 | + for (int i = 0; i < 100; ++i) { |
| 59 | + Thread searchThread = new Thread() { |
| 60 | + { |
| 61 | + setDaemon(true); |
| 62 | + } |
| 63 | + |
| 64 | + @Override |
| 65 | + public void run() { |
| 66 | + while (stop.get() == false) { |
| 67 | + // todo: the timeouts below should not be necessary, but this test sometimes hangs without, to be fixed (or |
| 68 | + // explained) |
| 69 | + verify(() -> client().prepareSearch("test").setQuery(new RangeQueryBuilder("field1").gte(0)) |
| 70 | + .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); |
| 71 | + verify(() -> client().prepareSearch("test") |
| 72 | + .setSize(100).setAllowPartialSearchResults(false).get(TimeValue.timeValueSeconds(10))); |
| 73 | + } |
| 74 | + } |
| 75 | + |
| 76 | + void verify(Supplier<SearchResponse> call) { |
| 77 | + try { |
| 78 | + SearchResponse response = call.get(); |
| 79 | + assertThat(response.getHits().getHits().length, equalTo(Math.min(100, docCount))); |
| 80 | + assertThat(response.getHits().getTotalHits().value, equalTo((long) docCount)); |
| 81 | + } catch (Exception e) { |
| 82 | + // this is OK. |
| 83 | + logger.info("Failed with : " + e); |
| 84 | + } |
| 85 | + } |
| 86 | + }; |
| 87 | + searchThreads.add(searchThread); |
| 88 | + searchThread.start(); |
| 89 | + } |
| 90 | + try { |
| 91 | + // have two threads do restarts, with a replica of 1, this means we will sometimes have no copies (RED) |
| 92 | + Thread restartThread = new Thread() { |
| 93 | + { |
| 94 | + setDaemon(true); |
| 95 | + } |
| 96 | + |
| 97 | + @Override |
| 98 | + public void run() { |
| 99 | + try { |
| 100 | + for (int i = 0; i < 5; ++i) { |
| 101 | + internalCluster().restartRandomDataNode(); |
| 102 | + } |
| 103 | + } catch (Exception e) { |
| 104 | + throw new RuntimeException(e); |
| 105 | + } |
| 106 | + } |
| 107 | + }; |
| 108 | + restartThread.start(); |
| 109 | + for (int i = 0; i < 5; ++i) { |
| 110 | + internalCluster().restartRandomDataNode(); |
| 111 | + } |
| 112 | + restartThread.join(30000); |
| 113 | + assertFalse(restartThread.isAlive()); |
| 114 | + } finally { |
| 115 | + stop.set(true); |
| 116 | + searchThreads.forEach(thread -> { |
| 117 | + try { |
| 118 | + thread.join(30000); |
| 119 | + if (thread.isAlive()) { |
| 120 | + logger.warn("Thread: " + thread + " is still alive"); |
| 121 | + // do not continue unless thread terminates to avoid getting other confusing test errors. Please kill me... |
| 122 | + thread.join(); |
| 123 | + } |
| 124 | + } catch (InterruptedException e) { |
| 125 | + throw new RuntimeException(e); |
| 126 | + } |
| 127 | + }); |
| 128 | + } |
| 129 | + |
| 130 | + // hack to ensure all search contexts are removed, seems we risk leaked search contexts when coordinator dies. |
| 131 | + client().admin().indices().prepareDelete("test").get(); |
| 132 | + } |
| 133 | + |
| 134 | + |
| 135 | +} |
0 commit comments