@@ -33,6 +33,7 @@ import (
33
33
"k8s.io/apimachinery/pkg/types"
34
34
"k8s.io/apimachinery/pkg/util/intstr"
35
35
utilnet "k8s.io/apimachinery/pkg/util/net"
36
+ "k8s.io/apimachinery/pkg/util/sets"
36
37
"k8s.io/apimachinery/pkg/util/uuid"
37
38
"k8s.io/apimachinery/pkg/util/wait"
38
39
clientset "k8s.io/client-go/kubernetes"
@@ -328,6 +329,72 @@ func PickNodeIP(c clientset.Interface) string {
328
329
return ip
329
330
}
330
331
332
+ // GetEndpointNodes returns a map of nodenames:external-ip on which the
333
+ // endpoints of the given Service are running.
334
+ func (j * ServiceTestJig ) GetEndpointNodes (svc * v1.Service ) map [string ][]string {
335
+ nodes := j .GetNodes (MaxNodesForEndpointsTests )
336
+ endpoints , err := j .Client .CoreV1 ().Endpoints (svc .Namespace ).Get (svc .Name , metav1.GetOptions {})
337
+ if err != nil {
338
+ Failf ("Get endpoints for service %s/%s failed (%s)" , svc .Namespace , svc .Name , err )
339
+ }
340
+ if len (endpoints .Subsets ) == 0 {
341
+ Failf ("Endpoint has no subsets, cannot determine node addresses." )
342
+ }
343
+ epNodes := sets .NewString ()
344
+ for _ , ss := range endpoints .Subsets {
345
+ for _ , e := range ss .Addresses {
346
+ if e .NodeName != nil {
347
+ epNodes .Insert (* e .NodeName )
348
+ }
349
+ }
350
+ }
351
+ nodeMap := map [string ][]string {}
352
+ for _ , n := range nodes .Items {
353
+ if epNodes .Has (n .Name ) {
354
+ nodeMap [n .Name ] = GetNodeAddresses (& n , v1 .NodeExternalIP )
355
+ }
356
+ }
357
+ return nodeMap
358
+ }
359
+
360
+ // GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
361
+ // where we don't eg: want to create an endpoint per node.
362
+ func (j * ServiceTestJig ) GetNodes (maxNodesForTest int ) (nodes * v1.NodeList ) {
363
+ nodes = GetReadySchedulableNodesOrDie (j .Client )
364
+ if len (nodes .Items ) <= maxNodesForTest {
365
+ maxNodesForTest = len (nodes .Items )
366
+ }
367
+ nodes .Items = nodes .Items [:maxNodesForTest ]
368
+ return nodes
369
+ }
370
+
371
+ func (j * ServiceTestJig ) WaitForEndpointOnNode (namespace , serviceName , nodeName string ) {
372
+ err := wait .PollImmediate (Poll , LoadBalancerCreateTimeoutDefault , func () (bool , error ) {
373
+ endpoints , err := j .Client .CoreV1 ().Endpoints (namespace ).Get (serviceName , metav1.GetOptions {})
374
+ if err != nil {
375
+ Logf ("Get endpoints for service %s/%s failed (%s)" , namespace , serviceName , err )
376
+ return false , nil
377
+ }
378
+ if len (endpoints .Subsets ) == 0 {
379
+ Logf ("Expect endpoints with subsets, got none." )
380
+ return false , nil
381
+ }
382
+ // TODO: Handle multiple endpoints
383
+ if len (endpoints .Subsets [0 ].Addresses ) == 0 {
384
+ Logf ("Expected Ready endpoints - found none" )
385
+ return false , nil
386
+ }
387
+ epHostName := * endpoints .Subsets [0 ].Addresses [0 ].NodeName
388
+ Logf ("Pod for service %s/%s is on node %s" , namespace , serviceName , epHostName )
389
+ if epHostName != nodeName {
390
+ Logf ("Found endpoint on wrong node, expected %v, got %v" , nodeName , epHostName )
391
+ return false , nil
392
+ }
393
+ return true , nil
394
+ })
395
+ ExpectNoError (err )
396
+ }
397
+
331
398
func (j * ServiceTestJig ) SanityCheckService (svc * v1.Service , svcType v1.ServiceType ) {
332
399
if svc .Spec .Type != svcType {
333
400
Failf ("unexpected Spec.Type (%s) for service, expected %s" , svc .Spec .Type , svcType )
0 commit comments