Skip to content

Commit 7345d39

Browse files
committed
[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent 699acae commit 7345d39

File tree

10 files changed

+870
-4
lines changed

10 files changed

+870
-4
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

+13-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,18 @@
116116
<td>Double</td>
117117
<td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
118118
</tr>
119+
<tr>
120+
<td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
121+
<td style="word-wrap: break-word;">false</td>
122+
<td>Boolean</td>
123+
<td>Enable backpropagation of processing rate during autoscaling to reduce resources usage.</td>
124+
</tr>
125+
<tr>
126+
<td><h5>job.autoscaler.processing.rate.backpropagation.impact</h5></td>
127+
<td style="word-wrap: break-word;">0.0</td>
128+
<td>Double</td>
129+
<td>How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8</td>
130+
</tr>
119131
<tr>
120132
<td><h5>job.autoscaler.quota.cpu</h5></td>
121133
<td style="word-wrap: break-word;">(none)</td>
@@ -210,7 +222,7 @@
210222
<td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
211223
<td style="word-wrap: break-word;"></td>
212224
<td>List&lt;String&gt;</td>
213-
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
225+
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
214226
</tr>
215227
<tr>
216228
<td><h5>job.autoscaler.vertex.max-parallelism</h5></td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

+66
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.flink.annotation.VisibleForTesting;
2121
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2222
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
23+
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
2324
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
2425
import org.apache.flink.autoscaler.metrics.ScalingMetric;
26+
import org.apache.flink.autoscaler.topology.JobTopology;
2527
import org.apache.flink.autoscaler.topology.ShipStrategy;
2628
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
2729
import org.apache.flink.configuration.Configuration;
@@ -37,6 +39,7 @@
3739
import java.time.Instant;
3840
import java.time.ZoneId;
3941
import java.util.Collection;
42+
import java.util.List;
4043
import java.util.Map;
4144
import java.util.Objects;
4245
import java.util.SortedMap;
@@ -51,8 +54,10 @@
5154
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
5255
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
5356
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
57+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
5458
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
5559
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
60+
import static org.apache.flink.autoscaler.utils.AutoScalerUtils.getTargetDataRateFromUpstream;
5661
import static org.apache.flink.util.Preconditions.checkArgument;
5762

5863
/** Component responsible for computing vertex parallelism based on the scaling metrics. */
@@ -139,6 +144,67 @@ public static ParallelismChange noChange() {
139144
}
140145
}
141146

147+
public void backpropagateRate(
148+
Configuration conf,
149+
JobVertexID vertex,
150+
JobTopology topology,
151+
EvaluatedMetrics evaluatedMetrics,
152+
Map<JobVertexID, Double> backpropagationRate,
153+
List<String> excludedVertices) {
154+
155+
if (excludedVertices.contains(vertex.toHexString())) {
156+
return;
157+
}
158+
159+
var vertexMetrics = evaluatedMetrics.getVertexMetrics().get(vertex);
160+
161+
// vertex scale factor is limited by max scale factor
162+
double scaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
163+
164+
// vertex scale factor is limited by max parallelism scale factor
165+
scaleFactor =
166+
Math.min(
167+
scaleFactor,
168+
vertexMetrics.get(MAX_PARALLELISM).getCurrent()
169+
/ vertexMetrics.get(PARALLELISM).getCurrent());
170+
171+
double maxProcessingRateAfterScale =
172+
Math.min(
173+
vertexMetrics.get(TARGET_DATA_RATE).getAverage()
174+
* backpropagationRate.getOrDefault(vertex, 1.0),
175+
vertexMetrics.get(TRUE_PROCESSING_RATE).getAverage() * scaleFactor);
176+
177+
// evaluating partially updated target data rate from upstream
178+
double targetDataRate =
179+
getTargetDataRateFromUpstream(
180+
evaluatedMetrics, topology, vertex, backpropagationRate);
181+
182+
// if cannot derive finite value, then assume full processing
183+
if (Double.isNaN(targetDataRate) || Double.isInfinite(targetDataRate)) {
184+
return;
185+
}
186+
187+
// if cannot derive finite value, then assume full processing
188+
if (Double.isNaN(maxProcessingRateAfterScale)
189+
|| Double.isInfinite(maxProcessingRateAfterScale)) {
190+
return;
191+
}
192+
193+
// if all input stream can be processed, skip propagation
194+
if (targetDataRate < maxProcessingRateAfterScale) {
195+
return;
196+
}
197+
198+
// propagation coefficient
199+
double adjustmentRate = maxProcessingRateAfterScale / targetDataRate;
200+
201+
// update rate of direct upstream vertices
202+
for (var v : topology.getVertexInfos().get(vertex).getInputs().keySet()) {
203+
double vertexRate = backpropagationRate.getOrDefault(v, 1.0);
204+
backpropagationRate.put(v, vertexRate * adjustmentRate);
205+
}
206+
}
207+
142208
public ParallelismChange computeScaleTargetParallelism(
143209
Context context,
144210
JobVertexID vertex,

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

+76
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.flink.runtime.instance.SlotSharingGroupId;
3737
import org.apache.flink.runtime.jobgraph.JobVertexID;
3838

39+
import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;
40+
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
4143

@@ -53,6 +55,7 @@
5355
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
5456
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
5557
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
58+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
5659
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
5760
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5861
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -61,6 +64,7 @@
6164
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
6265
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
6366
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
67+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
6468
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
6569

6670
/** Class responsible for executing scaling decisions. */
@@ -238,6 +242,11 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
238242

239243
var excludeVertexIdList =
240244
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
245+
246+
if (context.getConfiguration().get(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
247+
backpropagateProcessingRate(context, evaluatedMetrics, jobTopology);
248+
}
249+
241250
evaluatedMetrics
242251
.getVertexMetrics()
243252
.forEach(
@@ -284,6 +293,73 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
284293
return out;
285294
}
286295

296+
private void backpropagateProcessingRate(
297+
Context context, EvaluatedMetrics evaluatedMetrics, JobTopology jobTopology) {
298+
var conf = context.getConfiguration();
299+
double impact = conf.get(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT);
300+
Preconditions.checkState(
301+
0 <= impact && impact <= 1.0, "Backpropagation impact should be in range [0, 1]");
302+
var propagationRate = new HashMap<JobVertexID, Double>();
303+
var excludeVertexIdList =
304+
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
305+
var vertexIterator =
306+
jobTopology
307+
.getStrongTopologicalOrder()
308+
.listIterator(jobTopology.getStrongTopologicalOrder().size());
309+
310+
// backpropagate scale factors
311+
while (vertexIterator.hasPrevious()) {
312+
var vertex = vertexIterator.previous();
313+
jobVertexScaler.backpropagateRate(
314+
conf,
315+
vertex,
316+
jobTopology,
317+
evaluatedMetrics,
318+
propagationRate,
319+
excludeVertexIdList);
320+
}
321+
322+
// use an extra map to not lose precision
323+
Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();
324+
325+
// re-evaluating vertices capacity
326+
// Target data rate metric is rewritten for parallelism evaluation
327+
for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
328+
double newTargetDataRate = 0.0;
329+
330+
if (jobTopology.isSource(vertex)) {
331+
double targetDateRate =
332+
evaluatedMetrics
333+
.getVertexMetrics()
334+
.get(vertex)
335+
.get(TARGET_DATA_RATE)
336+
.getAverage();
337+
338+
// linear interpolation between adjusted value and initial
339+
newTargetDataRate =
340+
targetDateRate
341+
* (impact * propagationRate.getOrDefault(vertex, 1.0)
342+
+ 1.0
343+
- impact);
344+
} else {
345+
for (var input : jobTopology.getVertexInfos().get(vertex).getInputs().keySet()) {
346+
newTargetDataRate +=
347+
adjustedDataRate.get(input)
348+
* jobTopology
349+
.getVertexInfos()
350+
.get(vertex)
351+
.getInputRatios()
352+
.get(input);
353+
}
354+
}
355+
adjustedDataRate.put(vertex, newTargetDataRate);
356+
evaluatedMetrics
357+
.getVertexMetrics()
358+
.get(vertex)
359+
.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(newTargetDataRate));
360+
}
361+
}
362+
287363
private boolean isJobUnderMemoryPressure(
288364
Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {
289365

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

+1
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ private void computeTargetDataRate(
346346
var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
347347
var outputRatio =
348348
computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
349+
topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
349350
LOG.debug(
350351
"Computed output ratio for edge ({} -> {}) : {}",
351352
inputVertex,

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,24 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
5858
.withDescription(
5959
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");
6060

61+
public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
62+
autoScalerConfig("processing.rate.backpropagation.enabled")
63+
.booleanType()
64+
.defaultValue(false)
65+
.withFallbackKeys(
66+
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
67+
.withDescription(
68+
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");
69+
70+
public static final ConfigOption<Double> PROCESSING_RATE_BACKPROPAGATION_IMPACT =
71+
autoScalerConfig("processing.rate.backpropagation.impact")
72+
.doubleType()
73+
.defaultValue(0.0)
74+
.withFallbackKeys(
75+
oldOperatorConfigKey("processing.rate.backpropagation.impact"))
76+
.withDescription(
77+
"How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8");
78+
6179
public static final ConfigOption<Duration> METRICS_WINDOW =
6280
autoScalerConfig("metrics.window")
6381
.durationType()
@@ -320,7 +338,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
320338
.defaultValues()
321339
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
322340
.withDescription(
323-
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
341+
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");
324342

325343
public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
326344
autoScalerConfig("scaling.event.interval")

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java

+70-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.List;
4040
import java.util.Map;
4141
import java.util.Set;
42+
import java.util.TreeMap;
4243
import java.util.stream.Collectors;
4344

4445
/** Structure representing information about the jobgraph that is relevant for scaling. */
@@ -52,6 +53,7 @@ public class JobTopology {
5253
@Getter private final Map<SlotSharingGroupId, Set<JobVertexID>> slotSharingGroupMapping;
5354
@Getter private final Set<JobVertexID> finishedVertices;
5455
@Getter private final List<JobVertexID> verticesInTopologicalOrder;
56+
@Getter private final List<JobVertexID> strongTopologicalOrder;
5557

5658
public JobTopology(Collection<VertexInfo> vertexInfo) {
5759
this(new HashSet<>(vertexInfo));
@@ -99,6 +101,7 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
99101
this.slotSharingGroupMapping = ImmutableMap.copyOf(vertexSlotSharingGroupMapping);
100102
this.finishedVertices = finishedVertices.build();
101103
this.verticesInTopologicalOrder = returnVerticesInTopologicalOrder();
104+
this.strongTopologicalOrder = returnStrongTopologicalOrder();
102105
}
103106

104107
public VertexInfo get(JobVertexID jvi) {
@@ -112,9 +115,9 @@ public boolean isSource(JobVertexID jobVertexID) {
112115
private List<JobVertexID> returnVerticesInTopologicalOrder() {
113116
List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());
114117

115-
Map<JobVertexID, List<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
118+
Map<JobVertexID, Set<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
116119
vertexInfos.forEach(
117-
(id, v) -> remainingInputs.put(id, new ArrayList<>(v.getInputs().keySet())));
120+
(id, v) -> remainingInputs.put(id, new HashSet<>(v.getInputs().keySet())));
118121

119122
while (!remainingInputs.isEmpty()) {
120123
List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();
@@ -140,6 +143,71 @@ private List<JobVertexID> returnVerticesInTopologicalOrder() {
140143
return sorted;
141144
}
142145

146+
/**
147+
* Strong topological order is a topological order, where vertices are also sorted by their
148+
* distance to the most distant sources.
149+
*
150+
* @return vertices in the enhanced topological order
151+
*/
152+
public List<JobVertexID> returnStrongTopologicalOrder() {
153+
List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());
154+
155+
Map<JobVertexID, Set<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
156+
vertexInfos.forEach(
157+
(id, v) -> remainingInputs.put(id, new HashSet<>(v.getInputs().keySet())));
158+
159+
Map<JobVertexID, Integer> distances = new HashMap<>(vertexInfos.size());
160+
TreeMap<Integer, List<JobVertexID>> order = new TreeMap<>();
161+
162+
while (!remainingInputs.isEmpty()) {
163+
List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();
164+
165+
// storing
166+
remainingInputs.forEach(
167+
(v, inputs) -> {
168+
if (inputs.isEmpty()) {
169+
int dist = distances.getOrDefault(v, 0);
170+
if (!order.containsKey(dist)) {
171+
order.put(dist, new ArrayList<>());
172+
}
173+
order.get(dist).add(v);
174+
verticesWithZeroIndegree.add(v);
175+
}
176+
});
177+
178+
verticesWithZeroIndegree.forEach(
179+
v -> {
180+
remainingInputs.remove(v);
181+
vertexInfos
182+
.get(v)
183+
.getOutputs()
184+
.keySet()
185+
.forEach(o -> remainingInputs.get(o).remove(v));
186+
});
187+
188+
List<JobVertexID> layer = order.firstEntry().getValue();
189+
order.remove(order.firstKey());
190+
191+
layer.forEach(
192+
v -> {
193+
final int dist = distances.getOrDefault(v, 0);
194+
vertexInfos
195+
.get(v)
196+
.getOutputs()
197+
.keySet()
198+
.forEach(
199+
o -> {
200+
remainingInputs.get(o).remove(v);
201+
int dist1 = distances.getOrDefault(o, 0);
202+
distances.put(o, Math.max(dist1, dist + 1));
203+
});
204+
});
205+
206+
sorted.addAll(layer);
207+
}
208+
return sorted;
209+
}
210+
143211
public static JobTopology fromJsonPlan(
144212
String jsonPlan,
145213
Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,

0 commit comments

Comments
 (0)