Skip to content

Commit 3280315

Browse files
committed
Rack-aware weighted load balancing policy
1 parent ea2e475 commit 3280315

File tree

4 files changed

+373
-0
lines changed

4 files changed

+373
-0
lines changed

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

+35
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,41 @@ public enum DefaultDriverOption implements DriverOption {
853853
LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS(
854854
"advanced.load-balancing-policy.dc-failover.allow-for-local-consistency-levels"),
855855

856+
/**
857+
* How many items in the plan to score.
858+
*
859+
* <p>Value-Type: int
860+
*/
861+
LOAD_BALANCING_SCORED_PLAN_SIZE("advanced.load-balancing-policy.weighted.scored-plan-size"),
862+
863+
/**
864+
* Weight to apply when load balancing for a non-rack node.
865+
*
866+
* <p>Value-Type: double
867+
*/
868+
LOAD_BALANCING_WEIGHT_NON_RACK("advanced.load-balancing-policy.weighted.non-rack"),
869+
870+
/**
871+
* Weight to apply when load balancing for a non-replica node.
872+
*
873+
* <p>Value-Type: double
874+
*/
875+
LOAD_BALANCING_WEIGHT_NON_REPLICA("advanced.load-balancing-policy.weighted.non-replica"),
876+
877+
/**
878+
* Weight to apply when load balancing for a node that is still starting up.
879+
*
880+
* <p>Value-Type: double
881+
*/
882+
LOAD_BALANCING_WEIGHT_STARTING("advanced.load-balancing-policy.weighted.starting"),
883+
884+
/**
885+
* Weight to apply when load balancing for an unhealthy node.
886+
*
887+
* <p>Value-Type: double
888+
*/
889+
LOAD_BALANCING_WEIGHT_UNHEALTHY("advanced.load-balancing-policy.weighted.unhealthy"),
890+
856891
/**
857892
* The classname of the desired {@code MetricIdGenerator} implementation.
858893
*

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

+30
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,36 @@ public String toString() {
891891
new TypedDriverOption<>(
892892
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
893893
GenericType.BOOLEAN);
894+
/** How many items in the plan to score. */
895+
public static final TypedDriverOption<Integer>
896+
LOAD_BALANCING_SCORED_PLAN_SIZE =
897+
new TypedDriverOption<>(
898+
DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE,
899+
GenericType.INTEGER);
900+
/** Weight to apply when load balancing for a non-rack node. */
901+
public static final TypedDriverOption<Double>
902+
LOAD_BALANCING_WEIGHT_NON_RACK =
903+
new TypedDriverOption<>(
904+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK,
905+
GenericType.DOUBLE);
906+
/** Weight to apply when load balancing for a non-replica node. */
907+
public static final TypedDriverOption<Double>
908+
LOAD_BALANCING_WEIGHT_NON_REPLICA =
909+
new TypedDriverOption<>(
910+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA,
911+
GenericType.DOUBLE);
912+
/** Weight to apply when load balancing for a node that is still starting up. */
913+
public static final TypedDriverOption<Double>
914+
LOAD_BALANCING_WEIGHT_STARTING =
915+
new TypedDriverOption<>(
916+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING,
917+
GenericType.DOUBLE);
918+
/** Weight to apply when load balancing for an unhealthy node. */
919+
public static final TypedDriverOption<Double>
920+
LOAD_BALANCING_WEIGHT_UNHEALTHY =
921+
new TypedDriverOption<>(
922+
DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY,
923+
GenericType.DOUBLE);
894924

895925
private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
896926
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.loadbalancing;
19+
20+
import static java.util.concurrent.TimeUnit.SECONDS;
21+
22+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
23+
import com.datastax.oss.driver.api.core.context.DriverContext;
24+
import com.datastax.oss.driver.api.core.metadata.Node;
25+
import com.datastax.oss.driver.api.core.session.Request;
26+
import com.datastax.oss.driver.api.core.session.Session;
27+
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
28+
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
29+
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
30+
import edu.umd.cs.findbugs.annotations.NonNull;
31+
import java.util.ArrayList;
32+
import java.util.Comparator;
33+
import java.util.Objects;
34+
import java.util.Queue;
35+
import java.util.Random;
36+
import java.util.Set;
37+
import java.util.concurrent.ThreadLocalRandom;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
/**
 * A load balancing policy that optimally balances between sending load to local token holder,
 * rack replicas, and local datacenter replicas (in that order).
 *
 * <p>The default weights are good for the vast majority of use cases, but you can tweak them to
 * get different behavior.
 *
 * <p>Nodes are ranked by a multiplicative score: (1 + in-flight requests) times a weight that
 * penalizes non-rack nodes, non-replicas, unhealthy nodes, and nodes that recently came up.
 * Lower scores are preferred.
 */
public class RackAwareWeightedLoadBalancingPolicy extends DefaultLoadBalancingPolicy {
  private static final Logger LOG =
      LoggerFactory.getLogger(RackAwareWeightedLoadBalancingPolicy.class);
  // Each client will randomly skew so traffic is introduced gradually to a newly up replica.
  // Each client will start sending at a point between 15s and 30s after the node comes up, and
  // will gradually increase load over the next 15 seconds.
  private static final long DELAY_TRAFFIC_SKEW_MILLIS = SECONDS.toMillis(15);
  // Randomized once per client (per classload), so different clients ramp at different times.
  private static final long DELAY_TRAFFIC_MILLIS =
      DELAY_TRAFFIC_SKEW_MILLIS + ThreadLocalRandom.current().nextLong(DELAY_TRAFFIC_SKEW_MILLIS);

  // By default we will only score this many nodes, the rest will get added on without scoring.
  // We don't usually need to score every single node if there are more than a few.
  static final int DEFAULT_SCORED_PLAN_SIZE = 8;
  // Default multiplicative weights. Think of this like "How much concurrency must occur
  // before I fail off this node to the next". Note that these defaults are intentionally
  // meant to shed load to unloaded rack coordinators when a replica set is all
  // relatively heavily loaded (specifically 3x as loaded).
  static final double DEFAULT_WEIGHT_NON_RACK = 4.0;
  static final double DEFAULT_WEIGHT_NON_REPLICA = 12.0;
  static final double DEFAULT_WEIGHT_STARTING = 16.0;
  static final double DEFAULT_WEIGHT_UNHEALTHY = 64.0;

  // Maximum number of nodes that receive a computed score in a query plan.
  private final int planSize;
  // Multiplier applied when the node is outside the local rack.
  private final double weightNonRack;
  // Multiplier applied when the node is not a replica for the request.
  private final double weightNonReplica;
  // Multiplier probabilistically applied while a node is ramping up after coming up.
  private final double weightStarting;
  // Multiplier applied when the node is considered unhealthy.
  private final double weightUnhealthy;

  /**
   * Creates the policy, reading plan size and weight overrides from the driver profile;
   * each option falls back to the corresponding DEFAULT_* constant when absent.
   *
   * @param context the driver context (passed through to {@link DefaultLoadBalancingPolicy})
   * @param profileName the execution profile to read configuration from
   */
  public RackAwareWeightedLoadBalancingPolicy(
      @NonNull DriverContext context,
      @NonNull String profileName) {
    super(context, profileName);
    this.planSize = profile.getInt(DefaultDriverOption.LOAD_BALANCING_SCORED_PLAN_SIZE, DEFAULT_SCORED_PLAN_SIZE);
    // Choices of weights will change how this load balancer prefers endpoints.
    // The weight is relative to the outstanding concurrency.
    this.weightNonRack = profile.getDouble(
        DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_RACK, DEFAULT_WEIGHT_NON_RACK);
    this.weightNonReplica = profile.getDouble(
        DefaultDriverOption.LOAD_BALANCING_WEIGHT_NON_REPLICA, DEFAULT_WEIGHT_NON_REPLICA);
    this.weightStarting = profile.getDouble(
        DefaultDriverOption.LOAD_BALANCING_WEIGHT_STARTING, DEFAULT_WEIGHT_STARTING);
    this.weightUnhealthy = profile.getDouble(
        DefaultDriverOption.LOAD_BALANCING_WEIGHT_UNHEALTHY, DEFAULT_WEIGHT_UNHEALTHY);
  }

  /**
   * Builds a query plan of local-DC nodes ordered by ascending weighted score: local replicas
   * are scored first, then (if fewer than {@code planSize} were found) a random sample of
   * non-replica DC nodes is scored and appended before sorting.
   *
   * @param request the request to route (may be null, per the parent contract)
   * @param session the session; when null, delegates entirely to the parent implementation
   * @return the ordered plan, possibly extended with DC-failover nodes by the parent helper
   */
  @NonNull
  @Override
  public Queue<Node> newQueryPlan(Request request, Session session) {
    if (session == null) {
      return super.newQueryPlan(request, null);
    }

    // Take a copy of nodes and reference to replicas since the node map is concurrent
    Set<Node> dcNodeSet = getLiveNodes().dc(getLocalDatacenter());
    Set<Node> replicaSet = getReplicas(request, session);

    // Two clocks: nanoTime for health checks, wall-clock millis for node uptime (see
    // getWeightedScore for why they cannot be mixed).
    long nowNanos = nanoTime();
    long nowMillis = milliTime();

    // collect known replica nodes (only those that are also live members of the local DC)
    ArrayList<NodeScore> nodeScores = new ArrayList<>(this.planSize);
    for (Node replicaNode : replicaSet) {
      if (dcNodeSet.contains(replicaNode)) {
        nodeScores.add(new NodeScore(replicaNode,
            getWeightedScore(replicaNode, session, nowMillis, nowNanos, true)));

        if (nodeScores.size() == this.planSize) {
          break; // TODO (akhaku) add the rest of the nodes once we port the tests to OSS
        }
      }
    }

    // collect any non-replicas, if we need to and there are some available
    if (nodeScores.size() < this.planSize && nodeScores.size() < dcNodeSet.size()) {
      Random rand = getRandom();
      final Node[] dcNodes = dcNodeSet.toArray(new Node[0]);

      for (int i = 0; i < dcNodes.length; i++) {
        // pick a random target; shuffle it up front, so we don't revisit
        // (partial Fisher-Yates: positions [0, i) are already-visited choices)
        int nextIndex = i + rand.nextInt(dcNodes.length - i);
        ArrayUtils.swap(dcNodes, i, nextIndex);
        Node dcNode = dcNodes[i];

        // skip replicas; they were already inserted
        // otherwise, found a valid node: score it
        if (!replicaSet.contains(dcNode)) {
          nodeScores.add(new NodeScore(dcNode,
              getWeightedScore(dcNode, session, nowMillis, nowNanos, false)));

          // if we scored, we might by now have already scored enough of what we need
          if (nodeScores.size() == this.planSize || nodeScores.size() == dcNodes.length) {
            break;
          }
        }
      }

      // by now we've scored everything we need to meet planSize, or if not, at least everything available
    }

    // At this point we have a small, typically 8 element array containing all local
    // datacenter replicas and potentially some random choices from the rest of the datacenter.
    //
    // We now rank nodes by a score function that takes into account outstanding requests weighted
    // by replica status, rack placement, uptime, and health status. In general, we expect to
    // see the following order
    // 1. Rack replicas
    // 2. Datacenter replicas
    // 3. Rack nodes
    // 4. Datacenter nodes
    // We expect these orderings to move around when nodes are overloaded. For example if the
    // local zone replica has too much load we will failover to other replicas. If those
    // replicas are too slow we will failover to other rack nodes.

    // sort, extract, convert to query plan (ascending score: lowest score is tried first)
    nodeScores.sort(Comparator.comparingDouble(NodeScore::getScore));
    Node[] candidate = new Node[nodeScores.size()];
    for (int i = 0; i < candidate.length; i++) {
      candidate[i] = nodeScores.get(i).getNode();
    }

    QueryPlan queryPlan = candidate.length == 0 ? QueryPlan.EMPTY : new SimpleQueryPlan((Object[]) candidate);
    return maybeAddDcFailover(request, queryPlan);
  }

  /**
   * Returns the name of the local rack used by {@link #inRack(Node)}.
   *
   * <p>NOTE(review): currently a stub returning the empty string, so {@code inRack} only
   * matches nodes whose rack is literally "" — in practice the non-rack weight applies to
   * every node until this is wired up (see TODO below).
   */
  protected String getLocalRack() {
    return ""; // TODO (akhaku) internally we passed it through the context, for OSS perhaps something like the local DC helper?
  }

  /** Returns true if the node's advertised rack equals {@link #getLocalRack()}; null-safe. */
  protected boolean inRack(Node node) {
    if (node == null || node.getRack() == null) return false;
    return node.getRack().equals(this.getLocalRack());
  }

  /**
   * Computes the node's score: (1 + in-flight requests, capped at 32768) multiplied by the
   * applicable penalty weights. Lower is better.
   *
   * @param node the node to score
   * @param session session used to look up in-flight counts and health
   * @param nowMillis wall-clock time, comparable with {@code node.getUpSinceMillis()}
   * @param nowNanos monotonic time, used only for the health check
   * @param isReplica whether the node is a replica for the current request
   * @return the weighted score
   */
  protected double getWeightedScore(Node node, Session session, long nowMillis, long nowNanos, boolean isReplica) {
    // Cap the concurrency base so one saturated node cannot dwarf every weight distinction.
    int base = Math.min(32768, 1 + getInFlight(node, session));
    double weight = 1.0;

    if (!inRack(node)) weight *= this.weightNonRack; // 4.0
    if (!isReplica) weight *= this.weightNonReplica; // 12.0
    if (isUnhealthy(node, session, nowNanos)) weight *= this.weightUnhealthy; // 64.0

    // We want to gradually ramp up traffic, shedding heavily at first and then allowing it back
    // in gradually. Note:
    //
    // 1. We cannot use nowNanos for this since node.getUpSinceMillis uses
    // System.currentTimeMillis (which may have no relation to nano time).
    // 2. getUpSinceMillis might return 0 or -1, in either case don't consider it freshly up.
    // 3. When a client starts up everything will appear to be freshly up, which is fine since
    // all nodes will randomly be shuffled to the front and back.
    long millisSinceUp = nowMillis - node.getUpSinceMillis();
    if (millisSinceUp < (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS)) {
      // pShed decays linearly from 1 to 0 over the ramp window, so the starting penalty is
      // applied probabilistically less and less often as the node's uptime grows.
      double pShed = 1.0 - ((double) millisSinceUp / (DELAY_TRAFFIC_MILLIS + DELAY_TRAFFIC_SKEW_MILLIS));
      if (pShed > getRandom().nextDouble()) {
        if (LOG.isTraceEnabled()) {
          String shed = String.format("%.2f", pShed);
          LOG.trace("[{}] shedding at startup [pShed={}, millisSinceUp={}]", node, shed, millisSinceUp);
        }
        weight *= this.weightStarting; // 16.0
      }
    }

    double score = base * weight;
    if (LOG.isDebugEnabled()) {
      String msg = String.format("score=%.2f [base=%d, weight=%.2f]", score, base, weight);
      LOG.debug("[{}] {}", node, msg);
    }
    return score;
  }

  /** Wall-clock time in millis; overridable for deterministic tests. */
  protected long milliTime() {
    return System.currentTimeMillis();
  }

  /** Random source for shuffling and shedding; overridable for deterministic tests. */
  protected Random getRandom() {
    return ThreadLocalRandom.current();
  }

  /**
   * Wraps a Node alongside its score.
   *
   * <p>Calculating scores is expensive, and not stable (could vary). By wrapping them we can be
   * sure the score is calculated only once and does not change during processing.
   */
  static class NodeScore {
    final double score;
    final Node node;

    public NodeScore(Node node, double score) {
      this.node = node;
      this.score = score;
    }

    public Node getNode() {
      return node;
    }

    public double getScore() {
      return score;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      NodeScore nodeScore = (NodeScore) o;
      return Double.compare(score, nodeScore.score) == 0 && Objects.equals(node, nodeScore.node);
    }

    @Override
    public int hashCode() {
      return Objects.hash(score, node);
    }
  }
}

Diff for: core/src/main/resources/reference.conf

+44
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,50 @@ datastax-java-driver {
575575
# Overridable in a profile: yes
576576
allow-for-local-consistency-levels = false
577577
}
578+
579+
# These configuration options apply when using the RackAwareWeightedLoadBalancingPolicy.
580+
# That policy calculates scores for 8 nodes (unless you modify scored-plan-size), multiplies
581+
# them by the number of in-flight requests, then sorts the nodes by the resulting score
# (lowest first). The default weights
582+
# will prefer in-rack replicas, followed by non-rack replicas, then rack non-replicas, followed
583+
# by nodes that are not "unhealthy", followed by "unhealthy", and then the rest of the nodes.
584+
# There is also an aversion to nodes that have recently started up, based on the node's
585+
# advertised "millis since up". Changing the weights can change the order of preference.
586+
weighted {
587+
# How many items in the plan to score.
588+
#
589+
# Required: no
590+
# Modifiable at runtime: no
591+
# Overridable in a profile: yes
592+
// scored-plan-size: 8
593+
594+
# Weight to apply when load balancing for a non-rack node.
595+
#
596+
# Required: no
597+
# Modifiable at runtime: no
598+
# Overridable in a profile: yes
599+
// non-rack: 4.0
600+
601+
# Weight to apply when load balancing for a non-replica node.
602+
#
603+
# Required: no
604+
# Modifiable at runtime: no
605+
# Overridable in a profile: yes
606+
// non-replica: 12.0
607+
608+
# Weight to apply when load balancing for a node that is still starting up.
609+
#
610+
# Required: no
611+
# Modifiable at runtime: no
612+
# Overridable in a profile: yes
613+
// starting: 16.0
614+
615+
# Weight to apply when load balancing for an unhealthy node.
616+
#
617+
# Required: no
618+
# Modifiable at runtime: no
619+
# Overridable in a profile: yes
620+
// unhealthy: 64.0
621+
}
578622
}
579623

580624
# Whether to schedule reconnection attempts if all contact points are unreachable on the first

0 commit comments

Comments
 (0)