25
25
import org .apache .http .HttpHost ;
26
26
import org .apache .http .HttpRequest ;
27
27
import org .apache .http .HttpResponse ;
28
+ import org .apache .http .client .AuthCache ;
28
29
import org .apache .http .client .ClientProtocolException ;
29
30
import org .apache .http .client .methods .HttpEntityEnclosingRequestBase ;
30
31
import org .apache .http .client .methods .HttpHead ;
34
35
import org .apache .http .client .methods .HttpPut ;
35
36
import org .apache .http .client .methods .HttpRequestBase ;
36
37
import org .apache .http .client .methods .HttpTrace ;
38
+ import org .apache .http .client .protocol .HttpClientContext ;
37
39
import org .apache .http .client .utils .URIBuilder ;
38
40
import org .apache .http .concurrent .FutureCallback ;
41
+ import org .apache .http .impl .auth .BasicScheme ;
42
+ import org .apache .http .impl .client .BasicAuthCache ;
39
43
import org .apache .http .impl .nio .client .CloseableHttpAsyncClient ;
40
44
import org .apache .http .nio .client .methods .HttpAsyncMethods ;
41
45
import org .apache .http .nio .protocol .HttpAsyncRequestProducer ;
@@ -92,7 +96,7 @@ public class RestClient implements Closeable {
92
96
private final long maxRetryTimeoutMillis ;
93
97
private final String pathPrefix ;
94
98
private final AtomicInteger lastHostIndex = new AtomicInteger (0 );
95
- private volatile Set <HttpHost > hosts ;
99
+ private volatile HostTuple < Set <HttpHost >> hostTuple ;
96
100
private final ConcurrentMap <HttpHost , DeadHostState > blacklist = new ConcurrentHashMap <>();
97
101
private final FailureListener failureListener ;
98
102
@@ -122,11 +126,13 @@ public synchronized void setHosts(HttpHost... hosts) {
122
126
throw new IllegalArgumentException ("hosts must not be null nor empty" );
123
127
}
124
128
Set <HttpHost > httpHosts = new HashSet <>();
129
+ AuthCache authCache = new BasicAuthCache ();
125
130
for (HttpHost host : hosts ) {
126
131
Objects .requireNonNull (host , "host cannot be null" );
127
132
httpHosts .add (host );
133
+ authCache .put (host , new BasicScheme ());
128
134
}
129
- this .hosts = Collections .unmodifiableSet (httpHosts );
135
+ this .hostTuple = new HostTuple <>( Collections .unmodifiableSet (httpHosts ), authCache );
130
136
this .blacklist .clear ();
131
137
}
132
138
@@ -315,19 +321,22 @@ public void performRequestAsync(String method, String endpoint, Map<String, Stri
315
321
setHeaders (request , headers );
316
322
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener (responseListener );
317
323
long startTime = System .nanoTime ();
318
- performRequestAsync (startTime , nextHost (). iterator () , request , ignoreErrorCodes , httpAsyncResponseConsumerFactory ,
319
- failureTrackingResponseListener );
324
+ performRequestAsync (startTime , nextHost (), request , ignoreErrorCodes , httpAsyncResponseConsumerFactory ,
325
+ failureTrackingResponseListener );
320
326
}
321
327
322
- private void performRequestAsync (final long startTime , final Iterator <HttpHost > hosts , final HttpRequestBase request ,
328
+ private void performRequestAsync (final long startTime , final HostTuple < Iterator <HttpHost >> hostTuple , final HttpRequestBase request ,
323
329
final Set <Integer > ignoreErrorCodes ,
324
330
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory ,
325
331
final FailureTrackingResponseListener listener ) {
326
- final HttpHost host = hosts .next ();
332
+ final HttpHost host = hostTuple . hosts .next ();
327
333
//we stream the request body if the entity allows for it
328
- HttpAsyncRequestProducer requestProducer = HttpAsyncMethods .create (host , request );
329
- HttpAsyncResponseConsumer <HttpResponse > asyncResponseConsumer = httpAsyncResponseConsumerFactory .createHttpAsyncResponseConsumer ();
330
- client .execute (requestProducer , asyncResponseConsumer , new FutureCallback <HttpResponse >() {
334
+ final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods .create (host , request );
335
+ final HttpAsyncResponseConsumer <HttpResponse > asyncResponseConsumer =
336
+ httpAsyncResponseConsumerFactory .createHttpAsyncResponseConsumer ();
337
+ final HttpClientContext context = HttpClientContext .create ();
338
+ context .setAuthCache (hostTuple .authCache );
339
+ client .execute (requestProducer , asyncResponseConsumer , context , new FutureCallback <HttpResponse >() {
331
340
@ Override
332
341
public void completed (HttpResponse httpResponse ) {
333
342
try {
@@ -366,7 +375,7 @@ public void failed(Exception failure) {
366
375
}
367
376
368
377
private void retryIfPossible (Exception exception ) {
369
- if (hosts .hasNext ()) {
378
+ if (hostTuple . hosts .hasNext ()) {
370
379
//in case we are retrying, check whether maxRetryTimeout has been reached
371
380
long timeElapsedMillis = TimeUnit .NANOSECONDS .toMillis (System .nanoTime () - startTime );
372
381
long timeout = maxRetryTimeoutMillis - timeElapsedMillis ;
@@ -377,7 +386,7 @@ private void retryIfPossible(Exception exception) {
377
386
} else {
378
387
listener .trackFailure (exception );
379
388
request .reset ();
380
- performRequestAsync (startTime , hosts , request , ignoreErrorCodes , httpAsyncResponseConsumerFactory , listener );
389
+ performRequestAsync (startTime , hostTuple , request , ignoreErrorCodes , httpAsyncResponseConsumerFactory , listener );
381
390
}
382
391
} else {
383
392
listener .onDefinitiveFailure (exception );
@@ -415,17 +424,18 @@ private void setHeaders(HttpRequest httpRequest, Header[] requestHeaders) {
415
424
* The iterator returned will never be empty. In case there are no healthy hosts available, or dead ones to be be retried,
416
425
* one dead host gets returned so that it can be retried.
417
426
*/
418
- private Iterable <HttpHost > nextHost () {
427
+ private HostTuple <Iterator <HttpHost >> nextHost () {
428
+ final HostTuple <Set <HttpHost >> hostTuple = this .hostTuple ;
419
429
Collection <HttpHost > nextHosts = Collections .emptySet ();
420
430
do {
421
- Set <HttpHost > filteredHosts = new HashSet <>(hosts );
431
+ Set <HttpHost > filteredHosts = new HashSet <>(hostTuple . hosts );
422
432
for (Map .Entry <HttpHost , DeadHostState > entry : blacklist .entrySet ()) {
423
433
if (System .nanoTime () - entry .getValue ().getDeadUntilNanos () < 0 ) {
424
434
filteredHosts .remove (entry .getKey ());
425
435
}
426
436
}
427
437
if (filteredHosts .isEmpty ()) {
428
- //last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
438
+ //last resort: if there are no good host to use, return a single dead one, the one that's closest to being retried
429
439
List <Map .Entry <HttpHost , DeadHostState >> sortedHosts = new ArrayList <>(blacklist .entrySet ());
430
440
if (sortedHosts .size () > 0 ) {
431
441
Collections .sort (sortedHosts , new Comparator <Map .Entry <HttpHost , DeadHostState >>() {
@@ -444,7 +454,7 @@ public int compare(Map.Entry<HttpHost, DeadHostState> o1, Map.Entry<HttpHost, De
444
454
nextHosts = rotatedHosts ;
445
455
}
446
456
} while (nextHosts .isEmpty ());
447
- return nextHosts ;
457
+ return new HostTuple <>( nextHosts . iterator (), hostTuple . authCache ) ;
448
458
}
449
459
450
460
/**
@@ -686,4 +696,18 @@ public void onFailure(HttpHost host) {
686
696
687
697
}
688
698
}
699
+
700
+ /**
701
+ * {@code HostTuple} enables the {@linkplain HttpHost}s and {@linkplain AuthCache} to be set together in a thread
702
+ * safe, volatile way.
703
+ */
704
+ private static class HostTuple <T > {
705
+ public final T hosts ;
706
+ public final AuthCache authCache ;
707
+
708
+ public HostTuple (final T hosts , final AuthCache authCache ) {
709
+ this .hosts = hosts ;
710
+ this .authCache = authCache ;
711
+ }
712
+ }
689
713
}
0 commit comments