Skip to content

Commit e80fd7c

Browse files
APlyusninAPlyusnin
APlyusnin
authored and
APlyusnin
committed
[FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
1 parent f2adb15 commit e80fd7c

File tree

7 files changed

+1045
-264
lines changed

7 files changed

+1045
-264
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler;
19+
20+
import org.apache.flink.runtime.jobgraph.JobVertexID;
21+
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
/** Class for storing intermediate scaling results. */
28+
public class IntermediateScalingResult {
29+
30+
private final Map<JobVertexID, ScalingSummary> scalingSummaries;
31+
private final List<JobVertexID> bottlenecks;
32+
33+
private double backpropagationScaleFactor = 1.0;
34+
35+
public IntermediateScalingResult() {
36+
scalingSummaries = new HashMap<>();
37+
bottlenecks = new ArrayList<>();
38+
}
39+
40+
void addScalingSummary(JobVertexID vertex, ScalingSummary scalingSummary) {
41+
scalingSummaries.put(vertex, scalingSummary);
42+
}
43+
44+
void addBottleneckVertex(JobVertexID bottleneck, double factor) {
45+
bottlenecks.add(bottleneck);
46+
backpropagationScaleFactor = Math.min(backpropagationScaleFactor, factor);
47+
}
48+
49+
public List<JobVertexID> getBottlenecks() {
50+
return bottlenecks;
51+
}
52+
53+
public double getBackpropagationScaleFactor() {
54+
return backpropagationScaleFactor;
55+
}
56+
57+
public Map<JobVertexID, ScalingSummary> getScalingSummaries() {
58+
return scalingSummaries;
59+
}
60+
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

+53-16
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,31 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
7171
this.autoScalerEventHandler = autoScalerEventHandler;
7272
}
7373

74-
public int computeScaleTargetParallelism(
74+
public VertexScalingResult computeScaleTargetParallelism(
7575
Context context,
7676
JobVertexID vertex,
7777
Collection<ShipStrategy> inputShipStrategies,
7878
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
7979
SortedMap<Instant, ScalingSummary> history,
80-
Duration restartTime) {
80+
Duration restartTime,
81+
double backpropagationScaleFactor) {
8182
var conf = context.getConfiguration();
83+
84+
boolean excluded =
85+
conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(vertex.toHexString());
86+
if (excluded) {
87+
LOG.debug(
88+
"Vertex {} is part of `vertex.exclude.ids` config, Check for bottleneck but not scale",
89+
vertex);
90+
}
91+
8292
var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
8393
double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
8494
if (Double.isNaN(averageTrueProcessingRate)) {
8595
LOG.warn(
8696
"True processing rate is not available for {}, cannot compute new parallelism",
8797
vertex);
88-
return currentParallelism;
98+
return VertexScalingResult.normalScaling(currentParallelism);
8999
}
90100

91101
double targetCapacity =
@@ -95,9 +105,11 @@ public int computeScaleTargetParallelism(
95105
LOG.warn(
96106
"Target data rate is not available for {}, cannot compute new parallelism",
97107
vertex);
98-
return currentParallelism;
108+
return VertexScalingResult.normalScaling(currentParallelism);
99109
}
100110

111+
targetCapacity *= backpropagationScaleFactor;
112+
101113
LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
102114
double scaleFactor = targetCapacity / averageTrueProcessingRate;
103115
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
@@ -122,32 +134,44 @@ public int computeScaleTargetParallelism(
122134
double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
123135
LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
124136

125-
int newParallelism =
137+
int parallelismLowerLimit =
138+
excluded
139+
? currentParallelism
140+
: Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM));
141+
int parallelismUpperLimit =
142+
excluded
143+
? currentParallelism
144+
: Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM));
145+
146+
var scalingResult =
126147
scale(
127148
currentParallelism,
128149
inputShipStrategies,
129150
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
130151
scaleFactor,
131-
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
132-
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
152+
parallelismLowerLimit,
153+
parallelismUpperLimit);
133154

134-
if (newParallelism == currentParallelism
155+
if (scalingResult.getParallelism() == currentParallelism
135156
|| blockScalingBasedOnPastActions(
136157
context,
137158
vertex,
138159
conf,
139160
evaluatedMetrics,
140161
history,
141162
currentParallelism,
142-
newParallelism)) {
143-
return currentParallelism;
163+
scalingResult.getParallelism())) {
164+
return new VertexScalingResult(
165+
currentParallelism,
166+
scalingResult.getBottleneckScaleFactor(),
167+
scalingResult.isBottleneck());
144168
}
145169

146170
// We record our expectations for this scaling operation
147171
evaluatedMetrics.put(
148172
ScalingMetric.EXPECTED_PROCESSING_RATE,
149173
EvaluatedScalingMetric.of(cappedTargetCapacity));
150-
return newParallelism;
174+
return scalingResult;
151175
}
152176

153177
private boolean blockScalingBasedOnPastActions(
@@ -249,9 +273,12 @@ private boolean detectIneffectiveScaleUp(
249273
* <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
250274
* parallelism for source and keyed vertex such that it divides the maxParallelism without a
251275
* remainder.
276+
*
277+
* <p>If newParallelism exceeds min(parallelismUpperLimit, maxParallelism) the job vertex
278+
* considered to be a bottleneck.
252279
*/
253280
@VisibleForTesting
254-
protected static int scale(
281+
protected static VertexScalingResult scale(
255282
int currentParallelism,
256283
Collection<ShipStrategy> inputShipStrategies,
257284
int maxParallelism,
@@ -284,26 +311,36 @@ protected static int scale(
284311
// parallelism upper limit
285312
final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
286313

314+
boolean isBottleneck = false;
315+
double bottleneckScaleFactor = 1.0;
316+
317+
// If required parallelism is higher than upper bound ---> the vertex is a bottleneck
318+
if (newParallelism > upperBound) {
319+
isBottleneck = true;
320+
bottleneckScaleFactor = (double) upperBound / newParallelism;
321+
newParallelism = upperBound;
322+
}
323+
287324
// Apply min/max parallelism
288-
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
325+
newParallelism = Math.max(parallelismLowerLimit, newParallelism);
289326

290327
var adjustByMaxParallelism =
291328
inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
292329
if (!adjustByMaxParallelism) {
293-
return newParallelism;
330+
return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck);
294331
}
295332

296333
// When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
297334
// adjust the parallelism such that it divides the maxParallelism without a remainder
298335
// => data is evenly spread across subtasks
299336
for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
300337
if (maxParallelism % p == 0) {
301-
return p;
338+
return new VertexScalingResult(p, bottleneckScaleFactor, isBottleneck);
302339
}
303340
}
304341

305342
// If parallelism adjustment fails, use originally computed parallelism
306-
return newParallelism;
343+
return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck);
307344
}
308345

309346
@VisibleForTesting

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

+65-26
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.SortedMap;
5050

5151
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
52+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
5253
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
5354
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
5455
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
@@ -220,39 +221,77 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
220221
return Map.of();
221222
}
222223

223-
var out = new HashMap<JobVertexID, ScalingSummary>();
224-
var excludeVertexIdList =
225-
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
224+
var scalingResult =
225+
computeScalingSummaryInternal(
226+
context, evaluatedMetrics, scalingHistory, restartTime, jobTopology, 1.0);
227+
228+
if (scalingResult.getBottlenecks().isEmpty()
229+
|| !context.getConfiguration()
230+
.getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
231+
return scalingResult.getScalingSummaries();
232+
}
233+
234+
LOG.info("Vertices with ids {} are bottlenecks", scalingResult.getBottlenecks());
235+
236+
double backpropagationScaleFactor = scalingResult.getBackpropagationScaleFactor();
237+
238+
LOG.info(
239+
"Processing rate back propagation scaling factor is {}",
240+
backpropagationScaleFactor);
241+
242+
scalingResult =
243+
computeScalingSummaryInternal(
244+
context,
245+
evaluatedMetrics,
246+
scalingHistory,
247+
restartTime,
248+
jobTopology,
249+
backpropagationScaleFactor);
250+
251+
return scalingResult.getScalingSummaries();
252+
}
253+
254+
IntermediateScalingResult computeScalingSummaryInternal(
255+
Context context,
256+
EvaluatedMetrics evaluatedMetrics,
257+
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
258+
Duration restartTime,
259+
JobTopology jobTopology,
260+
double backpropagationScaleFactor) {
261+
262+
var scalingResult = new IntermediateScalingResult();
226263
evaluatedMetrics
227264
.getVertexMetrics()
228265
.forEach(
229266
(v, metrics) -> {
230-
if (excludeVertexIdList.contains(v.toHexString())) {
231-
LOG.debug(
232-
"Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
233-
v);
234-
} else {
235-
var currentParallelism =
236-
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
237-
238-
var newParallelism =
239-
jobVertexScaler.computeScaleTargetParallelism(
240-
context,
241-
v,
242-
jobTopology.get(v).getInputs().values(),
243-
metrics,
244-
scalingHistory.getOrDefault(
245-
v, Collections.emptySortedMap()),
246-
restartTime);
247-
if (currentParallelism != newParallelism) {
248-
out.put(
267+
var currentParallelism =
268+
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
269+
270+
var newParallelism =
271+
jobVertexScaler.computeScaleTargetParallelism(
272+
context,
249273
v,
250-
new ScalingSummary(
251-
currentParallelism, newParallelism, metrics));
252-
}
274+
jobTopology.get(v).getInputs().values(),
275+
metrics,
276+
scalingHistory.getOrDefault(
277+
v, Collections.emptySortedMap()),
278+
restartTime,
279+
backpropagationScaleFactor);
280+
if (currentParallelism != newParallelism.getParallelism()) {
281+
scalingResult.addScalingSummary(
282+
v,
283+
new ScalingSummary(
284+
currentParallelism,
285+
newParallelism.getParallelism(),
286+
metrics));
287+
}
288+
// Even if parallelism didn't change, vertex can be a bottleneck
289+
if (newParallelism.isBottleneck()) {
290+
scalingResult.addBottleneckVertex(
291+
v, newParallelism.getBottleneckScaleFactor());
253292
}
254293
});
255-
return out;
294+
return scalingResult;
256295
}
257296

258297
private boolean isJobUnderMemoryPressure(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler;
19+
20+
import lombok.AllArgsConstructor;
21+
import lombok.Getter;
22+
23+
/** Class for storing information on how a single vertex is scaled. */
24+
@AllArgsConstructor
25+
@Getter
26+
public class VertexScalingResult {
27+
private int parallelism;
28+
private double bottleneckScaleFactor;
29+
private boolean isBottleneck;
30+
31+
public static VertexScalingResult normalScaling(int parallelism) {
32+
return new VertexScalingResult(parallelism, 1.0, false);
33+
}
34+
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
5858
.withDescription(
5959
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");
6060

61+
public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
62+
autoScalerConfig("processing.rate.backpropagation.enabled")
63+
.booleanType()
64+
.defaultValue(false)
65+
.withFallbackKeys(
66+
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
67+
.withDescription(
68+
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");
69+
6170
public static final ConfigOption<Duration> METRICS_WINDOW =
6271
autoScalerConfig("metrics.window")
6372
.durationType()
@@ -313,7 +322,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
313322
.defaultValues()
314323
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
315324
.withDescription(
316-
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
325+
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");
317326

318327
public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
319328
autoScalerConfig("scaling.event.interval")

0 commit comments

Comments
 (0)