forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvnids_node.go
281 lines (241 loc) · 7.42 KB
/
vnids_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package plugin
import (
"fmt"
"sync"
"time"
log "github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
utilwait "k8s.io/kubernetes/pkg/util/wait"
osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
)
// nodeVNIDMap holds the namespace/VNID associations known to this node as two
// mirrored lookup tables: one keyed by namespace name, one keyed by VNID.
type nodeVNIDMap struct {
// lock is held by the Get*/setVNID/unsetVNID methods while they read or
// modify ids and namespaces.
lock sync.Mutex
// ids maps a namespace name to the VNID currently associated with it.
ids map[string]uint32
// namespaces maps a VNID to the set of namespace names associated with it.
namespaces map[uint32]sets.String
}
// newNodeVNIDMap returns an empty nodeVNIDMap whose two lookup tables
// (namespace name -> VNID and VNID -> namespace names) are allocated and
// ready to be written to.
func newNodeVNIDMap() *nodeVNIDMap {
	vmap := new(nodeVNIDMap)
	vmap.ids = map[string]uint32{}
	vmap.namespaces = map[uint32]sets.String{}
	return vmap
}
// addNamespaceToSet records name as a member of the namespace set kept for
// vnid, creating that set the first time the vnid is seen. It does not take
// vmap.lock itself.
func (vmap *nodeVNIDMap) addNamespaceToSet(name string, vnid uint32) {
	if _, present := vmap.namespaces[vnid]; !present {
		vmap.namespaces[vnid] = sets.NewString()
	}
	vmap.namespaces[vnid].Insert(name)
}
// removeNamespaceFromSet drops name from the namespace set kept for vnid and
// removes the set entirely once it becomes empty. It is a no-op when no set
// exists for vnid. It does not take vmap.lock itself.
func (vmap *nodeVNIDMap) removeNamespaceFromSet(name string, vnid uint32) {
	members, present := vmap.namespaces[vnid]
	if !present {
		return
	}

	members.Delete(name)
	if members.Len() > 0 {
		return
	}
	// Last namespace for this vnid is gone; do not keep an empty set around.
	delete(vmap.namespaces, vnid)
}
// GetNamespaces returns the names of the namespaces currently associated
// with VNID id, as produced by the set's List method, or nil when the map has
// no entry for id. The lookup is performed while holding vmap.lock.
func (vmap *nodeVNIDMap) GetNamespaces(id uint32) []string {
	vmap.lock.Lock()
	defer vmap.lock.Unlock()

	members, known := vmap.namespaces[id]
	if !known {
		return nil
	}
	return members.List()
}
// GetVNID returns the VNID associated with namespace name. When the
// namespace has no entry in the map it returns 0 and a non-nil error. The
// lookup is performed while holding vmap.lock.
func (vmap *nodeVNIDMap) GetVNID(name string) (uint32, error) {
	vmap.lock.Lock()
	defer vmap.lock.Unlock()

	vnid, known := vmap.ids[name]
	if !known {
		return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name)
	}
	return vnid, nil
}
// WaitAndGetVNID looks up the VNID for namespace name, retrying the lookup
// before giving up.
//
// Nodes watch NetNamespaces and services asynchronously. NetNamespace events
// populate the vnid map, while service and pod setup depend on it. If
// propagation of the vnid map from master to node is slow, a lookup made by
// service/pod setup may fail even though the entry is about to appear. This
// method alleviates that by retrying GetVNID under an exponential backoff
// (initial duration 100ms, factor 1.5, 5 steps).
//
// It returns the VNID on success, or 0 and an error if no attempt found one.
func (vmap *nodeVNIDMap) WaitAndGetVNID(name string) (uint32, error) {
	retry := utilwait.Backoff{
		Duration: 100 * time.Millisecond,
		Factor:   1.5,
		Steps:    5,
	}

	var vnid uint32
	lookup := func() (bool, error) {
		found, lookupErr := vmap.GetVNID(name)
		vnid = found
		// A failed lookup is reported as "not done yet" rather than as an
		// error, so the backoff keeps retrying.
		return lookupErr == nil, nil
	}

	if utilwait.ExponentialBackoff(retry, lookup) != nil {
		return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name)
	}
	return vnid, nil
}
// setVNID associates namespace name with VNID id, replacing any previous
// association, and keeps the reverse (VNID -> namespaces) table in step. The
// update is performed while holding vmap.lock and is logged at info level.
func (vmap *nodeVNIDMap) setVNID(name string, id uint32) {
	vmap.lock.Lock()
	defer vmap.lock.Unlock()

	// Detach the namespace from whatever VNID it was previously filed under.
	previous, hadPrevious := vmap.ids[name]
	if hadPrevious {
		vmap.removeNamespaceFromSet(name, previous)
	}

	vmap.addNamespaceToSet(name, id)
	vmap.ids[name] = id

	log.Infof("Associate netid %d to namespace %q", id, name)
}
// unsetVNID removes the association for namespace name from both lookup
// tables and returns the VNID it had. If the namespace has no entry it
// returns 0 and a non-nil error and changes nothing. The update is performed
// while holding vmap.lock and is logged at info level.
func (vmap *nodeVNIDMap) unsetVNID(name string) (id uint32, err error) {
	vmap.lock.Lock()
	defer vmap.lock.Unlock()

	var known bool
	if id, known = vmap.ids[name]; !known {
		return 0, fmt.Errorf("Failed to find netid for namespace: %s in vnid map", name)
	}

	delete(vmap.ids, name)
	vmap.removeNamespaceFromSet(name, id)

	log.Infof("Dissociate netid %d from namespace %q", id, name)
	return id, nil
}
// populateVNIDs lists all NetNamespaces through osClient and records the
// name -> NetID association of each one in the map. It returns the list
// error, if any, without modifying the map.
func (vmap *nodeVNIDMap) populateVNIDs(osClient *osclient.Client) error {
	list, err := osClient.NetNamespaces().List(kapi.ListOptions{})
	if err != nil {
		return err
	}

	for i := range list.Items {
		entry := list.Items[i]
		vmap.setVNID(entry.Name, entry.NetID)
	}
	return nil
}
//------------------ Node Methods --------------------
// VnidStartNode fills the node's vnid map and then starts the NetNamespace
// and service watchers in background goroutines. It returns an error only if
// the initial population fails, in which case no watcher is started.
func (node *OsdnNode) VnidStartNode() error {
	// Populate vnid map synchronously so that existing services can fetch vnid
	if err := node.vnids.populateVNIDs(node.osClient); err != nil {
		return err
	}

	watchers := []func(){
		node.watchNetNamespaces,
		node.watchServices,
	}
	for _, watch := range watchers {
		go utilwait.Forever(watch, 0)
	}
	return nil
}
// updatePodNetwork re-applies the network setup of everything in namespace
// after its VNID changed from oldNetID to netID: local pods are updated,
// rules for every service with a cluster IP are deleted and re-added with
// netID, and egress network policy references are switched over.
//
// Failing to list pods or services aborts immediately with that error.
// Failures of individual updates are collected and returned together as an
// aggregate; a failure to delete old service rules is only logged.
func (node *OsdnNode) updatePodNetwork(namespace string, oldNetID, netID uint32) error {
	// FIXME: this is racy; traffic coming from the pods gets switched to the new
	// VNID before the service and firewall rules are updated to match. We need
	// to do the updates as a single transaction (ovs-ofctl --bundle).
	pods, err := node.GetLocalPods(namespace)
	if err != nil {
		return err
	}
	services, err := node.kClient.Services(namespace).List(kapi.ListOptions{})
	if err != nil {
		return err
	}

	var failures []error
	collect := func(stepErr error) {
		if stepErr != nil {
			failures = append(failures, stepErr)
		}
	}

	// Update OF rules for the existing/old pods in the namespace
	for _, pod := range pods {
		collect(node.UpdatePod(pod))
	}

	// Update OF rules for the old services in the namespace
	for _, svc := range services.Items {
		if !kapi.IsServiceIPSet(&svc) {
			continue
		}
		// Best effort: stale rules that cannot be removed are reported but do
		// not stop the new rules from being installed.
		if delErr := node.DeleteServiceRules(&svc); delErr != nil {
			log.Error(delErr)
		}
		collect(node.AddServiceRules(&svc, netID))
	}

	// Update namespace references in egress firewall rules
	collect(node.UpdateEgressNetworkPolicyVNID(namespace, oldNetID, netID))

	return kerrors.NewAggregate(failures)
}
// watchNetNamespaces runs the NetNamespace event queue and keeps the node's
// vnid map and the network setup of the affected namespace in step with it.
//
// Sync/Added/Updated: if the namespace's VNID is new or different, the map is
// updated and updatePodNetwork is run. If that fails, the map is restored to
// what it held before the event and an error is returned to the queue.
//
// Deleted: updatePodNetwork is run towards osapi.GlobalVNID and then the
// namespace is removed from the map.
func (node *OsdnNode) watchNetNamespaces() {
	RunEventQueue(node.osClient, NetNamespaces, func(delta cache.Delta) error {
		netns := delta.Object.(*osapi.NetNamespace)
		name := netns.NetName

		log.V(5).Infof("Watch %s event for NetNamespace %q", delta.Type, netns.ObjectMeta.Name)
		switch delta.Type {
		case cache.Sync, cache.Added, cache.Updated:
			// Skip this event if the old and new network ids are same
			oldNetID, lookupErr := node.vnids.GetVNID(name)
			hadOld := lookupErr == nil
			if hadOld && oldNetID == netns.NetID {
				break
			}
			node.vnids.setVNID(name, netns.NetID)

			if err := node.updatePodNetwork(name, oldNetID, netns.NetID); err != nil {
				// Restore the pre-event state. When the namespace had no
				// mapping before, oldNetID is only the zero value from the
				// failed lookup; writing it back would invent a name -> 0
				// association, so remove the entry instead.
				if hadOld {
					node.vnids.setVNID(name, oldNetID)
				} else if _, unsetErr := node.vnids.unsetVNID(name); unsetErr != nil {
					log.Warningf("Could not roll back vnid mapping for NetNamespace %q: %v", name, unsetErr)
				}
				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", name, err)
			}
		case cache.Deleted:
			// updatePodNetwork needs vnid, so unset vnid after this call
			if err := node.updatePodNetwork(name, netns.NetID, osapi.GlobalVNID); err != nil {
				return fmt.Errorf("failed to update pod network for namespace '%s', error: %s", name, err)
			}
			// The namespace may never have made it into the map; report that
			// without failing the event.
			if _, err := node.vnids.unsetVNID(name); err != nil {
				log.Warningf("Could not remove vnid mapping for deleted NetNamespace %q: %v", name, err)
			}
		}
		return nil
	})
}
// isServiceChanged reports whether the port lists of oldsvc and newsvc
// differ: either they have different lengths, or some pair of ports at the
// same index differs in Protocol or Port. No other service fields are
// compared.
func isServiceChanged(oldsvc, newsvc *kapi.Service) bool {
	before, after := oldsvc.Spec.Ports, newsvc.Spec.Ports
	if len(before) != len(after) {
		return true
	}

	for i := range before {
		sameProtocol := before[i].Protocol == after[i].Protocol
		samePort := before[i].Port == after[i].Port
		if !(sameProtocol && samePort) {
			return true
		}
	}
	return false
}
// watchServices runs the Service event queue and keeps the service rules on
// this node in step with it. Services for which kapi.IsServiceIPSet is false
// are ignored.
//
// A local cache keyed by service UID remembers the service object whose rules
// are currently installed, so an event that does not change the ports can be
// skipped and a changed service can have its old rules removed first.
//
// Sync/Added/Updated: old rules (if any and if the service changed) are
// deleted, the namespace's VNID is looked up with retries, and new rules are
// added. Deleted: the cache entry is dropped and the rules are deleted.
func (node *OsdnNode) watchServices() {
	services := make(map[string]*kapi.Service)
	RunEventQueue(node.kClient, Services, func(delta cache.Delta) error {
		serv := delta.Object.(*kapi.Service)

		// Ignore headless services
		if !kapi.IsServiceIPSet(serv) {
			return nil
		}
		uid := string(serv.UID)

		log.V(5).Infof("Watch %s event for Service %q", delta.Type, serv.ObjectMeta.Name)
		switch delta.Type {
		case cache.Sync, cache.Added, cache.Updated:
			if oldsvc, exists := services[uid]; exists {
				if !isServiceChanged(oldsvc, serv) {
					break
				}
				if err := node.DeleteServiceRules(oldsvc); err != nil {
					// Keep the cache entry so a later event retries the
					// deletion of the old rules.
					log.Error(err)
				} else {
					// The old rules are gone. Forget the entry now so that,
					// if installing the new rules fails below, a later event
					// with the old ports is not mistaken for "unchanged" and
					// skipped while no rules are installed.
					delete(services, uid)
				}
			}

			netid, err := node.vnids.WaitAndGetVNID(serv.Namespace)
			if err != nil {
				return fmt.Errorf("skipped adding service rules for serviceEvent: %v, Error: %v", delta.Type, err)
			}

			if err = node.AddServiceRules(serv, netid); err != nil {
				return err
			}
			services[uid] = serv
		case cache.Deleted:
			delete(services, uid)

			if err := node.DeleteServiceRules(serv); err != nil {
				return err
			}
		}
		return nil
	})
}