53
53
import java .util .concurrent .CountDownLatch ;
54
54
import java .util .concurrent .ExecutorService ;
55
55
import java .util .concurrent .Executors ;
56
+ import java .util .concurrent .atomic .AtomicBoolean ;
56
57
import java .util .concurrent .atomic .AtomicInteger ;
57
58
import java .util .concurrent .atomic .AtomicReference ;
58
59
60
+ import static org .elasticsearch .common .util .concurrent .ConcurrentCollections .newConcurrentMap ;
61
+ import static org .elasticsearch .common .util .concurrent .ConcurrentCollections .newConcurrentSet ;
62
+
59
63
public class SearchAsyncActionTests extends ESTestCase {
60
64
61
65
public void testSkipSearchShards () throws InterruptedException {
@@ -137,7 +141,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
137
141
protected SearchPhase getNextPhase (SearchPhaseResults <TestSearchPhaseResult > results , SearchPhaseContext context ) {
138
142
return new SearchPhase ("test" ) {
139
143
@ Override
140
- public void run () throws IOException {
144
+ public void run () {
141
145
latch .countDown ();
142
146
}
143
147
};
@@ -253,7 +257,6 @@ public void run() throws IOException {
253
257
public void testFanOutAndCollect () throws InterruptedException {
254
258
SearchRequest request = new SearchRequest ();
255
259
request .setMaxConcurrentShardRequests (randomIntBetween (1 , 100 ));
256
- CountDownLatch latch = new CountDownLatch (1 );
257
260
AtomicReference <TestSearchResponse > response = new AtomicReference <>();
258
261
ActionListener <SearchResponse > responseListener = new ActionListener <SearchResponse >() {
259
262
@ Override
@@ -270,7 +273,7 @@ public void onFailure(Exception e) {
270
273
DiscoveryNode primaryNode = new DiscoveryNode ("node_1" , new LocalTransportAddress ("foo" ), Version .CURRENT );
271
274
DiscoveryNode replicaNode = new DiscoveryNode ("node_2" , new LocalTransportAddress ("bar" ), Version .CURRENT );
272
275
273
- Map <DiscoveryNode , Set <Long >> nodeToContextMap = new HashMap <> ();
276
+ Map <DiscoveryNode , Set <Long >> nodeToContextMap = newConcurrentMap ();
274
277
AtomicInteger contextIdGenerator = new AtomicInteger (0 );
275
278
GroupShardsIterator <SearchShardIterator > shardsIter = getShardsIter ("idx" ,
276
279
new OriginalIndices (new String []{"idx" }, IndicesOptions .strictExpandOpenAndForbidClosed ()),
@@ -289,7 +292,9 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Ori
289
292
lookup .put (replicaNode .getId (), new MockConnection (replicaNode ));
290
293
Map <String , AliasFilter > aliasFilters = Collections .singletonMap ("_na_" , new AliasFilter (null , Strings .EMPTY_ARRAY ));
291
294
final ExecutorService executor = Executors .newFixedThreadPool (randomIntBetween (1 , Runtime .getRuntime ().availableProcessors ()));
292
- AbstractSearchAsyncAction asyncAction =
295
+ final CountDownLatch latch = new CountDownLatch (1 );
296
+ final AtomicBoolean latchTriggered = new AtomicBoolean ();
297
+ AbstractSearchAsyncAction <TestSearchPhaseResult > asyncAction =
293
298
new AbstractSearchAsyncAction <TestSearchPhaseResult >(
294
299
"test" ,
295
300
logger ,
@@ -317,7 +322,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
317
322
Transport .Connection connection = getConnection (null , shard .currentNodeId ());
318
323
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult (contextIdGenerator .incrementAndGet (),
319
324
connection .getNode ());
320
- Set <Long > ids = nodeToContextMap .computeIfAbsent (connection .getNode (), (n ) -> new HashSet <> ());
325
+ Set <Long > ids = nodeToContextMap .computeIfAbsent (connection .getNode (), (n ) -> newConcurrentSet ());
321
326
ids .add (testSearchPhaseResult .getRequestId ());
322
327
if (randomBoolean ()) {
323
328
listener .onResponse (testSearchPhaseResult );
@@ -330,13 +335,16 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
330
335
protected SearchPhase getNextPhase (SearchPhaseResults <TestSearchPhaseResult > results , SearchPhaseContext context ) {
331
336
return new SearchPhase ("test" ) {
332
337
@ Override
333
- public void run () throws IOException {
338
+ public void run () {
334
339
for (int i = 0 ; i < results .getNumShards (); i ++) {
335
340
TestSearchPhaseResult result = results .getAtomicArray ().get (i );
336
341
assertEquals (result .node .getId (), result .getSearchShardTarget ().getNodeId ());
337
342
sendReleaseSearchContext (result .getRequestId (), new MockConnection (result .node ), OriginalIndices .NONE );
338
343
}
339
344
responseListener .onResponse (response );
345
+ if (latchTriggered .compareAndSet (false , true ) == false ) {
346
+ throw new AssertionError ("latch triggered twice" );
347
+ }
340
348
latch .countDown ();
341
349
}
342
350
};
0 commit comments