@@ -20,12 +20,15 @@ import (
20
20
"context"
21
21
"fmt"
22
22
"net/http"
23
+ "strings"
23
24
"sync"
24
25
"time"
25
26
26
27
apierrors "k8s.io/apimachinery/pkg/api/errors"
27
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
+ "k8s.io/apimachinery/pkg/labels"
28
30
"k8s.io/apimachinery/pkg/runtime/schema"
31
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29
32
"k8s.io/apimachinery/pkg/util/sets"
30
33
"k8s.io/apimachinery/pkg/util/wait"
31
34
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
@@ -40,6 +43,7 @@ import (
40
43
"k8s.io/client-go/transport"
41
44
"k8s.io/component-base/metrics/legacyregistry"
42
45
"k8s.io/component-base/tracing"
46
+ "k8s.io/klog/v2"
43
47
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
44
48
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
45
49
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
@@ -360,6 +364,33 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
360
364
361
365
return nil
362
366
})
367
+ s .GenericAPIServer .AddPostStartHook ("apiservice-wait-for-first-sync" , func (context genericapiserver.PostStartHookContext ) error {
368
+ // when the aggregator first starts, it should make sure that it has proxy handlers for all the known good API services at this time
369
+ // we only need to do this once.
370
+ err := wait .PollImmediateUntil (100 * time .Millisecond , func () (bool , error ) {
371
+ // fix race
372
+ handledAPIServices := sets .StringKeySet (s .proxyHandlers )
373
+ apiservices , err := s .lister .List (labels .Everything ())
374
+ if err != nil {
375
+ return false , err
376
+ }
377
+ expectedAPIServices := sets .NewString ()
378
+ for _ , apiservice := range apiservices {
379
+ if v1helper .IsAPIServiceConditionTrue (apiservice , v1 .Available ) {
380
+ expectedAPIServices .Insert (apiservice .Name )
381
+ }
382
+ }
383
+
384
+ notYetHandledAPIServices := expectedAPIServices .Difference (handledAPIServices )
385
+ if len (notYetHandledAPIServices ) == 0 {
386
+ return true , nil
387
+ }
388
+ klog .Infof ("still waiting on handling APIServices: %v" , strings .Join (notYetHandledAPIServices .List (), "," ))
389
+
390
+ return false , nil
391
+ }, context .Done ())
392
+ return err
393
+ })
363
394
364
395
s .discoveryAggregationController = NewDiscoveryManager (
365
396
// Use aggregator as the source name to avoid overwriting native/CRD
@@ -543,7 +574,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
543
574
}
544
575
proxyHandler .updateAPIService (apiService )
545
576
if s .openAPIAggregationController != nil {
546
- s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
577
+ // this is calling a controller. It should already handle being async.
578
+ go func () {
579
+ defer utilruntime .HandleCrash ()
580
+ s .openAPIAggregationController .AddAPIService (proxyHandler , apiService )
581
+ }()
547
582
}
548
583
if s .openAPIV3AggregationController != nil {
549
584
s .openAPIV3AggregationController .AddAPIService (proxyHandler , apiService )
@@ -552,7 +587,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
552
587
s .discoveryAggregationController .AddAPIService (apiService , proxyHandler )
553
588
}
554
589
555
- s .proxyHandlers [apiService .Name ] = proxyHandler
590
+ // we want to update the registration bit last after all the pieces are wired together
591
+ defer func () {
592
+ s .proxyHandlers [apiService .Name ] = proxyHandler
593
+ }()
556
594
s .GenericAPIServer .Handler .NonGoRestfulMux .Handle (proxyPath , proxyHandler )
557
595
s .GenericAPIServer .Handler .NonGoRestfulMux .UnlistedHandlePrefix (proxyPath + "/" , proxyHandler )
558
596
0 commit comments