Skip to content

Commit 89cca5f

Browse files
committed
Implement the node side of automatic egress IP support
1 parent 02c0753 commit 89cca5f

7 files changed

+657
-40
lines changed

pkg/network/node/egressip.go

+300
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
package node
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
"syscall"
8+
9+
"github.com/golang/glog"
10+
11+
"k8s.io/apimachinery/pkg/util/sets"
12+
utilwait "k8s.io/apimachinery/pkg/util/wait"
13+
"k8s.io/client-go/tools/cache"
14+
15+
networkapi "github.com/openshift/origin/pkg/network/apis/network"
16+
"github.com/openshift/origin/pkg/network/common"
17+
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
18+
19+
"github.com/vishvananda/netlink"
20+
)
21+
22+
type nodeEgress struct {
23+
nodeIP string
24+
egressIPs sets.String
25+
}
26+
27+
type namespaceEgress struct {
28+
vnid uint32
29+
30+
// claimedIP is the egress IP it wants (NetNamespace.EgressIP[0]), or "" for none
31+
claimedIP string
32+
// assignedIP is an egress IP actually in use on nodeIP
33+
assignedIP string
34+
nodeIP string
35+
}
36+
37+
type egressIPWatcher struct {
38+
sync.Mutex
39+
40+
localIP string
41+
oc *ovsController
42+
43+
networkClient networkclient.Interface
44+
iptables *NodeIPTables
45+
46+
// from HostSubnets
47+
nodesByNodeIP map[string]*nodeEgress
48+
nodesByEgressIP map[string]*nodeEgress
49+
50+
// From NetNamespaces
51+
namespacesByVNID map[uint32]*namespaceEgress
52+
namespacesByEgressIP map[string]*namespaceEgress
53+
54+
localEgressLink netlink.Link
55+
localEgressIPMaskLen int
56+
57+
testModeChan chan string
58+
}
59+
60+
func newEgressIPWatcher(localIP string, oc *ovsController) *egressIPWatcher {
61+
return &egressIPWatcher{
62+
localIP: localIP,
63+
oc: oc,
64+
65+
nodesByNodeIP: make(map[string]*nodeEgress),
66+
nodesByEgressIP: make(map[string]*nodeEgress),
67+
68+
namespacesByVNID: make(map[uint32]*namespaceEgress),
69+
namespacesByEgressIP: make(map[string]*namespaceEgress),
70+
}
71+
}
72+
73+
func (eip *egressIPWatcher) Start(networkClient networkclient.Interface, iptables *NodeIPTables) error {
74+
eip.iptables = iptables
75+
eip.networkClient = networkClient
76+
77+
go utilwait.Forever(eip.watchHostSubnets, 0)
78+
go utilwait.Forever(eip.watchNetNamespaces, 0)
79+
return nil
80+
}
81+
82+
func ipToHex(ip string) string {
83+
bytes := net.ParseIP(ip)
84+
if bytes == nil {
85+
return "invalid IP: shouldn't happen"
86+
}
87+
bytes = bytes.To4()
88+
return fmt.Sprintf("0x%02x%02x%02x%02x", bytes[0], bytes[1], bytes[2], bytes[3])
89+
}
90+
91+
func (eip *egressIPWatcher) watchHostSubnets() {
92+
common.RunEventQueue(eip.networkClient.Network().RESTClient(), common.HostSubnets, func(delta cache.Delta) error {
93+
hs := delta.Object.(*networkapi.HostSubnet)
94+
95+
var egressIPs []string
96+
if delta.Type != cache.Deleted {
97+
egressIPs = hs.EgressIPs
98+
}
99+
100+
eip.updateNode(hs.HostIP, egressIPs)
101+
return nil
102+
})
103+
}
104+
105+
func (eip *egressIPWatcher) updateNode(nodeIP string, nodeEgressIPs []string) {
106+
eip.Lock()
107+
defer eip.Unlock()
108+
109+
node := eip.nodesByNodeIP[nodeIP]
110+
if node == nil {
111+
if len(nodeEgressIPs) == 0 {
112+
return
113+
}
114+
node = &nodeEgress{nodeIP: nodeIP, egressIPs: sets.NewString()}
115+
eip.nodesByNodeIP[nodeIP] = node
116+
} else if len(nodeEgressIPs) == 0 {
117+
delete(eip.nodesByNodeIP, nodeIP)
118+
}
119+
oldEgressIPs := node.egressIPs
120+
node.egressIPs = sets.NewString(nodeEgressIPs...)
121+
122+
// Process new EgressIPs
123+
for _, ip := range node.egressIPs.Difference(oldEgressIPs).UnsortedList() {
124+
eip.nodesByEgressIP[ip] = node
125+
hex := ipToHex(ip)
126+
127+
if nodeIP == eip.localIP {
128+
if err := eip.claimEgressIP(ip, hex); err != nil {
129+
glog.Errorf("Error claiming Egress IP %q: %v", ip, err)
130+
}
131+
}
132+
133+
if ns, exists := eip.namespacesByEgressIP[ip]; exists {
134+
if ns.assignedIP == "" {
135+
ns.assignedIP = ip
136+
ns.nodeIP = nodeIP
137+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, ns.nodeIP, hex)
138+
if err != nil {
139+
glog.Errorf("Error updating Namespace egress rules: %v", err)
140+
}
141+
}
142+
}
143+
}
144+
145+
// Process removed EgressIPs
146+
for _, ip := range oldEgressIPs.Difference(node.egressIPs).UnsortedList() {
147+
delete(eip.nodesByEgressIP, ip)
148+
hex := ipToHex(ip)
149+
150+
if nodeIP == eip.localIP {
151+
if err := eip.releaseEgressIP(ip, hex); err != nil {
152+
glog.Errorf("Error releasing Egress IP %q: %v", ip, err)
153+
}
154+
}
155+
156+
if ns, exists := eip.namespacesByEgressIP[ip]; exists {
157+
if ns.assignedIP == ip {
158+
ns.assignedIP = ""
159+
ns.nodeIP = ""
160+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, "", hex)
161+
if err != nil {
162+
glog.Errorf("Error updating Namespace egress rules: %v", err)
163+
}
164+
}
165+
}
166+
}
167+
}
168+
169+
func (eip *egressIPWatcher) watchNetNamespaces() {
170+
common.RunEventQueue(eip.networkClient.Network().RESTClient(), common.NetNamespaces, func(delta cache.Delta) error {
171+
netns := delta.Object.(*networkapi.NetNamespace)
172+
173+
var egressIP string
174+
if delta.Type != cache.Deleted && len(netns.EgressIPs) != 0 {
175+
egressIP = netns.EgressIPs[0]
176+
}
177+
178+
eip.updateNamespace(netns.NetID, egressIP)
179+
return nil
180+
})
181+
}
182+
183+
func (eip *egressIPWatcher) updateNamespace(vnid uint32, egressIP string) {
184+
eip.Lock()
185+
defer eip.Unlock()
186+
187+
ns := eip.namespacesByVNID[vnid]
188+
if ns == nil {
189+
if egressIP == "" {
190+
return
191+
}
192+
ns = &namespaceEgress{vnid: vnid}
193+
eip.namespacesByVNID[vnid] = ns
194+
}
195+
if ns.claimedIP == egressIP {
196+
return
197+
}
198+
199+
if ns.claimedIP != "" {
200+
delete(eip.namespacesByEgressIP, ns.claimedIP)
201+
ns.assignedIP = ""
202+
ns.nodeIP = ""
203+
}
204+
ns.claimedIP = egressIP
205+
eip.namespacesByEgressIP[egressIP] = ns
206+
if node := eip.nodesByEgressIP[egressIP]; node != nil {
207+
ns.assignedIP = egressIP
208+
ns.nodeIP = node.nodeIP
209+
}
210+
211+
egressHex := ""
212+
if egressIP != "" {
213+
egressHex = ipToHex(egressIP)
214+
}
215+
216+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, ns.nodeIP, egressHex)
217+
if err != nil {
218+
glog.Errorf("Error updating Namespace egress rules: %v", err)
219+
}
220+
}
221+
222+
func (eip *egressIPWatcher) claimEgressIP(egressIP, egressHex string) error {
223+
if eip.testModeChan != nil {
224+
eip.testModeChan <- fmt.Sprintf("claim %s", egressIP)
225+
return nil
226+
}
227+
228+
if eip.localEgressLink == nil {
229+
links, err := netlink.LinkList()
230+
if err != nil {
231+
return fmt.Errorf("could not get list of network interfaces while adding egress IP: %v", err)
232+
}
233+
linkLoop:
234+
for _, link := range links {
235+
addrs, err := netlink.AddrList(link, syscall.AF_INET)
236+
if err != nil {
237+
return fmt.Errorf("could not get addresses of interface %q while adding egress IP: %v", link.Attrs().Name, err)
238+
}
239+
240+
for _, addr := range addrs {
241+
if addr.IP.String() == eip.localIP {
242+
eip.localEgressLink = link
243+
eip.localEgressIPMaskLen, _ = addr.Mask.Size()
244+
break linkLoop
245+
}
246+
}
247+
}
248+
249+
if eip.localEgressLink == nil {
250+
return fmt.Errorf("could not find network interface with the address %q while adding egress IP", eip.localIP)
251+
}
252+
}
253+
254+
egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
255+
addr, err := netlink.ParseAddr(egressIPNet)
256+
if err != nil {
257+
return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
258+
}
259+
err = netlink.AddrAdd(eip.localEgressLink, addr)
260+
if err != nil {
261+
return fmt.Errorf("could not add egress IP %q to %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
262+
}
263+
264+
if err := eip.iptables.AddEgressIPRules(egressIP, egressHex); err != nil {
265+
return fmt.Errorf("could not add egress IP iptables rule: %v", err)
266+
}
267+
268+
return nil
269+
}
270+
271+
func (eip *egressIPWatcher) releaseEgressIP(egressIP, egressHex string) error {
272+
if eip.testModeChan != nil {
273+
eip.testModeChan <- fmt.Sprintf("release %s", egressIP)
274+
return nil
275+
}
276+
277+
if eip.localEgressLink == nil {
278+
return nil
279+
}
280+
281+
egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
282+
addr, err := netlink.ParseAddr(egressIPNet)
283+
if err != nil {
284+
return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
285+
}
286+
err = netlink.AddrDel(eip.localEgressLink, addr)
287+
if err != nil {
288+
if err == syscall.EADDRNOTAVAIL {
289+
glog.V(2).Infof("Could not delete egress IP %q from %s: no such address", egressIPNet, eip.localEgressLink.Attrs().Name)
290+
} else {
291+
return fmt.Errorf("could not delete egress IP %q from %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
292+
}
293+
}
294+
295+
if err := eip.iptables.DeleteEgressIPRules(egressIP, egressHex); err != nil {
296+
return fmt.Errorf("could not delete egress IP iptables rule: %v", err)
297+
}
298+
299+
return nil
300+
}

0 commit comments

Comments
 (0)