@@ -18,29 +18,34 @@ import (
18
18
"k8s.io/klog/v2"
19
19
)
20
20
21
- func newOpenshiftAPIServiceReachabilityCheck () * aggregatedAPIServiceAvailabilityCheck {
22
- return newAggregatedAPIServiceReachabilityCheck ("openshift-apiserver" , "api" )
21
+ func newOpenshiftAPIServiceReachabilityCheck (ipForKubernetesDefaultService net. IP ) * aggregatedAPIServiceAvailabilityCheck {
22
+ return newAggregatedAPIServiceReachabilityCheck (ipForKubernetesDefaultService , "openshift-apiserver" , "api" )
23
23
}
24
24
25
- func newOAuthPIServiceReachabilityCheck () * aggregatedAPIServiceAvailabilityCheck {
26
- return newAggregatedAPIServiceReachabilityCheck ("openshift-oauth-apiserver" , "api" )
25
+ func newOAuthPIServiceReachabilityCheck (ipForKubernetesDefaultService net. IP ) * aggregatedAPIServiceAvailabilityCheck {
26
+ return newAggregatedAPIServiceReachabilityCheck (ipForKubernetesDefaultService , "openshift-oauth-apiserver" , "api" )
27
27
}
28
28
29
29
// if the API service is not found, then this check returns quickly.
30
30
// if the endpoint is not accessible within 60 seconds, we report ready no matter what
31
31
// otherwise, wait for up to 60 seconds to be able to reach the apiserver
32
- func newAggregatedAPIServiceReachabilityCheck (namespace , service string ) * aggregatedAPIServiceAvailabilityCheck {
32
+ func newAggregatedAPIServiceReachabilityCheck (ipForKubernetesDefaultService net. IP , namespace , service string ) * aggregatedAPIServiceAvailabilityCheck {
33
33
return & aggregatedAPIServiceAvailabilityCheck {
34
- done : make (chan struct {}),
35
- namespace : namespace ,
36
- serviceName : service ,
34
+ done : make (chan struct {}),
35
+ ipForKubernetesDefaultService : ipForKubernetesDefaultService ,
36
+ namespace : namespace ,
37
+ serviceName : service ,
37
38
}
38
39
}
39
40
40
41
type aggregatedAPIServiceAvailabilityCheck struct {
41
42
// done indicates that this check is complete (success or failure) and the check should return true
42
43
done chan struct {}
43
44
45
+ // ipForKubernetesDefaultService is used to determine whether this endpoint is the only one for the kubernetes.default.svc
46
+ // if so, it will report reachable immediately because honoring some requests is better than honoring no requests.
47
+ ipForKubernetesDefaultService net.IP
48
+
44
49
// namespace is the namespace hosting the service for the aggregated api
45
50
namespace string
46
51
// serviceName is used to get a list of endpoints to directly dial
@@ -78,6 +83,32 @@ func (c *aggregatedAPIServiceAvailabilityCheck) checkForConnection(context gener
78
83
panic (err )
79
84
}
80
85
86
+ ctx , cancel := gocontext .WithTimeout (gocontext .TODO (), 30 * time .Second )
87
+ defer cancel ()
88
+
89
+ // if the kubernetes.default.svc needs an endpoint and this is the only apiserver than can fulfill it, then we don't
90
+ // wait for reachability. We wait for other conditions, but unreachable apiservers correctly 503 for clients.
91
+ kubeEndpoints , err := kubeClient .CoreV1 ().Endpoints ("default" ).Get (ctx , "kubernetes" , metav1.GetOptions {})
92
+ switch {
93
+ case apierrors .IsNotFound (err ):
94
+ utilruntime .HandleError (fmt .Errorf ("%s did not find a kubernetes.default.svc endpoint" , c .Name ()))
95
+ return
96
+ case err != nil :
97
+ utilruntime .HandleError (fmt .Errorf ("%s unable to read a kubernetes.default.svc endpoint: %w" , c .Name (), err ))
98
+ return
99
+ case len (kubeEndpoints .Subsets ) == 0 :
100
+ utilruntime .HandleError (fmt .Errorf ("%s did not find any IPs for kubernetes.default.svc endpoint" , c .Name ()))
101
+ return
102
+ case len (kubeEndpoints .Subsets [0 ].Addresses ) == 0 :
103
+ utilruntime .HandleError (fmt .Errorf ("%s did not find any IPs for kubernetes.default.svc endpoint" , c .Name ()))
104
+ return
105
+ case len (kubeEndpoints .Subsets [0 ].Addresses ) == 1 :
106
+ if kubeEndpoints .Subsets [0 ].Addresses [0 ].IP == c .ipForKubernetesDefaultService .String () {
107
+ utilruntime .HandleError (fmt .Errorf ("%s only found this kube-apiserver's IP (%v) in kubernetes.default.svc endpoint" , c .Name (), c .ipForKubernetesDefaultService ))
108
+ return
109
+ }
110
+ }
111
+
81
112
// Start a thread which repeatedly tries to connect to any aggregated apiserver endpoint.
82
113
// 1. if the aggregated apiserver endpoint doesn't exist, logs a warning and reports ready
83
114
// 2. if a connection cannot be made, after 60 seconds logs an error and reports ready -- this avoids a rebootstrapping cycle
0 commit comments