@@ -19,6 +19,7 @@ package apimachinery
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -32,14 +33,21 @@ import (
 
 	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/util/apihelpers"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
 const (
-	requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
-	requestConcurrencyLimitMetricLabelName = "priority_level"
+	requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
+	priorityLevelLabelName            = "priority_level"
+)
+
+var (
+	errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
 )
 
 var _ = SIGDescribe("API priority and fairness", func() {
@@ -59,6 +67,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
 		defer cleanup()
 
+		ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+		waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName)
+
 		var response *http.Response
 		ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
 		response = makeRequest(f, matchingUsername)
@@ -126,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() {
 			framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
 			_, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
 			defer cleanup()
+
+			ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
+			waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName)
 		}
 
 		ginkgo.By("getting request concurrency from metrics")
 		for i := range clients {
-			realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
+			realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
+			framework.ExpectNoError(err)
 			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
 			if clients[i].concurrency < 1 {
 				clients[i].concurrency = 1
@@ -185,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
 		defer cleanup()
 
+		ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
+		waitForSteadyState(f, flowSchemaName, priorityLevelName)
+
 		type client struct {
 			username string
 			qps      float64
@@ -199,7 +217,8 @@ var _ = SIGDescribe("API priority and fairness", func() {
 		}
 
 		framework.Logf("getting real concurrency")
-		realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
+		realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+		framework.ExpectNoError(err)
 		for i := range clients {
 			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
 			if clients[i].concurrency < 1 {
@@ -259,33 +278,35 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur
 	}
 }
 
-//lint:ignore U1000 function is actually referenced
-func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
-	resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
-	framework.ExpectNoError(err)
+func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
+	resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
+	if err != nil {
+		return 0, err
+	}
 	sampleDecoder := expfmt.SampleDecoder{
 		Dec:  expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
 		Opts: &expfmt.DecodeOptions{},
 	}
 	for {
 		var v model.Vector
 		err := sampleDecoder.Decode(&v)
-		if err == io.EOF {
-			break
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return 0, err
 		}
-		framework.ExpectNoError(err)
 		for _, metric := range v {
 			if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
 				continue
 			}
-			if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
+			if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
 				continue
 			}
-			return int32(metric.Value)
+			return int32(metric.Value), nil
 		}
 	}
-	framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
-	return 0
+	return 0, errPriorityLevelNotFound
 }
 
 // createFlowSchema creates a flow schema referring to a particular priority
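
For context on the metric parsing above: the standalone sketch below is not part of this change, and its payload, values, and priority level names are made up for illustration. It shows how expfmt.SampleDecoder turns a Prometheus text-format /metrics payload into model.Vector samples, whose labels can then be matched against a priority level name the same way getPriorityLevelConcurrency does.

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"
)

func main() {
	// Hypothetical excerpt of a /metrics response in the Prometheus text format.
	payload := "apiserver_flowcontrol_request_concurrency_limit{priority_level=\"global-default\"} 20\n" +
		"apiserver_flowcontrol_request_concurrency_limit{priority_level=\"workload-low\"} 100\n"

	dec := expfmt.SampleDecoder{
		Dec:  expfmt.NewDecoder(strings.NewReader(payload), expfmt.FmtText),
		Opts: &expfmt.DecodeOptions{},
	}
	var v model.Vector
	if err := dec.Decode(&v); err != nil {
		panic(err)
	}
	for _, s := range v {
		// Each sample carries its label set; the "priority_level" label selects the level of interest.
		fmt.Printf("%s: %v\n", s.Metric["priority_level"], s.Value)
	}
}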
@@ -335,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
 	}
 }
 
+// waitForSteadyState repeatedly polls the API server to check if the newly
+// created flow schema and priority level have been seen by the APF controller
+// by checking: (1) the dangling priority level reference condition in the flow
+// schema status, and (2) metrics. The function times out after 30 seconds.
+func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) {
+	framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
+		fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
+		if condition == nil || condition.Status != flowcontrol.ConditionFalse {
+			// The absence of the dangling status object implies that the APF
+			// controller isn't done with syncing the flow schema object. And, of
+			// course, the condition being anything but false means that steady state
+			// hasn't been achieved.
+			return false, nil
+		}
+		_, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
+		if err != nil {
+			if err == errPriorityLevelNotFound {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	}))
+}
+
 // makeRequests creates a request to the API server and returns the response.
 func makeRequest(f *framework.Framework, username string) *http.Response {
 	config := f.ClientConfig()
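
A note on the Dangling condition that waitForSteadyState inspects: the rough, self-contained sketch below is illustrative only and not part of this change; it hand-rolls the lookup instead of using apihelpers.GetFlowSchemaConditionByType, and the sample status is fabricated. It spells out the same check: the APF controller adds a Dangling condition once it has processed the FlowSchema, and Status=False means the referenced PriorityLevelConfiguration was found.

package main

import (
	"fmt"

	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
)

// flowSchemaSynced mirrors the steady-state test on a FlowSchema's status: no
// Dangling condition means the controller has not synced the object yet, and
// Status=False on that condition means the priority level reference resolved.
func flowSchemaSynced(fs *flowcontrol.FlowSchema) bool {
	for i := range fs.Status.Conditions {
		c := &fs.Status.Conditions[i]
		if c.Type != flowcontrol.FlowSchemaConditionDangling {
			continue
		}
		return c.Status == flowcontrol.ConditionFalse
	}
	// The controller has not written a Dangling condition yet.
	return false
}

func main() {
	fs := &flowcontrol.FlowSchema{
		Status: flowcontrol.FlowSchemaStatus{
			Conditions: []flowcontrol.FlowSchemaCondition{
				{Type: flowcontrol.FlowSchemaConditionDangling, Status: flowcontrol.ConditionFalse},
			},
		},
	}
	fmt.Println(flowSchemaSynced(fs)) // prints "true"
}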