Skip to content

Commit b398614

Browse files
committed
Implement the node side of automatic egress IP support
1 parent 02c0753 commit b398614

7 files changed

+659
-40
lines changed

pkg/network/node/egressip.go

+302
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
package node
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
"syscall"
8+
9+
"github.com/golang/glog"
10+
11+
"k8s.io/apimachinery/pkg/util/sets"
12+
utilwait "k8s.io/apimachinery/pkg/util/wait"
13+
"k8s.io/client-go/tools/cache"
14+
15+
networkapi "github.com/openshift/origin/pkg/network/apis/network"
16+
"github.com/openshift/origin/pkg/network/common"
17+
networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset"
18+
19+
"github.com/vishvananda/netlink"
20+
)
21+
22+
type nodeEgress struct {
23+
nodeIP string
24+
egressIPs sets.String
25+
}
26+
27+
// namespaceEgress is the watcher's record of one namespace (identified by
// its VNID) and the state of its egress IP claim.
type namespaceEgress struct {
	// vnid identifies the namespace; it is the key in
	// egressIPWatcher.namespacesByVNID.
	vnid uint32

	// claimedIP is the egress IP the namespace asks for (the first entry of
	// NetNamespace.EgressIPs), or "" when it asks for none.
	claimedIP string

	// assignedIP is the egress IP that is actually in use for the namespace,
	// or "" when no node currently provides the claimed IP.
	assignedIP string

	// nodeIP is the IP of the node on which assignedIP is in use.
	nodeIP string
}
36+
37+
type egressIPWatcher struct {
38+
sync.Mutex
39+
40+
localIP string
41+
oc *ovsController
42+
43+
networkClient networkclient.Interface
44+
iptables *NodeIPTables
45+
46+
// from HostSubnets
47+
nodesByNodeIP map[string]*nodeEgress
48+
nodesByEgressIP map[string]*nodeEgress
49+
50+
// From NetNamespaces
51+
namespacesByVNID map[uint32]*namespaceEgress
52+
namespacesByEgressIP map[string]*namespaceEgress
53+
54+
localEgressLink netlink.Link
55+
localEgressIPMaskLen int
56+
57+
testModeChan chan string
58+
}
59+
60+
func newEgressIPWatcher(localIP string, oc *ovsController) *egressIPWatcher {
61+
return &egressIPWatcher{
62+
localIP: localIP,
63+
oc: oc,
64+
65+
nodesByNodeIP: make(map[string]*nodeEgress),
66+
nodesByEgressIP: make(map[string]*nodeEgress),
67+
68+
namespacesByVNID: make(map[uint32]*namespaceEgress),
69+
namespacesByEgressIP: make(map[string]*namespaceEgress),
70+
}
71+
}
72+
73+
func (eip *egressIPWatcher) Start(networkClient networkclient.Interface, iptables *NodeIPTables) error {
74+
eip.iptables = iptables
75+
eip.networkClient = networkClient
76+
77+
go utilwait.Forever(eip.watchHostSubnets, 0)
78+
go utilwait.Forever(eip.watchNetNamespaces, 0)
79+
return nil
80+
}
81+
82+
// ipToHex renders a dotted-quad IPv4 address as a "0x"-prefixed, zero-padded
// eight-digit hexadecimal string, e.g. "10.0.0.1" -> "0x0a000001".
//
// If ip does not parse, or parses as an address with no IPv4 form (such as
// an IPv6 address), the placeholder "invalid IP: shouldn't happen" is
// returned instead.
func ipToHex(ip string) string {
	// To4 is safe on the nil result of a failed parse and itself returns nil
	// for non-IPv4 addresses, so a single check covers both failure modes.
	// Indexing the result without this check would panic on IPv6 input.
	v4 := net.ParseIP(ip).To4()
	if v4 == nil {
		return "invalid IP: shouldn't happen"
	}
	return fmt.Sprintf("0x%02x%02x%02x%02x", v4[0], v4[1], v4[2], v4[3])
}
90+
91+
func (eip *egressIPWatcher) watchHostSubnets() {
92+
common.RunEventQueue(eip.networkClient.Network().RESTClient(), common.HostSubnets, func(delta cache.Delta) error {
93+
hs := delta.Object.(*networkapi.HostSubnet)
94+
95+
var egressIPs []string
96+
if delta.Type != cache.Deleted {
97+
egressIPs = hs.EgressIPs
98+
}
99+
100+
eip.updateNode(hs.HostIP, egressIPs)
101+
return nil
102+
})
103+
}
104+
105+
func (eip *egressIPWatcher) updateNode(nodeIP string, nodeEgressIPs []string) {
106+
eip.Lock()
107+
defer eip.Unlock()
108+
109+
node := eip.nodesByNodeIP[nodeIP]
110+
if node == nil {
111+
if len(nodeEgressIPs) == 0 {
112+
return
113+
}
114+
node = &nodeEgress{nodeIP: nodeIP, egressIPs: sets.NewString()}
115+
eip.nodesByNodeIP[nodeIP] = node
116+
} else if len(nodeEgressIPs) == 0 {
117+
delete(eip.nodesByNodeIP, nodeIP)
118+
}
119+
oldEgressIPs := node.egressIPs
120+
node.egressIPs = sets.NewString(nodeEgressIPs...)
121+
122+
// Process new EgressIPs
123+
for _, ip := range node.egressIPs.Difference(oldEgressIPs).UnsortedList() {
124+
eip.nodesByEgressIP[ip] = node
125+
hex := ipToHex(ip)
126+
claimedNodeIP := nodeIP
127+
128+
if nodeIP == eip.localIP {
129+
if err := eip.claimEgressIP(ip, hex); err != nil {
130+
glog.Errorf("Error claiming Egress IP %q: %v", ip, err)
131+
claimedNodeIP = ""
132+
}
133+
}
134+
135+
if ns, exists := eip.namespacesByEgressIP[ip]; exists {
136+
if ns.assignedIP == "" {
137+
ns.assignedIP = ip
138+
ns.nodeIP = claimedNodeIP
139+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, claimedNodeIP, hex)
140+
if err != nil {
141+
glog.Errorf("Error updating Namespace egress rules: %v", err)
142+
}
143+
}
144+
}
145+
}
146+
147+
// Process removed EgressIPs
148+
for _, ip := range oldEgressIPs.Difference(node.egressIPs).UnsortedList() {
149+
delete(eip.nodesByEgressIP, ip)
150+
hex := ipToHex(ip)
151+
152+
if nodeIP == eip.localIP {
153+
if err := eip.releaseEgressIP(ip, hex); err != nil {
154+
glog.Errorf("Error releasing Egress IP %q: %v", ip, err)
155+
}
156+
}
157+
158+
if ns, exists := eip.namespacesByEgressIP[ip]; exists {
159+
if ns.assignedIP == ip {
160+
ns.assignedIP = ""
161+
ns.nodeIP = ""
162+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, "", hex)
163+
if err != nil {
164+
glog.Errorf("Error updating Namespace egress rules: %v", err)
165+
}
166+
}
167+
}
168+
}
169+
}
170+
171+
func (eip *egressIPWatcher) watchNetNamespaces() {
172+
common.RunEventQueue(eip.networkClient.Network().RESTClient(), common.NetNamespaces, func(delta cache.Delta) error {
173+
netns := delta.Object.(*networkapi.NetNamespace)
174+
175+
var egressIP string
176+
if delta.Type != cache.Deleted && len(netns.EgressIPs) != 0 {
177+
egressIP = netns.EgressIPs[0]
178+
}
179+
180+
eip.updateNamespace(netns.NetID, egressIP)
181+
return nil
182+
})
183+
}
184+
185+
func (eip *egressIPWatcher) updateNamespace(vnid uint32, egressIP string) {
186+
eip.Lock()
187+
defer eip.Unlock()
188+
189+
ns := eip.namespacesByVNID[vnid]
190+
if ns == nil {
191+
if egressIP == "" {
192+
return
193+
}
194+
ns = &namespaceEgress{vnid: vnid}
195+
eip.namespacesByVNID[vnid] = ns
196+
}
197+
if ns.claimedIP == egressIP {
198+
return
199+
}
200+
201+
if ns.claimedIP != "" {
202+
delete(eip.namespacesByEgressIP, ns.claimedIP)
203+
ns.assignedIP = ""
204+
ns.nodeIP = ""
205+
}
206+
ns.claimedIP = egressIP
207+
eip.namespacesByEgressIP[egressIP] = ns
208+
if node := eip.nodesByEgressIP[egressIP]; node != nil {
209+
ns.assignedIP = egressIP
210+
ns.nodeIP = node.nodeIP
211+
}
212+
213+
egressHex := ""
214+
if egressIP != "" {
215+
egressHex = ipToHex(egressIP)
216+
}
217+
218+
err := eip.oc.UpdateNamespaceEgressRules(ns.vnid, ns.nodeIP, egressHex)
219+
if err != nil {
220+
glog.Errorf("Error updating Namespace egress rules: %v", err)
221+
}
222+
}
223+
224+
func (eip *egressIPWatcher) claimEgressIP(egressIP, egressHex string) error {
225+
if eip.testModeChan != nil {
226+
eip.testModeChan <- fmt.Sprintf("claim %s", egressIP)
227+
return nil
228+
}
229+
230+
if eip.localEgressLink == nil {
231+
links, err := netlink.LinkList()
232+
if err != nil {
233+
return fmt.Errorf("could not get list of network interfaces while adding egress IP: %v", err)
234+
}
235+
linkLoop:
236+
for _, link := range links {
237+
addrs, err := netlink.AddrList(link, syscall.AF_INET)
238+
if err != nil {
239+
return fmt.Errorf("could not get addresses of interface %q while adding egress IP: %v", link.Attrs().Name, err)
240+
}
241+
242+
for _, addr := range addrs {
243+
if addr.IP.String() == eip.localIP {
244+
eip.localEgressLink = link
245+
eip.localEgressIPMaskLen, _ = addr.Mask.Size()
246+
break linkLoop
247+
}
248+
}
249+
}
250+
251+
if eip.localEgressLink == nil {
252+
return fmt.Errorf("could not find network interface with the address %q while adding egress IP", eip.localIP)
253+
}
254+
}
255+
256+
egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
257+
addr, err := netlink.ParseAddr(egressIPNet)
258+
if err != nil {
259+
return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
260+
}
261+
err = netlink.AddrAdd(eip.localEgressLink, addr)
262+
if err != nil {
263+
return fmt.Errorf("could not add egress IP %q to %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
264+
}
265+
266+
if err := eip.iptables.AddEgressIPRules(egressIP, egressHex); err != nil {
267+
return fmt.Errorf("could not add egress IP iptables rule: %v", err)
268+
}
269+
270+
return nil
271+
}
272+
273+
func (eip *egressIPWatcher) releaseEgressIP(egressIP, egressHex string) error {
274+
if eip.testModeChan != nil {
275+
eip.testModeChan <- fmt.Sprintf("release %s", egressIP)
276+
return nil
277+
}
278+
279+
if eip.localEgressLink == nil {
280+
return nil
281+
}
282+
283+
egressIPNet := fmt.Sprintf("%s/%d", egressIP, eip.localEgressIPMaskLen)
284+
addr, err := netlink.ParseAddr(egressIPNet)
285+
if err != nil {
286+
return fmt.Errorf("could not parse egress IP %q: %v", egressIPNet, err)
287+
}
288+
err = netlink.AddrDel(eip.localEgressLink, addr)
289+
if err != nil {
290+
if err == syscall.EADDRNOTAVAIL {
291+
glog.V(2).Infof("Could not delete egress IP %q from %s: no such address", egressIPNet, eip.localEgressLink.Attrs().Name)
292+
} else {
293+
return fmt.Errorf("could not delete egress IP %q from %s: %v", egressIPNet, eip.localEgressLink.Attrs().Name, err)
294+
}
295+
}
296+
297+
if err := eip.iptables.DeleteEgressIPRules(egressIP, egressHex); err != nil {
298+
return fmt.Errorf("could not delete egress IP iptables rule: %v", err)
299+
}
300+
301+
return nil
302+
}

0 commit comments

Comments
 (0)