@@ -20,12 +20,15 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"net/http"
23
+ "strings"
23
24
"sync"
24
25
"time"
25
26
26
27
apierrors "k8s.io/apimachinery/pkg/api/errors"
27
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
+ "k8s.io/apimachinery/pkg/labels"
28
30
"k8s.io/apimachinery/pkg/runtime/schema"
31
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29
32
"k8s.io/apimachinery/pkg/util/sets"
30
33
"k8s.io/apimachinery/pkg/util/wait"
31
34
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
@@ -40,6 +43,7 @@ import (
40
43
"k8s.io/client-go/transport"
41
44
"k8s.io/component-base/metrics/legacyregistry"
42
45
"k8s.io/component-base/tracing"
46
+ "k8s.io/klog/v2"
43
47
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
44
48
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
45
49
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
@@ -370,6 +374,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
370
374
371
375
return nil
372
376
})
377
+ s .GenericAPIServer .AddPostStartHook ("apiservice-wait-for-first-sync" , func (context genericapiserver.PostStartHookContext ) error {
378
+ // when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
379
+ // we only need to do this once.
380
+ err := wait .PollImmediateUntil (100 * time .Millisecond , func () (bool , error ) {
381
+ // fix race
382
+ handledAPIServices := sets .StringKeySet (s .proxyHandlers )
383
+ apiservices , err := s .lister .List (labels .Everything ())
384
+ if err != nil {
385
+ return false , err
386
+ }
387
+ expectedAPIServices := sets .NewString ()
388
+ for _ , apiservice := range apiservices {
389
+ if v1helper .IsAPIServiceConditionTrue (apiservice , v1 .Available ) {
390
+ expectedAPIServices .Insert (apiservice .Name )
391
+ }
392
+ }
393
+
394
+ notYetHandledAPIServices := expectedAPIServices .Difference (handledAPIServices )
395
+ if len (notYetHandledAPIServices ) == 0 {
396
+ return true , nil
397
+ }
398
+ klog .Infof ("still waiting on handling APIServices: %v" , strings .Join (notYetHandledAPIServices .List (), "," ))
399
+
400
+ return false , nil
401
+ }, context .Done ())
402
+ return err
403
+ })
373
404
374
405
if utilfeature .DefaultFeatureGate .Enabled (genericfeatures .AggregatedDiscoveryEndpoint ) {
375
406
s .discoveryAggregationController = NewDiscoveryManager (
@@ -555,7 +586,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
555
586
}
556
587
proxyHandler .updateAPIService (apiService )
557
588
if s .openAPIAggregationController != nil {
558
- s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
589
+ // this is calling a controller. It should already handle being async.
590
+ go func () {
591
+ defer utilruntime .HandleCrash ()
592
+ s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
593
+ }()
559
594
}
560
595
if s .openAPIV3AggregationController != nil {
561
596
s .openAPIV3AggregationController .AddAPIService (proxyHandler , apiService )
@@ -564,7 +599,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
564
599
s .discoveryAggregationController .AddAPIService (apiService , proxyHandler )
565
600
}
566
601
567
- s .proxyHandlers [apiService .Name ] = proxyHandler
602
+ // we want to update the registration bit last after all the pieces are wired together
603
+ defer func () {
604
+ s .proxyHandlers [apiService .Name ] = proxyHandler
605
+ }()
568
606
s .GenericAPIServer .Handler .NonGoRestfulMux .Handle (proxyPath , proxyHandler )
569
607
s .GenericAPIServer .Handler .NonGoRestfulMux .UnlistedHandlePrefix (proxyPath + "/" , proxyHandler )
570
608
0 commit comments