Skip to content

Commit 6740948

Browse files
committed Nov 30, 2023
fixed failover with round robin load balancing (DE-724)
1 parent 8228a70 commit 6740948

File tree

9 files changed

+104
-12
lines changed

9 files changed

+104
-12
lines changed
 

‎.github/workflows/resilience.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
runs-on: ubuntu-latest
2424

2525
env:
26-
TOXIPROXY_VERSION: v2.5.0
26+
TOXIPROXY_VERSION: v2.7.0
2727

2828
strategy:
2929
fail-fast: false

‎core/src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java

+6
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public Host get(final HostHandle hostHandle, final AccessType accessType) {
5252
return determineHostHandler().get(hostHandle, accessType);
5353
}
5454

55+
@Override
56+
public boolean hasNext(HostHandle hostHandle, AccessType accessType) {
57+
this.currentAccessType = accessType;
58+
return determineHostHandler().hasNext(hostHandle, accessType);
59+
}
60+
5561
@Override
5662
public void success() {
5763
determineHostHandler().success();

‎core/src/main/java/com/arangodb/internal/net/FallbackHostHandler.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public FallbackHostHandler(final HostResolver resolver) {
4949

5050
@Override
5151
public Host get(final HostHandle hostHandle, AccessType accessType) {
52-
if (current != lastSuccess || iterations < 3) {
52+
if (hasNext(hostHandle, accessType)) {
5353
return current;
5454
} else {
5555
ArangoDBException e = ArangoDBException.of("Cannot contact any host!",
@@ -59,6 +59,11 @@ public Host get(final HostHandle hostHandle, AccessType accessType) {
5959
}
6060
}
6161

62+
@Override
63+
public boolean hasNext(HostHandle hostHandle, AccessType accessType) {
64+
return current != lastSuccess || iterations < 3;
65+
}
66+
6267
@Override
6368
public void success() {
6469
lastSuccess = current;

‎core/src/main/java/com/arangodb/internal/net/HostHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public interface HostHandler {
3131

3232
Host get(HostHandle hostHandle, AccessType accessType);
3333

34+
boolean hasNext(HostHandle hostHandle, AccessType accessType);
35+
3436
void success();
3537

3638
void fail(Exception exception);

‎core/src/main/java/com/arangodb/internal/net/RandomHostHandler.java

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public Host get(final HostHandle hostHandle, AccessType accessType) {
5252
return current;
5353
}
5454

55+
@Override
56+
public boolean hasNext(HostHandle hostHandle, AccessType accessType) {
57+
return true;
58+
}
59+
5560
@Override
5661
public void success() {
5762
fallback.success();

‎core/src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java

+12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.arangodb.ArangoDBException;
2424
import com.arangodb.ArangoDBMultipleException;
2525
import com.arangodb.config.HostDescription;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
import java.util.ArrayList;
2830
import java.util.List;
@@ -32,6 +34,8 @@
3234
*/
3335
public class RoundRobinHostHandler implements HostHandler {
3436

37+
private final static Logger LOGGER = LoggerFactory.getLogger(RoundRobinHostHandler.class);
38+
3539
private final HostResolver resolver;
3640
private final List<Exception> lastFailExceptions;
3741
private long current;
@@ -74,9 +78,17 @@ public Host get(final HostHandle hostHandle, AccessType accessType) {
7478
hostHandle.setHost(host.getDescription());
7579
}
7680
}
81+
LOGGER.debug("Returning host: {}", host);
7782
return host;
7883
}
7984

85+
@Override
86+
public boolean hasNext(HostHandle hostHandle, AccessType accessType) {
87+
hosts = resolver.getHosts();
88+
int size = hosts.getHostsList().size();
89+
return fails <= size;
90+
}
91+
8092
@Override
8193
public void success() {
8294
reset();

‎http/src/main/java/com/arangodb/http/HttpCommunication.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private CompletableFuture<InternalResponse> executeAsync(final InternalRequest r
121121
}
122122
}
123123
} catch (Exception ex) {
124-
rfuture.completeExceptionally(ArangoDBException.of(ex));
124+
rfuture.completeExceptionally(ArangoDBException.of(ex, reqId));
125125
}
126126
});
127127
return rfuture;
@@ -134,8 +134,9 @@ private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle,
134134
if (hostHandle != null && hostHandle.getHost() != null) {
135135
hostHandle.setHost(null);
136136
}
137-
Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request));
138-
if (nextHost != null && isSafe) {
137+
boolean hasNextHost = hostHandler.hasNext(hostHandle, RequestUtils.determineAccessType(request));
138+
if (hasNextHost && isSafe) {
139+
Host nextHost = hostHandler.get(hostHandle, RequestUtils.determineAccessType(request));
139140
LOGGER.warn("Could not connect to {} while executing request [id={}]",
140141
host.getDescription(), reqId, ioEx);
141142
LOGGER.debug("Try connecting to {}", nextHost.getDescription());
@@ -145,7 +146,6 @@ private void handleException(boolean isSafe, Throwable e, HostHandle hostHandle,
145146
);
146147
} else {
147148
ArangoDBException aEx = ArangoDBException.of(ioEx, reqId);
148-
LOGGER.error(aEx.getMessage(), aEx);
149149
rfuture.completeExceptionally(aEx);
150150
}
151151
}

‎resilience-tests/src/test/java/resilience/ClusterTest.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import com.arangodb.ArangoDBAsync;
55
import com.arangodb.Request;
66
import com.fasterxml.jackson.databind.node.ObjectNode;
7-
import resilience.utils.MemoryAppender;
87
import eu.rekawek.toxiproxy.Proxy;
98
import eu.rekawek.toxiproxy.ToxiproxyClient;
109
import org.junit.jupiter.api.AfterAll;
1110
import org.junit.jupiter.api.BeforeAll;
1211
import org.junit.jupiter.api.BeforeEach;
1312
import org.junit.jupiter.api.Tag;
13+
import resilience.utils.MemoryAppender;
1414

1515
import java.io.IOException;
1616
import java.util.Arrays;
@@ -117,7 +117,14 @@ protected static String serverIdPOST(ArangoDBAsync adb) {
117117
.get("serverInfo")
118118
.get("serverId")
119119
.textValue();
120-
} catch (InterruptedException | ExecutionException e) {
120+
} catch (ExecutionException e) {
121+
Throwable cause = e.getCause();
122+
if (cause instanceof RuntimeException) {
123+
throw (RuntimeException) cause;
124+
} else {
125+
throw new RuntimeException(e);
126+
}
127+
} catch (InterruptedException e) {
121128
throw new RuntimeException(e);
122129
}
123130
}

‎resilience-tests/src/test/java/resilience/loadbalance/LoadBalanceRoundRobinClusterTest.java

+59-4
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void failoverAsync(ArangoDBAsync arangoDB) throws IOException {
7979
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
8080
}
8181

82-
// FIXME: this only passes for HTTP, while fails for VST and HTTP2
82+
// FIXME: this fails for VST
8383
@Disabled
8484
@ParameterizedTest(name = "{index}")
8585
@MethodSource("arangoProvider")
@@ -105,13 +105,39 @@ void retryGET(ArangoDB arangoDB) throws IOException, InterruptedException {
105105
es.shutdown();
106106
}
107107

108-
// FIXME: this only passes for VST, while fails for HTTP and HTTP2:
109-
// for HTTP and HTTP2 it skips 2 hosts instead of skipping only the failing one
108+
// FIXME: this fails for VST
110109
@Disabled
110+
@ParameterizedTest(name = "{index}")
111+
@MethodSource("asyncArangoProvider")
112+
void retryGETAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException {
113+
List<Endpoint> endpoints = getEndpoints();
114+
115+
// slow down the driver connection
116+
Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000);
117+
Thread.sleep(100);
118+
119+
ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
120+
es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS);
121+
122+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
123+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
124+
125+
toxic.remove();
126+
enableAllEndpoints();
127+
Thread.sleep(100);
128+
129+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
130+
131+
es.shutdown();
132+
}
133+
111134
@ParameterizedTest(name = "{index}")
112135
@MethodSource("arangoProvider")
113136
void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException {
114137
List<Endpoint> endpoints = getEndpoints();
138+
for (Endpoint endpoint : endpoints) {
139+
System.out.println(endpoint.getServerId());
140+
}
115141

116142
// slow down the driver connection
117143
Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000);
@@ -120,7 +146,7 @@ void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException {
120146
ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
121147
es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS);
122148

123-
Throwable thrown = catchThrowable(()->serverIdPOST(arangoDB));
149+
Throwable thrown = catchThrowable(() -> serverIdPOST(arangoDB));
124150
assertThat(thrown).isInstanceOf(ArangoDBException.class);
125151
assertThat(thrown.getCause()).isInstanceOf(IOException.class);
126152

@@ -129,8 +155,37 @@ void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException {
129155

130156
toxic.remove();
131157
enableAllEndpoints();
158+
159+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
160+
161+
es.shutdown();
162+
}
163+
164+
@ParameterizedTest(name = "{index}")
165+
@MethodSource("asyncArangoProvider")
166+
void retryPOSTAsync(ArangoDBAsync arangoDB) throws IOException, InterruptedException {
167+
List<Endpoint> endpoints = getEndpoints();
168+
for (Endpoint endpoint : endpoints) {
169+
System.out.println(endpoint.getServerId());
170+
}
171+
172+
// slow down the driver connection
173+
Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000);
132174
Thread.sleep(100);
133175

176+
ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
177+
es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS);
178+
179+
Throwable thrown = catchThrowable(() -> serverIdPOST(arangoDB));
180+
assertThat(thrown).isInstanceOf(ArangoDBException.class);
181+
assertThat(thrown.getCause()).isInstanceOf(IOException.class);
182+
183+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
184+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
185+
186+
toxic.remove();
187+
enableAllEndpoints();
188+
134189
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
135190

136191
es.shutdown();

0 commit comments

Comments
 (0)
Please sign in to comment.