
Commit c90a4b1
Authored Mar 26, 2025

Merge pull request kubernetes#130945 from danwinship/prefersame-e2e-test

e2e testing for PreferSameZone/PreferSameNode

2 parents ef838fc + 478a6f9, commit c90a4b1

File tree: 3 files changed (+353, -159 lines)

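For context on what the diffs below exercise: the Service field spec.trafficDistribution. PreferClose and PreferSameZone ask the dataplane to keep traffic within the client's zone when ready endpoints exist there, and PreferSameNode prefers an endpoint on the client's own node, falling back to the zone and then cluster-wide (this is the behavior the per-test comments below describe). A minimal sketch of the kind of Service object the tests build, distilled from the createService helper in the traffic_distribution.go diff; the function name, selector label, and ports here are illustrative:

package network

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// buildTrafficDistService returns a Service that opts into topology-aware routing.
// trafficDist is one of v1.ServiceTrafficDistributionPreferClose,
// v1.ServiceTrafficDistributionPreferSameZone, or
// v1.ServiceTrafficDistributionPreferSameNode.
func buildTrafficDistService(trafficDist string) *v1.Service {
	return &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "traffic-dist-test-service"},
		Spec: v1.ServiceSpec{
			Selector:            map[string]string{"app": "traffic-dist-backend"}, // illustrative selector
			TrafficDistribution: &trafficDist,
			Ports: []v1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt32(9376),
				Protocol:   v1.ProtocolTCP,
			}},
		},
	}
}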
 

‎test/e2e/feature/feature.go

-8 lines

@@ -523,14 +523,6 @@ var (
 	// - ci-kubernetes-node-e2e-cri-proxy-serial
 	CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy"))
 
-	// Owner: sig-network
-	// Marks tests that require a cluster with Topology Hints enabled.
-	TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints"))
-
-	// Owner: sig-network
-	// Marks tests that require a cluster with Traffic Distribution enabled.
-	TrafficDistribution = framework.WithFeature(framework.ValidFeatures.Add("Traffic Distribution"))
-
 	// TODO: document the feature (owning SIG, when to use this feature for a test)
 	TopologyManager = framework.WithFeature(framework.ValidFeatures.Add("TopologyManager"))
 
‎test/e2e/network/topology_hints.go

+1, -2 lines

@@ -30,7 +30,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -41,7 +40,7 @@ import (
 	admissionapi "k8s.io/pod-security-admission/api"
 )
 
-var _ = common.SIGDescribe(feature.TopologyHints, func() {
+var _ = common.SIGDescribe("Topology Hints", func() {
 	f := framework.NewDefaultFramework("topology-hints")
 	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 
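The two diffs above remove the sig-network feature labels (feature.TopologyHints and feature.TrafficDistribution). The suites are now selected by their plain describe strings, and, as the traffic_distribution.go diff below shows, specs that need an alpha/beta gate carry a per-spec framework.WithFeatureGate tag instead. A rough sketch of the new tagging style using the helpers visible in these diffs (the spec text is illustrative, and the common import path is an assumption based on how these files call common.SIGDescribe):

package network

import (
	"context"

	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/test/e2e/framework"
	"k8s.io/kubernetes/test/e2e/network/common"
)

// A plain describe string replaces the old feature label; the spec that depends
// on the PreferSameTrafficDistribution gate is tagged individually.
var _ = common.SIGDescribe("Traffic Distribution", func() {
	framework.It("should route same-zone traffic", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
		// ... test body ...
	})
})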
‎test/e2e/network/traffic_distribution.go

+352, -149 lines

@@ -27,9 +27,9 @@ import (
 	discoveryv1 "k8s.io/api/discovery/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/pkg/features"
-	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -43,7 +43,7 @@ import (
 	"github.com/onsi/gomega"
 )
 
-var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGate(features.ServiceTrafficDistribution), func() {
+var _ = common.SIGDescribe("Traffic Distribution", func() {
 	f := framework.NewDefaultFramework("traffic-distribution")
 	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 
@@ -79,31 +79,6 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat
 		}
 	}
 
-	// endpointSlicesHaveSameZoneHints returns a matcher function to be used with
-	// gomega.Eventually().Should(...). It checks that the passed EndpointSlices
-	// have zone-hints which match the endpoint's zone.
-	endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) {
-		if len(slices) == 0 {
-			return nil, fmt.Errorf("no endpointslices found")
-		}
-		for _, slice := range slices {
-			for _, endpoint := range slice.Endpoints {
-				var ip string
-				if len(endpoint.Addresses) > 0 {
-					ip = endpoint.Addresses[0]
-				}
-				var zone string
-				if endpoint.Zone != nil {
-					zone = *endpoint.Zone
-				}
-				if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
-					return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil
-				}
-			}
-		}
-		return nil, nil
-	})
-
 	// requestsFromClient returns a helper function to be used with
 	// gomega.Eventually(...). It fetches the logs from the clientPod and returns
 	// them in reverse-chronological order.
@@ -119,164 +94,392 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat
 		}
 	}
 
-	////////////////////////////////////////////////////////////////////////////
-	// Main test specifications.
-	////////////////////////////////////////////////////////////////////////////
-
-	ginkgo.When("Service has trafficDistribution=PreferClose", func() {
-		ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
+	// getNodesForMultiNode returns a set of nodes for a test case with 3 zones with 2
+	// nodes each. If there are not suitable nodes/zones, the test is skipped.
+	getNodesForMultiNode := func(ctx context.Context) ([]*v1.Node, []*v1.Node, []*v1.Node) {
+		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
+		framework.ExpectNoError(err)
+		nodesForZone := make(map[string][]*v1.Node)
+		for _, node := range nodeList.Items {
+			zone := node.Labels[v1.LabelTopologyZone]
+			nodesForZone[zone] = append(nodesForZone[zone], &node)
+		}
+		if len(nodesForZone) < 3 {
+			e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each")
+		}
 
-			ginkgo.By("finding 3 zones with schedulable nodes")
-			allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
-			framework.ExpectNoError(err)
-			if len(allZonesSet) < 3 {
-				framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
+		var multiNodeZones [][]*v1.Node
+		for _, nodes := range nodesForZone {
+			if len(nodes) > 1 {
+				multiNodeZones = append(multiNodeZones, nodes)
 			}
-			zones := allZonesSet.UnsortedList()[:3]
-
-			ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
-			nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
-			framework.ExpectNoError(err)
-			nodeForZone := make(map[string]string)
-			for _, zone := range zones {
-				found := false
-				for _, node := range nodeList.Items {
-					if zone == node.Labels[v1.LabelTopologyZone] {
-						found = true
-						nodeForZone[zone] = node.GetName()
-					}
-				}
-				if !found {
-					framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
-				}
+			if len(multiNodeZones) == 3 {
+				break
 			}
+		}
+		if len(multiNodeZones) < 3 {
+			e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each")
+		}
 
-			ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
-			zoneForServingPod := make(map[string]string)
-			var servingPods []*v1.Pod
-			servingPodLabels := map[string]string{"app": f.UniqueName}
-			for _, zone := range zones[:2] {
-				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
-				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
-				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
-				pod.Labels = servingPodLabels
-
-				servingPods = append(servingPods, pod)
-				zoneForServingPod[pod.Name] = zone
-				ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
+		return multiNodeZones[0], multiNodeZones[1], multiNodeZones[2]
+	}
+
+	// Data structures for tracking server and client pods
+	type serverPod struct {
+		node *v1.Node
+		pod *v1.Pod
+	}
+
+	type clientPod struct {
+		node *v1.Node
+		endpoints []*serverPod
+		pod *v1.Pod
+	}
+
+	// allocateClientsAndServers figures out where to put clients and servers for
+	// a simple "same-zone" traffic distribution test.
+	allocateClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) {
+		ginkgo.By("finding 3 zones with schedulable nodes")
+		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
+		framework.ExpectNoError(err)
+		nodeForZone := make(map[string]*v1.Node)
+		for _, node := range nodeList.Items {
+			zone := node.Labels[v1.LabelTopologyZone]
+			if nodeForZone[zone] != nil {
+				continue
+			}
+			nodeForZone[zone] = &node
+			if len(nodeForZone) == 3 {
+				break
 			}
-			e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
+		}
+		if len(nodeForZone) < 3 {
+			e2eskipper.Skipf("got %d zones with schedulable nodes, need at least 3", len(nodeForZone))
+		}
 
-			trafficDist := v1.ServiceTrafficDistributionPreferClose
-			svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "traffic-dist-test-service",
-				},
-				Spec: v1.ServiceSpec{
-					Selector: servingPodLabels,
-					TrafficDistribution: &trafficDist,
-					Ports: []v1.ServicePort{{
-						Port: 80,
-						TargetPort: intstr.FromInt32(9376),
-						Protocol: v1.ProtocolTCP,
-					}},
-				},
-			})
-			ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
-			ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{})
+		var clientPods []*clientPod
+		var serverPods []*serverPod
 
-			ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints")
-			gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints)
+		// We want clients in all three zones
+		for _, node := range nodeForZone {
+			clientPods = append(clientPods, &clientPod{node: node})
+		}
 
-			ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
+		// and endpoints in the first two zones
+		serverPods = []*serverPod{
+			{node: clientPods[0].node},
+			{node: clientPods[1].node},
+		}
 
-			createClientPod := func(ctx context.Context, zone string) *v1.Pod {
-				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
-				pod.Spec.NodeName = nodeForZone[zone]
-				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
-				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
-				cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
-				pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
-				pod.Spec.Containers[0].Name = pod.Name
+		// The clients with an endpoint in the same zone should only connect to
+		// that endpoint. The client with no endpoint in its zone should connect
+		// to both endpoints.
+		clientPods[0].endpoints = []*serverPod{serverPods[0]}
+		clientPods[1].endpoints = []*serverPod{serverPods[1]}
+		clientPods[2].endpoints = serverPods
 
-				ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
-				return e2epod.NewPodClient(f).CreateSync(ctx, pod)
-			}
+		return clientPods, serverPods
+	}
 
-			for _, clientZone := range zones[:2] {
-				framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
-				clientPod := createClientPod(ctx, clientZone)
+	// allocateMultiNodeClientsAndServers figures out where to put clients and servers
+	// for a "same-zone" traffic distribution test with multiple nodes in each zone.
+	allocateMultiNodeClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) {
+		ginkgo.By("finding a set of zones and nodes for the test")
+		zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx)
 
-				framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
+		var clientPods []*clientPod
+		var serverPods []*serverPod
 
-				requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
-					logLines := reverseChronologicalLogLines
-					if len(logLines) < 20 {
-						return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
-					}
-					consecutiveSameZone := 0
-
-					for _, logLine := range logLines {
-						if logLine == "" || strings.HasPrefix(logLine, "Date:") {
-							continue
-						}
-						destZone, ok := zoneForServingPod[logLine]
-						if !ok {
-							return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
-						}
-						if clientZone != destZone {
-							return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
-						}
-						consecutiveSameZone++
-						if consecutiveSameZone >= 10 {
-							return nil, nil // Pass condition.
-						}
-					}
-					// Ideally, the matcher would never reach this condition
-					return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
-				})
+		// First zone: a client and an endpoint on each node, and both clients
+		// should talk to both endpoints.
+		endpointsForZone := []*serverPod{
+			{node: zone1Nodes[0]},
+			{node: zone1Nodes[1]},
+		}
 
-				gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
-			}
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone1Nodes[0],
+				endpoints: endpointsForZone,
+			},
+			&clientPod{
+				node: zone1Nodes[1],
+				endpoints: endpointsForZone,
+			},
+		)
+		serverPods = append(serverPods, endpointsForZone...)
+
+		// Second zone: a client on one node and a server on the other.
+		endpointsForZone = []*serverPod{
+			{node: zone2Nodes[1]},
+		}
 
-			ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone2Nodes[0],
+				endpoints: endpointsForZone,
+			},
+		)
+		serverPods = append(serverPods, endpointsForZone...)
+
+		// Third zone: just a client, which should connect to the servers in the
+		// other two zones.
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone3Nodes[0],
+				endpoints: serverPods,
+			},
+		)
+
+		return clientPods, serverPods
+	}
 
-			clientZone := zones[2]
-			framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
-			clientPod := createClientPod(ctx, clientZone)
+	// createService creates the service for a traffic distribution test
+	createService := func(ctx context.Context, trafficDist string) *v1.Service {
+		serviceName := "traffic-dist-test-service"
+		ginkgo.By(fmt.Sprintf("creating a service %q with trafficDistribution %q", serviceName, trafficDist))
+		return createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: serviceName,
+			},
+			Spec: v1.ServiceSpec{
+				Selector: map[string]string{
+					"app": f.UniqueName,
+				},
+				TrafficDistribution: &trafficDist,
+				Ports: []v1.ServicePort{{
+					Port: 80,
+					TargetPort: intstr.FromInt32(9376),
+					Protocol: v1.ProtocolTCP,
+				}},
+			},
+		})
+	}
 
-			framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
+	// createPods creates endpoint pods for svc as described by serverPods, waits for
+	// the EndpointSlices to be updated, and creates clientPods as described by
+	// clientPods.
+	createPods := func(ctx context.Context, svc *v1.Service, clientPods []*clientPod, serverPods []*serverPod) {
+		var podsToCreate []*v1.Pod
+		for i, sp := range serverPods {
+			node := sp.node.Name
+			zone := sp.node.Labels[v1.LabelTopologyZone]
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname")
+			ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone))
+			nodeSelection := e2epod.NodeSelection{Name: node}
+			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+			pod.Labels = svc.Spec.Selector
+
+			sp.pod = pod
+			podsToCreate = append(podsToCreate, pod)
+		}
+		e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
+
+		ginkgo.By("waiting for EndpointSlices to be created")
+		err := framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+		framework.ExpectNoError(err)
+		slices := endpointSlicesForService(svc.Name)
+		framework.Logf("got slices:\n%v", format.Object(slices, 1))
+
+		podsToCreate = nil
+		for i, cp := range clientPods {
+			node := cp.node.Name
+			zone := cp.node.Labels[v1.LabelTopologyZone]
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil)
+			ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone))
+			nodeSelection := e2epod.NodeSelection{Name: node}
+			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
+			pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+			pod.Spec.Containers[0].Name = pod.Name
+
+			cp.pod = pod
+			podsToCreate = append(podsToCreate, pod)
+		}
+		e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
+	}
 
-			requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
+	// checkTrafficDistribution checks that traffic from clientPods is distributed in
+	// the expected way.
+	checkTrafficDistribution := func(ctx context.Context, clientPods []*clientPod) {
+		for _, cp := range clientPods {
+			wantedEndpoints := sets.New[string]()
+			for _, sp := range cp.endpoints {
+				wantedEndpoints.Insert(sp.pod.Name)
+			}
+			unreachedEndpoints := wantedEndpoints.Clone()
+
+			ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList()))
+
+			requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
 				logLines := reverseChronologicalLogLines
 				if len(logLines) < 20 {
 					return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
 				}
-
-				// Requests are counted as successful when the response read from the log
-				// lines is the name of a recognizable serving pod.
 				consecutiveSuccessfulRequests := 0
 
 				for _, logLine := range logLines {
 					if logLine == "" || strings.HasPrefix(logLine, "Date:") {
 						continue
 					}
-					_, servingPodExists := zoneForServingPod[logLine]
-					if !servingPodExists {
-						return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
+					destEndpoint := logLine
+					if !wantedEndpoints.Has(destEndpoint) {
+						return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil
 					}
 					consecutiveSuccessfulRequests++
-					if consecutiveSuccessfulRequests >= 10 {
-						return nil, nil // Pass condition
+					unreachedEndpoints.Delete(destEndpoint)
+					if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 {
+						return nil, nil // Pass condition.
 					}
 				}
 				// Ideally, the matcher would never reach this condition
-				return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
+				return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil
 			})
 
-			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
+			gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed)
+		}
+	}
 
-		})
+	////////////////////////////////////////////////////////////////////////////
+	// Main test specifications.
+	////////////////////////////////////////////////////////////////////////////
+
+	framework.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) {
+		clientPods, serverPods := allocateClientsAndServers(ctx)
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose)
+		createPods(ctx, svc, clientPods, serverPods)
+		checkTrafficDistribution(ctx, clientPods)
+	})
+
+	framework.It("should route traffic correctly between pods on multiple nodes when using PreferClose", func(ctx context.Context) {
+		clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx)
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose)
+		createPods(ctx, svc, clientPods, serverPods)
+		checkTrafficDistribution(ctx, clientPods)
+	})
+
+	framework.It("should route traffic to an endpoint in the same zone when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
+		clientPods, serverPods := allocateClientsAndServers(ctx)
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone)
+		createPods(ctx, svc, clientPods, serverPods)
+		checkTrafficDistribution(ctx, clientPods)
+	})
+
+	framework.It("should route traffic correctly between pods on multiple nodes when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
+		clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx)
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone)
+		createPods(ctx, svc, clientPods, serverPods)
+		checkTrafficDistribution(ctx, clientPods)
+	})
+
+	framework.It("should route traffic to an endpoint on the same node or fall back to same zone when using PreferSameNode", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
+		ginkgo.By("finding a set of nodes for the test")
+		zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx)
+
+		var clientPods []*clientPod
+		var serverPods []*serverPod
+
+		// The first zone: a client and a server on each node. Each client only
+		// talks to the server on the same node.
+		endpointsForZone := []*serverPod{
+			{node: zone1Nodes[0]},
+			{node: zone1Nodes[1]},
+		}
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone1Nodes[0],
+				endpoints: []*serverPod{endpointsForZone[0]},
+			},
+			&clientPod{
+				node: zone1Nodes[1],
+				endpoints: []*serverPod{endpointsForZone[1]},
+			},
+		)
+		serverPods = append(serverPods, endpointsForZone...)
+
+		// The second zone: a client on one node and a server on the other. The
+		// client should fall back to connecting (only) to its same-zone endpoint.
+		endpointsForZone = []*serverPod{
+			{node: zone2Nodes[1]},
+		}
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone2Nodes[0],
+				endpoints: endpointsForZone,
+			},
+		)
+		serverPods = append(serverPods, endpointsForZone...)
+
+		// The third zone: just a client. Since it has neither a same-node nor a
+		// same-zone endpoint, it should connect to all endpoints.
+		clientPods = append(clientPods,
+			&clientPod{
+				node: zone3Nodes[0],
+				endpoints: serverPods,
+			},
+		)
+
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode)
+		createPods(ctx, svc, clientPods, serverPods)
+		checkTrafficDistribution(ctx, clientPods)
+	})
+
+	framework.It("should route traffic to an endpoint on the same node when using PreferSameNode and fall back when the endpoint becomes unavailable", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
+		ginkgo.By("finding a set of nodes for the test")
+		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
+		framework.ExpectNoError(err)
+		if len(nodeList.Items) < 2 {
+			e2eskipper.Skipf("have %d schedulable nodes, need at least 2", len(nodeList.Items))
+		}
+		nodes := nodeList.Items[:2]
+
+		// One client and one server on each node
+		serverPods := []*serverPod{
+			{node: &nodes[0]},
+			{node: &nodes[1]},
+		}
+		clientPods := []*clientPod{
+			{
+				node: &nodes[0],
+				endpoints: []*serverPod{serverPods[0]},
+			},
+			{
+				node: &nodes[1],
+				endpoints: []*serverPod{serverPods[1]},
+			},
+		}
 
+		svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode)
+		createPods(ctx, svc, clientPods, serverPods)
+
+		ginkgo.By("ensuring that each client talks to its same-node endpoint when both endpoints exist")
+		checkTrafficDistribution(ctx, clientPods)
+
+		ginkgo.By("killing the server pod on the first node and waiting for the EndpointSlices to be updated")
+		err = c.CoreV1().Pods(f.Namespace.Name).Delete(ctx, serverPods[0].pod.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 1, 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+		framework.ExpectNoError(err)
+
+		ginkgo.By("ensuring that both clients talk to the remaining endpoint when only one endpoint exists")
+		serverPods[0].pod = nil
+		clientPods[0].endpoints = []*serverPod{serverPods[1]}
+		checkTrafficDistribution(ctx, clientPods)
+
+		ginkgo.By("recreating the missing server pod and waiting for the EndpointSlices to be updated")
+		// We can't use createPods() here because if we only tell it about
+		// serverPods[0] and not serverPods[1] it will expect there to be only one
+		// endpoint.
+		pod := e2epod.NewAgnhostPod(f.Namespace.Name, "server-0-new", nil, nil, nil, "serve-hostname")
+		nodeSelection := e2epod.NodeSelection{Name: serverPods[0].node.Name}
+		e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+		pod.Labels = svc.Spec.Selector
+		serverPods[0].pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
+		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 2, 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+		framework.ExpectNoError(err)
+
+		ginkgo.By("ensuring that each client talks only to its same-node endpoint again")
+		clientPods[0].endpoints = []*serverPod{serverPods[0]}
+		checkTrafficDistribution(ctx, clientPods)
 	})
 })
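The matchers in this diff rely on two helpers defined elsewhere in traffic_distribution.go and therefore not shown here: endpointSlicesForService and gomegaCustomError. As used above, the function passed to framework.MakeMatcher returns a non-nil failure-description callback while the polled condition is not yet met, and (nil, nil) once it passes. Purely as an illustration of that contract, and not the file's actual implementation, a gomegaCustomError-style helper could be as small as:

package network

import "fmt"

// gomegaCustomError builds the lazy "describe the failure" callback that the
// framework.MakeMatcher functions above return when a poll attempt has not yet
// satisfied the expected traffic pattern. Illustrative sketch only.
func gomegaCustomError(format string, args ...interface{}) func() string {
	return func() string {
		return fmt.Sprintf(format, args...)
	}
}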
