Skip to content

Commit 7b8ab09

Browse files
committed
Merge remote-tracking branch 'elastic/master' into geosql
2 parents b2f0435 + 697c990 commit 7b8ab09

File tree

93 files changed

+1254
-993
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1254
-993
lines changed

client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.client;
2121

2222
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Supplier;
2324

2425
/**
2526
* Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
@@ -30,35 +31,36 @@ final class DeadHostState implements Comparable<DeadHostState> {
3031

3132
private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
3233
static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
34+
static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;
3335

3436
private final int failedAttempts;
3537
private final long deadUntilNanos;
36-
private final TimeSupplier timeSupplier;
38+
private final Supplier<Long> timeSupplier;
3739

3840
/**
3941
* Build the initial dead state of a host. Useful when a working host stops functioning
4042
* and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
4143
*
4244
* @param timeSupplier a way to supply the current time and allow for unit testing
4345
*/
44-
DeadHostState(TimeSupplier timeSupplier) {
46+
DeadHostState(Supplier<Long> timeSupplier) {
4547
this.failedAttempts = 1;
46-
this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
48+
this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
4749
this.timeSupplier = timeSupplier;
4850
}
4951

5052
/**
5153
* Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
5254
* it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
5355
* to retry that same host again. Minimum is 1 minute (for a node the only failed once created
54-
* through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
56+
* through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
5557
*
5658
* @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
5759
*/
5860
DeadHostState(DeadHostState previousDeadHostState) {
5961
long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
6062
MAX_CONNECTION_TIMEOUT_NANOS);
61-
this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
63+
this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
6264
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
6365
this.timeSupplier = previousDeadHostState.timeSupplier;
6466
}
@@ -69,7 +71,7 @@ final class DeadHostState implements Comparable<DeadHostState> {
6971
* @return true if the host should be retried, false otherwise
7072
*/
7173
boolean shallBeRetried() {
72-
return timeSupplier.nanoTime() - deadUntilNanos > 0;
74+
return timeSupplier.get() - deadUntilNanos > 0;
7375
}
7476

7577
/**
@@ -87,8 +89,8 @@ int getFailedAttempts() {
8789
@Override
8890
public int compareTo(DeadHostState other) {
8991
if (timeSupplier != other.timeSupplier) {
90-
throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
91-
+ timeSupplier + " != " + other.timeSupplier + "]");
92+
throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " +
93+
"be based on different clocks");
9294
}
9395
return Long.compare(deadUntilNanos, other.deadUntilNanos);
9496
}
@@ -101,23 +103,4 @@ public String toString() {
101103
", timeSupplier=" + timeSupplier +
102104
'}';
103105
}
104-
105-
/**
106-
* Time supplier that makes timing aspects pluggable to ease testing
107-
*/
108-
interface TimeSupplier {
109-
TimeSupplier DEFAULT = new TimeSupplier() {
110-
@Override
111-
public long nanoTime() {
112-
return System.nanoTime();
113-
}
114-
115-
@Override
116-
public String toString() {
117-
return "nanoTime";
118-
}
119-
};
120-
121-
long nanoTime();
122-
}
123106
}

client/rest/src/main/java/org/elasticsearch/client/Request.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ public String getEndpoint() {
7474
*/
7575
public void addParameter(String name, String value) {
7676
Objects.requireNonNull(name, "url parameter name cannot be null");
77-
// .putIfAbsent(name, value) except we are in Java 7 which doesn't have that.
7877
if (parameters.containsKey(name)) {
7978
throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]");
8079
} else {

client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.elasticsearch.client;
2121

22-
import org.apache.http.message.BasicHeader;
2322
import org.apache.http.Header;
23+
import org.apache.http.message.BasicHeader;
2424
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
2525
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
2626

@@ -38,7 +38,7 @@ public final class RequestOptions {
3838
* Default request options.
3939
*/
4040
public static final RequestOptions DEFAULT = new Builder(
41-
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
41+
Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
4242

4343
private final List<Header> headers;
4444
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;

client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.http.nio.client.methods.HttpAsyncMethods;
4747
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
4848
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
49-
import org.elasticsearch.client.DeadHostState.TimeSupplier;
5049

5150
import javax.net.ssl.SSLHandshakeException;
5251
import java.io.Closeable;
@@ -72,6 +71,7 @@
7271
import java.util.concurrent.ConcurrentMap;
7372
import java.util.concurrent.ExecutionException;
7473
import java.util.concurrent.atomic.AtomicInteger;
74+
import java.util.stream.Collectors;
7575

7676
import static java.util.Collections.singletonList;
7777

@@ -139,7 +139,11 @@ public static RestClientBuilder builder(Node... nodes) {
139139
* @see Node#Node(HttpHost)
140140
*/
141141
public static RestClientBuilder builder(HttpHost... hosts) {
142-
return new RestClientBuilder(hostsToNodes(hosts));
142+
if (hosts == null || hosts.length == 0) {
143+
throw new IllegalArgumentException("hosts must not be null nor empty");
144+
}
145+
List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
146+
return new RestClientBuilder(nodes);
143147
}
144148

145149
/**
@@ -163,17 +167,6 @@ public synchronized void setNodes(Collection<Node> nodes) {
163167
this.blacklist.clear();
164168
}
165169

166-
private static List<Node> hostsToNodes(HttpHost[] hosts) {
167-
if (hosts == null || hosts.length == 0) {
168-
throw new IllegalArgumentException("hosts must not be null nor empty");
169-
}
170-
List<Node> nodes = new ArrayList<>(hosts.length);
171-
for (HttpHost host : hosts) {
172-
nodes.add(new Node(host));
173-
}
174-
return nodes;
175-
}
176-
177170
/**
178171
* Get the list of nodes that the client knows about. The list is
179172
* unmodifiable.
@@ -369,15 +362,11 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
369362
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
370363
for (Node node : nodeTuple.nodes) {
371364
DeadHostState deadness = blacklist.get(node.getHost());
372-
if (deadness == null) {
373-
livingNodes.add(node);
374-
continue;
375-
}
376-
if (deadness.shallBeRetried()) {
365+
if (deadness == null || deadness.shallBeRetried()) {
377366
livingNodes.add(node);
378-
continue;
367+
} else {
368+
deadNodes.add(new DeadNode(node, deadness));
379369
}
380-
deadNodes.add(new DeadNode(node, deadness));
381370
}
382371

383372
if (false == livingNodes.isEmpty()) {
@@ -415,12 +404,7 @@ static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost,
415404
* to compare many things. This saves us a sort on the unfiltered
416405
* list.
417406
*/
418-
nodeSelector.select(new Iterable<Node>() {
419-
@Override
420-
public Iterator<Node> iterator() {
421-
return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
422-
}
423-
});
407+
nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
424408
if (false == selectedDeadNodes.isEmpty()) {
425409
return singletonList(Collections.min(selectedDeadNodes).node);
426410
}
@@ -447,7 +431,7 @@ private void onResponse(Node node) {
447431
private void onFailure(Node node) {
448432
while(true) {
449433
DeadHostState previousDeadHostState =
450-
blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
434+
blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
451435
if (previousDeadHostState == null) {
452436
if (logger.isDebugEnabled()) {
453437
logger.debug("added [" + node + "] to blacklist");

client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,8 @@ public RestClient build() {
186186
if (failureListener == null) {
187187
failureListener = new RestClient.FailureListener();
188188
}
189-
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
190-
@Override
191-
public CloseableHttpAsyncClient run() {
192-
return createHttpClient();
193-
}
194-
});
189+
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
190+
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
195191
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
196192
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
197193
httpClient.start();
@@ -218,12 +214,7 @@ private CloseableHttpAsyncClient createHttpClient() {
218214
}
219215

220216
final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
221-
return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
222-
@Override
223-
public CloseableHttpAsyncClient run() {
224-
return finalBuilder.build();
225-
}
226-
});
217+
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
227218
} catch (NoSuchAlgorithmException e) {
228219
throw new IllegalStateException("could not create the default ssl context", e);
229220
}

client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java

Lines changed: 16 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.concurrent.TimeUnit;
2323
import java.util.concurrent.atomic.AtomicLong;
2424

25-
import org.elasticsearch.client.DeadHostState.TimeSupplier;
26-
2725
import static org.hamcrest.MatcherAssert.assertThat;
2826
import static org.hamcrest.Matchers.equalTo;
2927
import static org.hamcrest.Matchers.greaterThan;
@@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase {
3836
private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};
3937

4038
public void testInitialDeadHostStateDefaultTimeSupplier() {
41-
DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
39+
DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
4240
long currentTime = System.nanoTime();
4341
assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime));
4442
assertThat(deadHostState.getFailedAttempts(), equalTo(1));
4543
}
4644

4745
public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
48-
DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
46+
DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
4947
int iters = randomIntBetween(5, 30);
5048
for (int i = 0; i < iters; i++) {
5149
DeadHostState deadHostState = new DeadHostState(previous);
@@ -58,10 +56,13 @@ public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
5856
public void testCompareToTimeSupplier() {
5957
int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
6058
DeadHostState[] deadHostStates = new DeadHostState[numObjects];
59+
final AtomicLong time = new AtomicLong(0);
6160
for (int i = 0; i < numObjects; i++) {
6261
if (i == 0) {
63-
// this test requires a strictly increasing timer
64-
deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier());
62+
// this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight
63+
// loop we always notice time moving forward. This does not happen for real timer implementations
64+
// (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
65+
deadHostStates[i] = new DeadHostState(time::incrementAndGet);
6566
} else {
6667
deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]);
6768
}
@@ -74,42 +75,39 @@ public void testCompareToTimeSupplier() {
7475

7576
public void testCompareToDifferingTimeSupplier() {
7677
try {
77-
new DeadHostState(TimeSupplier.DEFAULT).compareTo(
78-
new DeadHostState(new ConfigurableTimeSupplier()));
78+
new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo(
79+
new DeadHostState(() -> 0L));
7980
fail("expected failure");
8081
} catch (IllegalArgumentException e) {
81-
assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]",
82-
e.getMessage());
82+
assertEquals("can't compare DeadHostStates holding different time suppliers as they may " +
83+
"be based on different clocks", e.getMessage());
8384
}
8485
}
8586

8687
public void testShallBeRetried() {
87-
ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
88+
final AtomicLong time = new AtomicLong(0);
8889
DeadHostState deadHostState = null;
8990
for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
9091
long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
91-
timeSupplier.nanoTime = 0;
9292
if (i == 0) {
93-
deadHostState = new DeadHostState(timeSupplier);
93+
deadHostState = new DeadHostState(time::get);
9494
} else {
9595
deadHostState = new DeadHostState(deadHostState);
9696
}
9797
for (int j = 0; j < expectedTimeoutSecond; j++) {
98-
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
98+
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
9999
assertThat(deadHostState.shallBeRetried(), is(false));
100100
}
101101
int iters = randomIntBetween(5, 30);
102102
for (int j = 0; j < iters; j++) {
103-
timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
103+
time.addAndGet(TimeUnit.SECONDS.toNanos(1));
104104
assertThat(deadHostState.shallBeRetried(), is(true));
105105
}
106106
}
107107
}
108108

109109
public void testDeadHostStateTimeouts() {
110-
ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
111-
zeroTimeSupplier.nanoTime = 0L;
112-
DeadHostState previous = new DeadHostState(zeroTimeSupplier);
110+
DeadHostState previous = new DeadHostState(() -> 0L);
113111
for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
114112
assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
115113
previous = new DeadHostState(previous);
@@ -123,37 +121,4 @@ public void testDeadHostStateTimeouts() {
123121
previous = deadHostState;
124122
}
125123
}
126-
127-
static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
128-
long nanoTime;
129-
130-
@Override
131-
public long nanoTime() {
132-
return nanoTime;
133-
}
134-
135-
@Override
136-
public String toString() {
137-
return "configured[" + nanoTime + "]";
138-
}
139-
}
140-
141-
/**
142-
* Simulates a monotonically strict increasing time (i.e. the value increases on every call to <code>#nanoTime()</code>). This ensures
143-
* that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real
144-
* timer implementations (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
145-
*/
146-
static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier {
147-
private final AtomicLong time = new AtomicLong(0);
148-
149-
@Override
150-
public long nanoTime() {
151-
return time.incrementAndGet();
152-
}
153-
154-
@Override
155-
public String toString() {
156-
return "strict monotonic[" + time.get() + "]";
157-
}
158-
}
159124
}

0 commit comments

Comments
 (0)