Skip to content

Commit 022e2de

Browse files
committed
Refactor node HostSubnet code into its own object for ease of testing
1 parent cfb31fc commit 022e2de

File tree

5 files changed

+307
-200
lines changed

5 files changed

+307
-200
lines changed

pkg/network/node/node.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,6 @@ type OsdnNode struct {
114114
host knetwork.Host
115115
kubeletCniPlugin knetwork.NetworkPlugin
116116

117-
hostSubnetMap map[string]*networkapi.HostSubnet
118-
119117
kubeInformers kinternalinformers.SharedInformerFactory
120118
networkInformers networkinformers.SharedInformerFactory
121119

@@ -183,7 +181,6 @@ func New(c *OsdnNodeConfig) (network.NodeInterface, error) {
183181
mtu: c.MTU,
184182
egressPolicies: make(map[uint32][]networkapi.EgressNetworkPolicy),
185183
egressDNS: common.NewEgressDNS(),
186-
hostSubnetMap: make(map[string]*networkapi.HostSubnet),
187184
kubeInformers: c.KubeInformers,
188185
networkInformers: c.NetworkInformers,
189186
egressIP: newEgressIPWatcher(oc, c.SelfIP, c.MasqueradeBit),
@@ -333,7 +330,8 @@ func (node *OsdnNode) Start() error {
333330
return fmt.Errorf("node SDN setup failed: %v", err)
334331
}
335332

336-
node.SubnetStartNode()
333+
hsw := newHostSubnetWatcher(node.oc, node.localIP, node.networkInfo)
334+
hsw.Start(node.networkInformers)
337335

338336
if err = node.policy.Start(node); err != nil {
339337
return err

pkg/network/node/ovscontroller_test.go

+1-119
Original file line numberDiff line numberDiff line change
@@ -103,62 +103,6 @@ func assertFlowChanges(origFlows, newFlows []string, changes ...flowChange) erro
103103
return nil
104104
}
105105

106-
func TestOVSHostSubnet(t *testing.T) {
107-
ovsif, oc, origFlows := setupOVSController(t)
108-
109-
hs := networkapi.HostSubnet{
110-
TypeMeta: metav1.TypeMeta{
111-
Kind: "HostSubnet",
112-
},
113-
ObjectMeta: metav1.ObjectMeta{
114-
Name: "node2",
115-
},
116-
Host: "node2",
117-
HostIP: "192.168.1.2",
118-
Subnet: "10.129.0.0/23",
119-
}
120-
err := oc.AddHostSubnetRules(&hs)
121-
if err != nil {
122-
t.Fatalf("Unexpected error adding HostSubnet rules: %v", err)
123-
}
124-
125-
flows, err := ovsif.DumpFlows("")
126-
if err != nil {
127-
t.Fatalf("Unexpected error dumping flows: %v", err)
128-
}
129-
err = assertFlowChanges(origFlows, flows,
130-
flowChange{
131-
kind: flowAdded,
132-
match: []string{"table=10", "tun_src=192.168.1.2"},
133-
},
134-
flowChange{
135-
kind: flowAdded,
136-
match: []string{"table=50", "arp", "arp_tpa=10.129.0.0/23", "192.168.1.2->tun_dst"},
137-
},
138-
flowChange{
139-
kind: flowAdded,
140-
match: []string{"table=90", "ip", "nw_dst=10.129.0.0/23", "192.168.1.2->tun_dst"},
141-
},
142-
)
143-
if err != nil {
144-
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
145-
}
146-
147-
err = oc.DeleteHostSubnetRules(&hs)
148-
if err != nil {
149-
t.Fatalf("Unexpected error deleting HostSubnet rules: %v", err)
150-
}
151-
flows, err = ovsif.DumpFlows("")
152-
if err != nil {
153-
t.Fatalf("Unexpected error dumping flows: %v", err)
154-
}
155-
err = assertFlowChanges(origFlows, flows) // no changes
156-
157-
if err != nil {
158-
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
159-
}
160-
}
161-
162106
func TestOVSService(t *testing.T) {
163107
ovsif, oc, origFlows := setupOVSController(t)
164108

@@ -361,10 +305,9 @@ func TestGetPodDetails(t *testing.T) {
361305
}
362306
}
363307

364-
func TestOVSMulticast(t *testing.T) {
308+
func TestOVSLocalMulticast(t *testing.T) {
365309
ovsif, oc, origFlows := setupOVSController(t)
366310

367-
// local flows
368311
err := oc.UpdateLocalMulticastFlows(99, true, []int{4, 5, 6})
369312
if err != nil {
370313
t.Fatalf("Unexpected error adding multicast flows: %v", err)
@@ -413,67 +356,6 @@ func TestOVSMulticast(t *testing.T) {
413356
if err != nil {
414357
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
415358
}
416-
417-
// VXLAN
418-
err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.2", "192.168.1.5", "192.168.1.3"})
419-
if err != nil {
420-
t.Fatalf("Unexpected error adding multicast flows: %v", err)
421-
}
422-
flows, err = ovsif.DumpFlows("")
423-
if err != nil {
424-
t.Fatalf("Unexpected error dumping flows: %v", err)
425-
}
426-
err = assertFlowChanges(origFlows, flows,
427-
flowChange{
428-
kind: flowRemoved,
429-
match: []string{"table=111", "goto_table:120"},
430-
noMatch: []string{"->tun_dst"},
431-
},
432-
flowChange{
433-
kind: flowAdded,
434-
match: []string{"table=111", "192.168.1.2->tun_dst", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"},
435-
},
436-
)
437-
if err != nil {
438-
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
439-
}
440-
441-
err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.5", "192.168.1.3"})
442-
if err != nil {
443-
t.Fatalf("Unexpected error adding multicast flows: %v", err)
444-
}
445-
flows, err = ovsif.DumpFlows("")
446-
if err != nil {
447-
t.Fatalf("Unexpected error dumping flows: %v", err)
448-
}
449-
err = assertFlowChanges(origFlows, flows,
450-
flowChange{
451-
kind: flowRemoved,
452-
match: []string{"table=111", "goto_table:120"},
453-
noMatch: []string{"->tun_dst"},
454-
},
455-
flowChange{
456-
kind: flowAdded,
457-
match: []string{"table=111", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"},
458-
noMatch: []string{"192.168.1.2"},
459-
},
460-
)
461-
if err != nil {
462-
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
463-
}
464-
465-
err = oc.UpdateVXLANMulticastFlows([]string{})
466-
if err != nil {
467-
t.Fatalf("Unexpected error adding multicast flows: %v", err)
468-
}
469-
flows, err = ovsif.DumpFlows("")
470-
if err != nil {
471-
t.Fatalf("Unexpected error dumping flows: %v", err)
472-
}
473-
err = assertFlowChanges(origFlows, flows) // no changes
474-
if err != nil {
475-
t.Fatalf("Unexpected flow changes: %v\nOrig: %#v\nNew: %#v", err, origFlows, flows)
476-
}
477359
}
478360

479361
var enp1 = networkapi.EgressNetworkPolicy{

pkg/network/node/sdn_controller.go

-16
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010

1111
"github.com/golang/glog"
1212

13-
networkapi "github.com/openshift/origin/pkg/network/apis/network"
14-
"github.com/openshift/origin/pkg/network/common"
1513
"github.com/openshift/origin/pkg/util/netutils"
1614

1715
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -208,20 +206,6 @@ func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) {
208206
}
209207
}
210208

211-
func (plugin *OsdnNode) AddHostSubnetRules(subnet *networkapi.HostSubnet) {
212-
glog.Infof("AddHostSubnetRules for %s", common.HostSubnetToString(subnet))
213-
if err := plugin.oc.AddHostSubnetRules(subnet); err != nil {
214-
utilruntime.HandleError(fmt.Errorf("Error adding OVS flows for subnet %q: %v", subnet.Subnet, err))
215-
}
216-
}
217-
218-
func (plugin *OsdnNode) DeleteHostSubnetRules(subnet *networkapi.HostSubnet) {
219-
glog.Infof("DeleteHostSubnetRules for %s", common.HostSubnetToString(subnet))
220-
if err := plugin.oc.DeleteHostSubnetRules(subnet); err != nil {
221-
utilruntime.HandleError(fmt.Errorf("Error deleting OVS flows for subnet %q: %v", subnet.Subnet, err))
222-
}
223-
}
224-
225209
func (plugin *OsdnNode) AddServiceRules(service *kapi.Service, netID uint32) {
226210
glog.V(5).Infof("AddServiceRules for %v", service)
227211
if err := plugin.oc.AddServiceRules(service, netID); err != nil {

pkg/network/node/subnets.go

+102-61
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,118 @@ import (
1010

1111
kapierrors "k8s.io/apimachinery/pkg/api/errors"
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
ktypes "k8s.io/apimachinery/pkg/types"
14+
kerrors "k8s.io/apimachinery/pkg/util/errors"
1315
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1416
utilwait "k8s.io/apimachinery/pkg/util/wait"
1517
"k8s.io/apimachinery/pkg/watch"
1618

1719
networkapi "github.com/openshift/origin/pkg/network/apis/network"
1820
"github.com/openshift/origin/pkg/network/common"
21+
networkinformers "github.com/openshift/origin/pkg/network/generated/informers/internalversion"
1922
)
2023

21-
func (node *OsdnNode) SubnetStartNode() {
22-
node.watchSubnets()
24+
type hostSubnetWatcher struct {
25+
oc *ovsController
26+
localIP string
27+
networkInfo *common.NetworkInfo
28+
29+
hostSubnetMap map[ktypes.UID]*networkapi.HostSubnet
30+
}
31+
32+
func newHostSubnetWatcher(oc *ovsController, localIP string, networkInfo *common.NetworkInfo) *hostSubnetWatcher {
33+
return &hostSubnetWatcher{
34+
oc: oc,
35+
localIP: localIP,
36+
networkInfo: networkInfo,
37+
38+
hostSubnetMap: make(map[ktypes.UID]*networkapi.HostSubnet),
39+
}
40+
}
41+
42+
func (hsw *hostSubnetWatcher) Start(networkInformers networkinformers.SharedInformerFactory) {
43+
funcs := common.InformerFuncs(&networkapi.HostSubnet{}, hsw.handleAddOrUpdateHostSubnet, hsw.handleDeleteHostSubnet)
44+
networkInformers.Network().InternalVersion().HostSubnets().Informer().AddEventHandler(funcs)
45+
}
46+
47+
func (hsw *hostSubnetWatcher) handleAddOrUpdateHostSubnet(obj, _ interface{}, eventType watch.EventType) {
48+
hs := obj.(*networkapi.HostSubnet)
49+
glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name)
50+
51+
if err := hsw.updateHostSubnet(hs); err != nil {
52+
utilruntime.HandleError(err)
53+
}
54+
}
55+
56+
func (hsw *hostSubnetWatcher) handleDeleteHostSubnet(obj interface{}) {
57+
hs := obj.(*networkapi.HostSubnet)
58+
glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name)
59+
60+
if err := hsw.deleteHostSubnet(hs); err != nil {
61+
utilruntime.HandleError(err)
62+
}
2363
}
2464

25-
func (node *OsdnNode) watchSubnets() {
26-
funcs := common.InformerFuncs(&networkapi.HostSubnet{}, node.handleAddOrUpdateHostSubnet, node.handleDeleteHostSubnet)
27-
node.networkInformers.Network().InternalVersion().HostSubnets().Informer().AddEventHandler(funcs)
65+
func (hsw *hostSubnetWatcher) updateHostSubnet(hs *networkapi.HostSubnet) error {
66+
if hs.HostIP == hsw.localIP {
67+
return nil
68+
}
69+
oldSubnet, exists := hsw.hostSubnetMap[hs.UID]
70+
if exists {
71+
if oldSubnet.HostIP == hs.HostIP {
72+
return nil
73+
} else {
74+
// Delete old subnet rules (ignore errors)
75+
hsw.oc.DeleteHostSubnetRules(oldSubnet)
76+
}
77+
}
78+
if err := hsw.networkInfo.ValidateNodeIP(hs.HostIP); err != nil {
79+
return fmt.Errorf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err)
80+
}
81+
82+
hsw.hostSubnetMap[hs.UID] = hs
83+
84+
errList := []error{}
85+
if err := hsw.oc.AddHostSubnetRules(hs); err != nil {
86+
errList = append(errList, fmt.Errorf("error adding OVS flows for subnet %q: %v", hs.Subnet, err))
87+
}
88+
// Update multicast rules after all other changes have been processed
89+
if err := hsw.updateVXLANMulticastRules(); err != nil {
90+
errList = append(errList, fmt.Errorf("error updating OVS VXLAN multicast flows: %v", err))
91+
}
92+
93+
return kerrors.NewAggregate(errList)
94+
}
95+
96+
func (hsw *hostSubnetWatcher) deleteHostSubnet(hs *networkapi.HostSubnet) error {
97+
if hs.HostIP == hsw.localIP {
98+
return nil
99+
}
100+
if _, exists := hsw.hostSubnetMap[hs.UID]; !exists {
101+
return nil
102+
}
103+
104+
delete(hsw.hostSubnetMap, hs.UID)
105+
106+
errList := []error{}
107+
if err := hsw.oc.DeleteHostSubnetRules(hs); err != nil {
108+
errList = append(errList, fmt.Errorf("error deleting OVS flows for subnet %q: %v", hs.Subnet, err))
109+
}
110+
if err := hsw.updateVXLANMulticastRules(); err != nil {
111+
errList = append(errList, fmt.Errorf("error updating OVS VXLAN multicast flows: %v", err))
112+
}
113+
114+
return kerrors.NewAggregate(errList)
115+
}
116+
117+
func (hsw *hostSubnetWatcher) updateVXLANMulticastRules() error {
118+
remoteIPs := make([]string, 0, len(hsw.hostSubnetMap))
119+
for _, subnet := range hsw.hostSubnetMap {
120+
if subnet.HostIP != hsw.localIP {
121+
remoteIPs = append(remoteIPs, subnet.HostIP)
122+
}
123+
}
124+
return hsw.oc.UpdateVXLANMulticastFlows(remoteIPs)
28125
}
29126

30127
func (node *OsdnNode) getLocalSubnet() (string, error) {
@@ -68,59 +165,3 @@ func (node *OsdnNode) getLocalSubnet() (string, error) {
68165

69166
return subnet.Subnet, nil
70167
}
71-
72-
func (node *OsdnNode) handleAddOrUpdateHostSubnet(obj, _ interface{}, eventType watch.EventType) {
73-
hs := obj.(*networkapi.HostSubnet)
74-
glog.V(5).Infof("Watch %s event for HostSubnet %q", eventType, hs.Name)
75-
76-
if hs.HostIP == node.localIP {
77-
return
78-
}
79-
oldSubnet, exists := node.hostSubnetMap[string(hs.UID)]
80-
if exists {
81-
if oldSubnet.HostIP == hs.HostIP {
82-
return
83-
} else {
84-
// Delete old subnet rules
85-
node.DeleteHostSubnetRules(oldSubnet)
86-
}
87-
}
88-
if err := node.networkInfo.ValidateNodeIP(hs.HostIP); err != nil {
89-
glog.Warningf("Ignoring invalid subnet for node %s: %v", hs.HostIP, err)
90-
return
91-
}
92-
node.AddHostSubnetRules(hs)
93-
node.hostSubnetMap[string(hs.UID)] = hs
94-
95-
// Update multicast rules after all other changes have been processed
96-
node.updateVXLANMulticastRules()
97-
}
98-
99-
func (node *OsdnNode) handleDeleteHostSubnet(obj interface{}) {
100-
hs := obj.(*networkapi.HostSubnet)
101-
glog.V(5).Infof("Watch %s event for HostSubnet %q", watch.Deleted, hs.Name)
102-
103-
if hs.HostIP == node.localIP {
104-
return
105-
}
106-
if _, exists := node.hostSubnetMap[string(hs.UID)]; !exists {
107-
return
108-
}
109-
110-
delete(node.hostSubnetMap, string(hs.UID))
111-
node.DeleteHostSubnetRules(hs)
112-
113-
node.updateVXLANMulticastRules()
114-
}
115-
116-
func (node *OsdnNode) updateVXLANMulticastRules() {
117-
remoteIPs := make([]string, 0, len(node.hostSubnetMap))
118-
for _, subnet := range node.hostSubnetMap {
119-
if subnet.HostIP != node.localIP {
120-
remoteIPs = append(remoteIPs, subnet.HostIP)
121-
}
122-
}
123-
if err := node.oc.UpdateVXLANMulticastFlows(remoteIPs); err != nil {
124-
utilruntime.HandleError(fmt.Errorf("Error updating OVS VXLAN multicast flows: %v", err))
125-
}
126-
}

0 commit comments

Comments
 (0)