Skip to content

Commit ab8207c

Browse files
author
Ravi Sankar Penta
committed
Added Bundle() to ovs interface that executes all the given flows as a single atomic transaction
1 parent b598b0a commit ab8207c

File tree

2 files changed

+84
-3
lines changed

2 files changed

+84
-3
lines changed

pkg/util/ovs/ovs.go

+46-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ovs
22

33
import (
4+
"bytes"
45
"fmt"
56
"strconv"
67
"strings"
@@ -72,6 +73,9 @@ type Interface interface {
7273
// strings, one per flow. If flow is not "" then it describes the flows to dump.
7374
DumpFlows(flow string, args ...interface{}) ([]string, error)
7475

76+
// Bundle executes all given flows as a single atomic transaction
77+
Bundle(flows []string) error
78+
7579
// NewTransaction begins a new OVS transaction. If an error occurs at
7680
// any step in the transaction, it will be recorded until
7781
// EndTransaction(), and any further calls on the transaction will be
@@ -138,13 +142,23 @@ func New(execer exec.Interface, bridge string, minVersion string) (Interface, er
138142
return ovsif, nil
139143
}
140144

141-
func (ovsif *ovsExec) exec(cmd string, args ...string) (string, error) {
145+
func (ovsif *ovsExec) execWithStdin(cmd string, stdinArgs []string, args ...string) (string, error) {
142146
if cmd == OVS_OFCTL {
143147
args = append([]string{"-O", "OpenFlow13"}, args...)
144148
}
145-
glog.V(4).Infof("Executing: %s %s", cmd, strings.Join(args, " "))
146149

147-
output, err := ovsif.execer.Command(cmd, args...).CombinedOutput()
150+
kcmd := ovsif.execer.Command(cmd, args...)
151+
if stdinArgs != nil {
152+
stdinString := strings.Join(stdinArgs, "\n")
153+
stdin := bytes.NewBufferString(stdinString)
154+
kcmd.SetStdin(stdin)
155+
156+
glog.V(4).Infof("Executing: %s %s |\n%s", cmd, strings.Join(args, " "), stdinString)
157+
} else {
158+
glog.V(4).Infof("Executing: %s %s", cmd, strings.Join(args, " "))
159+
}
160+
161+
output, err := kcmd.CombinedOutput()
148162
if err != nil {
149163
glog.V(2).Infof("Error executing %s: %s", cmd, string(output))
150164
return "", err
@@ -161,6 +175,10 @@ func (ovsif *ovsExec) exec(cmd string, args ...string) (string, error) {
161175
return outStr, nil
162176
}
163177

178+
func (ovsif *ovsExec) exec(cmd string, args ...string) (string, error) {
179+
return ovsif.execWithStdin(cmd, nil, args...)
180+
}
181+
164182
func (ovsif *ovsExec) AddBridge(properties ...string) error {
165183
args := []string{"add-br", ovsif.bridge}
166184
if len(properties) > 0 {
@@ -334,3 +352,28 @@ func (ovsif *ovsExec) DumpFlows(flow string, args ...interface{}) ([]string, err
334352
}
335353
return flows, nil
336354
}
355+
356+
func (ovsif *ovsExec) Bundle(flows []string) error {
357+
if len(flows) == 0 {
358+
return nil
359+
}
360+
361+
_, err := ovsif.execWithStdin(OVS_OFCTL, flows, "bundle", ovsif.bridge, "-")
362+
return err
363+
}
364+
365+
// BundleAddFlowRepr converts regular flow add rule to bundle compatible flow rule
366+
func BundleAddFlowRepr(flow string, args ...interface{}) string {
367+
if len(args) > 0 {
368+
flow = fmt.Sprintf(flow, args...)
369+
}
370+
return fmt.Sprintf("flow add %s", flow)
371+
}
372+
373+
// BundleDeleteFlowRepr converts regular flow delete rule to bundle compatible flow rule
374+
func BundleDeleteFlowRepr(flow string, args ...interface{}) string {
375+
if len(args) > 0 {
376+
flow = fmt.Sprintf(flow, args...)
377+
}
378+
return fmt.Sprintf("flow delete %s", flow)
379+
}

pkg/util/ovs/ovs_test.go

+38
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,44 @@ func TestTransactionFailure(t *testing.T) {
9292
ensureTestResults(t, fexec)
9393
}
9494

95+
func TestBundle(t *testing.T) {
96+
fexec := normalSetup()
97+
ovsif, err := New(fexec, "br0", "")
98+
if err != nil {
99+
t.Fatalf("Unexpected error from ovs.New(): %v", err)
100+
}
101+
102+
// Empty bundle flows
103+
bundleFlows := []string{}
104+
if err = ovsif.Bundle(bundleFlows); err != nil {
105+
t.Fatalf("Unexpected error: %v", err)
106+
}
107+
ensureTestResults(t, fexec)
108+
109+
// Invalid bundle flows
110+
err = fmt.Errorf("ovs-ofctl: -:1: Unsupported bundle message type: flow2")
111+
addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", err)
112+
bundleFlows = []string{
113+
"flow add flow1",
114+
"flow2",
115+
}
116+
if err = ovsif.Bundle(bundleFlows); err == nil {
117+
t.Fatalf("Unexpectedly failed to get error")
118+
}
119+
ensureTestResults(t, fexec)
120+
121+
// Valid bundle flows
122+
addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 bundle br0 -", "", nil)
123+
bundleFlows = []string{
124+
"flow add flow1",
125+
"flow delete flow2",
126+
}
127+
if err = ovsif.Bundle(bundleFlows); err != nil {
128+
t.Fatalf("Unexpected error: %v", err)
129+
}
130+
ensureTestResults(t, fexec)
131+
}
132+
95133
func TestDumpFlows(t *testing.T) {
96134
fexec := normalSetup()
97135
addTestResult(t, fexec, "ovs-ofctl -O OpenFlow13 dump-flows br0 ", `OFPST_FLOW reply (OF1.3) (xid=0x2):

0 commit comments

Comments
 (0)