Skip to content

Commit adc05c8

Browse files
author
Ravi Sankar Penta
committed
Changed ovs.Transaction from pseudo to real atomic transaction
- Leverages ovs.bundle() to perform atomic transactions - Now ovs.Transaction interface has AddFlow(), DeleteFlows() and Commit() methods General usage: otx := ovs.NewTransaction() otx.AddFlow(flow1) // No execution, only caches flow context ... otx.DeleteFlows(flowN) // No execution, only caches flow context ... err := otx.Commit() // Executes all cached flows as single atomic transaction - With this change, most of the operations in ovs controller like setup, addHostSubnet, addService, etc. has only one ovs bundle call. So there won't be partial commited changes and it reduces the downtime during watch resync events.
1 parent c6b4383 commit adc05c8

File tree

8 files changed

+88
-101
lines changed

8 files changed

+88
-101
lines changed

pkg/network/node/multitenant.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (mp *multiTenantPlugin) Start(node *OsdnNode) error {
4949
otx := node.oc.NewTransaction()
5050
otx.AddFlow("table=80, priority=200, reg0=0, actions=output:NXM_NX_REG2[]")
5151
otx.AddFlow("table=80, priority=200, reg1=0, actions=output:NXM_NX_REG2[]")
52-
if err := otx.EndTransaction(); err != nil {
52+
if err := otx.Commit(); err != nil {
5353
return err
5454
}
5555

@@ -141,7 +141,7 @@ func (mp *multiTenantPlugin) EnsureVNIDRules(vnid uint32) {
141141

142142
otx := mp.node.oc.NewTransaction()
143143
otx.AddFlow("table=80, priority=100, reg0=%d, reg1=%d, actions=output:NXM_NX_REG2[]", vnid, vnid)
144-
if err := otx.EndTransaction(); err != nil {
144+
if err := otx.Commit(); err != nil {
145145
utilruntime.HandleError(fmt.Errorf("Error adding OVS flow for VNID: %v", err))
146146
}
147147
}
@@ -158,7 +158,7 @@ func (mp *multiTenantPlugin) SyncVNIDRules() {
158158
mp.vnidInUse[uint32(vnid)] = false
159159
otx.DeleteFlows("table=80, reg1=%d", vnid)
160160
}
161-
if err := otx.EndTransaction(); err != nil {
161+
if err := otx.Commit(); err != nil {
162162
utilruntime.HandleError(fmt.Errorf("Error deleting syncing OVS VNID rules: %v", err))
163163
}
164164
}

pkg/network/node/networkpolicy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (np *networkPolicyPlugin) Start(node *OsdnNode) error {
9191
otx.AddFlow("table=21, priority=200, ip, nw_dst=%s, actions=ct(commit,table=30)", cn.ClusterCIDR.String())
9292
}
9393
otx.AddFlow("table=80, priority=200, ip, ct_state=+rpl, actions=output:NXM_NX_REG2[]")
94-
if err := otx.EndTransaction(); err != nil {
94+
if err := otx.Commit(); err != nil {
9595
return err
9696
}
9797

@@ -229,7 +229,7 @@ func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
229229
otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
230230
}
231231
}
232-
if err := otx.EndTransaction(); err != nil {
232+
if err := otx.Commit(); err != nil {
233233
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for VNID: %v", err))
234234
}
235235
}

pkg/network/node/ovscontroller.go

+13-18
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,7 @@ func (oc *ovsController) SetupOVS(clusterNetworkCIDR []string, serviceNetworkCID
217217
// Table 253: rule version note
218218
otx.AddFlow("table=%d, actions=note:%s", ruleVersionTable, oc.getVersionNote())
219219

220-
err = otx.EndTransaction()
221-
if err != nil {
222-
return err
223-
}
224-
225-
return nil
220+
return otx.Commit()
226221
}
227222

228223
func (oc *ovsController) NewTransaction() ovs.Transaction {
@@ -255,7 +250,7 @@ func (oc *ovsController) setupPodFlows(ofport int, podIP net.IP, vnid uint32) er
255250
// IP traffic to container
256251
otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport)
257252

258-
return otx.EndTransaction()
253+
return otx.Commit()
259254
}
260255

261256
func (oc *ovsController) cleanupPodFlows(podIP net.IP) error {
@@ -266,7 +261,7 @@ func (oc *ovsController) cleanupPodFlows(podIP net.IP) error {
266261
otx.DeleteFlows("ip, nw_src=%s", ipstr)
267262
otx.DeleteFlows("arp, nw_dst=%s", ipstr)
268263
otx.DeleteFlows("arp, nw_src=%s", ipstr)
269-
return otx.EndTransaction()
264+
return otx.Commit()
270265
}
271266

272267
func (oc *ovsController) SetUpPod(sandboxID, hostVeth string, podIP net.IP, vnid uint32) (int, error) {
@@ -501,7 +496,7 @@ func (oc *ovsController) UpdateEgressNetworkPolicyRules(policies []networkapi.Eg
501496
otx.DeleteFlows("table=101, reg0=%d, cookie=1/1", vnid)
502497
}
503498

504-
if txErr := otx.EndTransaction(); txErr != nil {
499+
if txErr := otx.Commit(); txErr != nil {
505500
errs = append(errs, txErr)
506501
}
507502

@@ -526,7 +521,7 @@ func (oc *ovsController) AddHostSubnetRules(subnet *networkapi.HostSubnet) error
526521
otx.AddFlow("table=90, priority=100, cookie=0x%08x, ip, nw_dst=%s, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", cookie, subnet.Subnet, subnet.HostIP)
527522
}
528523

529-
return otx.EndTransaction()
524+
return otx.Commit()
530525
}
531526

532527
func (oc *ovsController) DeleteHostSubnetRules(subnet *networkapi.HostSubnet) error {
@@ -536,7 +531,7 @@ func (oc *ovsController) DeleteHostSubnetRules(subnet *networkapi.HostSubnet) er
536531
otx.DeleteFlows("table=10, cookie=0x%08x/0xffffffff, tun_src=%s", cookie, subnet.HostIP)
537532
otx.DeleteFlows("table=50, cookie=0x%08x/0xffffffff, arp, nw_dst=%s", cookie, subnet.Subnet)
538533
otx.DeleteFlows("table=90, cookie=0x%08x/0xffffffff, ip, nw_dst=%s", cookie, subnet.Subnet)
539-
return otx.EndTransaction()
534+
return otx.Commit()
540535
}
541536

542537
func (oc *ovsController) AddServiceRules(service *kapi.Service, netID uint32) error {
@@ -555,13 +550,13 @@ func (oc *ovsController) AddServiceRules(service *kapi.Service, netID uint32) er
555550
otx.AddFlow(baseRule + action)
556551
}
557552

558-
return otx.EndTransaction()
553+
return otx.Commit()
559554
}
560555

561556
func (oc *ovsController) DeleteServiceRules(service *kapi.Service) error {
562557
otx := oc.ovs.NewTransaction()
563558
otx.DeleteFlows(generateBaseServiceRule(service.Spec.ClusterIP))
564-
return otx.EndTransaction()
559+
return otx.Commit()
565560
}
566561

567562
func generateBaseServiceRule(IP string) string {
@@ -601,7 +596,7 @@ func (oc *ovsController) UpdateLocalMulticastFlows(vnid uint32, enabled bool, of
601596
otx.DeleteFlows("table=120, reg0=%d", vnid)
602597
}
603598

604-
return otx.EndTransaction()
599+
return otx.Commit()
605600
}
606601

607602
func (oc *ovsController) UpdateVXLANMulticastFlows(remoteIPs []string) error {
@@ -618,7 +613,7 @@ func (oc *ovsController) UpdateVXLANMulticastFlows(remoteIPs []string) error {
618613
otx.AddFlow("table=111, priority=100, actions=goto_table:120")
619614
}
620615

621-
return otx.EndTransaction()
616+
return otx.Commit()
622617
}
623618

624619
// FindUnusedVNIDs returns a list of VNIDs for which there are table 80 "check" rules,
@@ -702,14 +697,14 @@ func (oc *ovsController) ensureTunMAC() error {
702697
func (oc *ovsController) SetNamespaceEgressNormal(vnid uint32) error {
703698
otx := oc.ovs.NewTransaction()
704699
otx.DeleteFlows("table=100, reg0=%d", vnid)
705-
return otx.EndTransaction()
700+
return otx.Commit()
706701
}
707702

708703
func (oc *ovsController) SetNamespaceEgressDropped(vnid uint32) error {
709704
otx := oc.ovs.NewTransaction()
710705
otx.DeleteFlows("table=100, reg0=%d", vnid)
711706
otx.AddFlow("table=100, priority=100, reg0=%d, actions=drop", vnid)
712-
return otx.EndTransaction()
707+
return otx.Commit()
713708
}
714709

715710
func (oc *ovsController) SetNamespaceEgressViaEgressIP(vnid uint32, nodeIP, mark string) error {
@@ -726,5 +721,5 @@ func (oc *ovsController) SetNamespaceEgressViaEgressIP(vnid uint32, nodeIP, mark
726721
} else {
727722
otx.AddFlow("table=100, priority=100, reg0=%d, ip, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:%s->tun_dst,output:1", vnid, nodeIP)
728723
}
729-
return otx.EndTransaction()
724+
return otx.Commit()
730725
}

pkg/network/node/ovscontroller_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ func TestAlreadySetUp(t *testing.T) {
805805

806806
otx := ovsif.NewTransaction()
807807
otx.AddFlow(tc.flow)
808-
if err := otx.EndTransaction(); err != nil {
808+
if err := otx.Commit(); err != nil {
809809
t.Fatalf("(%d) unexpected error from AddFlow: %v", i, err)
810810
}
811811
if success := oc.AlreadySetUp(); success != tc.success {
@@ -919,7 +919,7 @@ func TestSyncVNIDRules(t *testing.T) {
919919
for _, flow := range tc.flows {
920920
otx.AddFlow(flow)
921921
}
922-
if err := otx.EndTransaction(); err != nil {
922+
if err := otx.Commit(); err != nil {
923923
t.Fatalf("(%d) unexpected error from AddFlow: %v", i, err)
924924
}
925925

pkg/network/node/singletenant.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (sp *singleTenantPlugin) SupportsVNIDs() bool {
2424
func (sp *singleTenantPlugin) Start(node *OsdnNode) error {
2525
otx := node.oc.NewTransaction()
2626
otx.AddFlow("table=80, priority=200, actions=output:NXM_NX_REG2[]")
27-
return otx.EndTransaction()
27+
return otx.Commit()
2828
}
2929

3030
func (sp *singleTenantPlugin) AddNetNamespace(netns *networkapi.NetNamespace) {

pkg/util/ovs/fake_ovs_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func TestFakeDumpFlows(t *testing.T) {
165165
otx.AddFlow("table=30, priority=300, ip, nw_dst=%s, actions=output:2", localSubnetGateway)
166166
otx.AddFlow("table=35, priority=300, ip, nw_dst=%s, actions=ct(commit,exec(set_field:1->ct_mark),table=70)", localSubnetGateway)
167167

168-
err = otx.EndTransaction()
168+
err = otx.Commit()
169169
if err != nil {
170170
t.Fatalf("unexpected error from AddFlow: %v", err)
171171
}
@@ -280,7 +280,7 @@ func TestFlowMatchesMasked(t *testing.T) {
280280
otx.AddFlow("table=100, priority=200, reg0=2, actions=two")
281281
otx.AddFlow("table=100, priority=300, reg0=3, cookie=1, actions=three")
282282
otx.AddFlow("table=100, priority=400, reg0=4, cookie=0xe, actions=four")
283-
err = otx.EndTransaction()
283+
err = otx.Commit()
284284
if err != nil {
285285
t.Fatalf("unexpected error from AddFlow: %v", err)
286286
}
@@ -294,7 +294,7 @@ func TestFlowMatchesMasked(t *testing.T) {
294294

295295
otx = ovsif.NewTransaction()
296296
otx.DeleteFlows("table=100, cookie=0/0xFFFF")
297-
err = otx.EndTransaction()
297+
err = otx.Commit()
298298
if err != nil {
299299
t.Fatalf("unexpected error from AddFlow: %v", err)
300300
}
@@ -308,7 +308,7 @@ func TestFlowMatchesMasked(t *testing.T) {
308308

309309
otx = ovsif.NewTransaction()
310310
otx.DeleteFlows("table=100, cookie=2/2")
311-
err = otx.EndTransaction()
311+
err = otx.Commit()
312312
if err != nil {
313313
t.Fatalf("unexpected error from AddFlow: %v", err)
314314
}

pkg/util/ovs/ovs.go

+42-48
Original file line numberDiff line numberDiff line change
@@ -73,26 +73,25 @@ type Interface interface {
7373
// strings, one per flow. If flow is not "" then it describes the flows to dump.
7474
DumpFlows(flow string, args ...interface{}) ([]string, error)
7575

76-
// NewTransaction begins a new OVS transaction. If an error occurs at
77-
// any step in the transaction, it will be recorded until
78-
// EndTransaction(), and any further calls on the transaction will be
79-
// ignored.
76+
// NewTransaction begins a new OVS transaction.
8077
NewTransaction() Transaction
8178
}
8279

8380
// Transaction manages a single set of OVS flow modifications
8481
type Transaction interface {
85-
// AddFlow adds a flow to the bridge. The arguments are passed to fmt.Sprintf().
82+
// AddFlow prepares adding a flow to the bridge.
83+
// Given flow is cached but not executed at this time.
84+
// The arguments are passed to fmt.Sprintf().
8685
AddFlow(flow string, args ...interface{})
8786

88-
// DeleteFlows deletes all matching flows from the bridge. The arguments are
89-
// passed to fmt.Sprintf().
87+
// DeleteFlows prepares deleting all matching flows from the bridge.
88+
// Given flow is cached but not executed at this time.
89+
// The arguments are passed to fmt.Sprintf().
9090
DeleteFlows(flow string, args ...interface{})
9191

92-
// EndTransaction ends an OVS transaction and returns any error that occurred
93-
// during the transaction. You should not use the transaction again after
94-
// calling this function.
95-
EndTransaction() error
92+
// Commit executes all cached flows as a single atomic transaction and
93+
// returns any error that occurred during the transaction.
94+
Commit() error
9695
}
9796

9897
const (
@@ -297,43 +296,6 @@ func (ovsif *ovsExec) Clear(table, record string, columns ...string) error {
297296
return err
298297
}
299298

300-
type ovsExecTx struct {
301-
ovsif *ovsExec
302-
err error
303-
}
304-
305-
func (tx *ovsExecTx) exec(cmd string, args ...string) (string, error) {
306-
out := ""
307-
if tx.err == nil {
308-
out, tx.err = tx.ovsif.exec(cmd, args...)
309-
}
310-
return out, tx.err
311-
}
312-
313-
func (ovsif *ovsExec) NewTransaction() Transaction {
314-
return &ovsExecTx{ovsif: ovsif}
315-
}
316-
317-
func (tx *ovsExecTx) AddFlow(flow string, args ...interface{}) {
318-
if len(args) > 0 {
319-
flow = fmt.Sprintf(flow, args...)
320-
}
321-
tx.exec(OVS_OFCTL, "add-flow", tx.ovsif.bridge, flow)
322-
}
323-
324-
func (tx *ovsExecTx) DeleteFlows(flow string, args ...interface{}) {
325-
if len(args) > 0 {
326-
flow = fmt.Sprintf(flow, args...)
327-
}
328-
tx.exec(OVS_OFCTL, "del-flows", tx.ovsif.bridge, flow)
329-
}
330-
331-
func (tx *ovsExecTx) EndTransaction() error {
332-
err := tx.err
333-
tx.err = nil
334-
return err
335-
}
336-
337299
func (ovsif *ovsExec) DumpFlows(flow string, args ...interface{}) ([]string, error) {
338300
if len(args) > 0 {
339301
flow = fmt.Sprintf(flow, args...)
@@ -353,6 +315,10 @@ func (ovsif *ovsExec) DumpFlows(flow string, args ...interface{}) ([]string, err
353315
return flows, nil
354316
}
355317

318+
func (ovsif *ovsExec) NewTransaction() Transaction {
319+
return &ovsExecTx{ovsif: ovsif, flows: []string{}}
320+
}
321+
356322
// bundle executes all given flows as a single atomic transaction
357323
func (ovsif *ovsExec) bundle(flows []string) error {
358324
if len(flows) == 0 {
@@ -362,3 +328,31 @@ func (ovsif *ovsExec) bundle(flows []string) error {
362328
_, err := ovsif.execWithStdin(OVS_OFCTL, flows, "bundle", ovsif.bridge, "-")
363329
return err
364330
}
331+
332+
// ovsExecTx implements ovs.Transaction and maintains current flow context
333+
type ovsExecTx struct {
334+
ovsif *ovsExec
335+
flows []string
336+
}
337+
338+
func (tx *ovsExecTx) AddFlow(flow string, args ...interface{}) {
339+
if len(args) > 0 {
340+
flow = fmt.Sprintf(flow, args...)
341+
}
342+
tx.flows = append(tx.flows, fmt.Sprintf("flow add %s", flow))
343+
}
344+
345+
func (tx *ovsExecTx) DeleteFlows(flow string, args ...interface{}) {
346+
if len(args) > 0 {
347+
flow = fmt.Sprintf(flow, args...)
348+
}
349+
tx.flows = append(tx.flows, fmt.Sprintf("flow delete %s", flow))
350+
}
351+
352+
func (tx *ovsExecTx) Commit() error {
353+
err := tx.ovsif.bundle(tx.flows)
354+
355+
// Reset flow context
356+
tx.flows = []string{}
357+
return err
358+
}

0 commit comments

Comments
 (0)