Skip to content

keepalive API #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions cmd/mock-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,11 @@ func main() {
flag.Parse()

endpoint := os.Getenv("CSI_ENDPOINT")
if len(endpoint) == 0 {
fmt.Println("CSI_ENDPOINT must be defined and must be a path")
os.Exit(1)
}
if strings.Contains(endpoint, ":") {
fmt.Println("CSI_ENDPOINT must be a unix path")
os.Exit(1)
}

controllerEndpoint := os.Getenv("CSI_CONTROLLER_ENDPOINT")
if len(controllerEndpoint) == 0 {
// If empty, set to the common endpoint.
controllerEndpoint = endpoint
}
if strings.Contains(controllerEndpoint, ":") {
fmt.Println("CSI_CONTROLLER_ENDPOINT must be a unix path")
os.Exit(1)
}

// Create mock driver
s := service.New(config)
Expand All @@ -77,16 +64,14 @@ func main() {
}

// Listen
os.Remove(endpoint)
os.Remove(controllerEndpoint)
l, err := net.Listen("unix", endpoint)
l, cleanup, err := listen(endpoint)
if err != nil {
fmt.Printf("Error: Unable to listen on %s socket: %v\n",
endpoint,
err)
os.Exit(1)
}
defer os.Remove(endpoint)
defer cleanup()

// Start server
if err := d.Start(l); err != nil {
Expand Down Expand Up @@ -129,15 +114,14 @@ func main() {
}

// Listen controller.
os.Remove(controllerEndpoint)
l, err := net.Listen("unix", controllerEndpoint)
l, cleanupController, err := listen(controllerEndpoint)
if err != nil {
fmt.Printf("Error: Unable to listen on %s socket: %v\n",
controllerEndpoint,
err)
os.Exit(1)
}
defer os.Remove(controllerEndpoint)
defer cleanupController()

// Start controller server.
if err = dc.Start(l); err != nil {
Expand All @@ -148,15 +132,14 @@ func main() {
fmt.Println("mock controller driver started")

// Listen node.
os.Remove(endpoint)
l, err = net.Listen("unix", endpoint)
l, cleanupNode, err := listen(endpoint)
if err != nil {
fmt.Printf("Error: Unable to listen on %s socket: %v\n",
endpoint,
err)
os.Exit(1)
}
defer os.Remove(endpoint)
defer cleanupNode()

// Start node server.
if err = dn.Start(l); err != nil {
Expand All @@ -182,3 +165,36 @@ func main() {
fmt.Println("mock drivers stopped")
}
}

func parseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
// Assume everything else is a file path for a Unix Domain Socket.
return "unix", ep, nil
}

func listen(endpoint string) (net.Listener, func(), error) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, nil, err
}

cleanup := func() {}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
return nil, nil, fmt.Errorf("%s: %q", addr, err)
}
cleanup = func() {
os.Remove(addr)
}
}

l, err := net.Listen(proto, addr)
return l, cleanup, err
}
2 changes: 1 addition & 1 deletion driver/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *MockCSIDriver) Nexus() (*grpc.ClientConn, error) {
}

// Create a client connection
m.conn, err = utils.Connect(m.Address())
m.conn, err = utils.Connect(m.Address(), grpc.WithInsecure())
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions hack/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ TESTARGS=$@
UDS="/tmp/e2e-csi-sanity.sock"
UDS_NODE="/tmp/e2e-csi-sanity-node.sock"
UDS_CONTROLLER="/tmp/e2e-csi-sanity-ctrl.sock"
# Protocol specified as for net.Listen...
TCP_SERVER="tcp://localhost:7654"
# ... and slightly differently for gRPC.
TCP_CLIENT="dns:///localhost:7654"
CSI_ENDPOINTS="$CSI_ENDPOINTS ${UDS}"
CSI_MOCK_VERSION="master"

Expand Down Expand Up @@ -108,6 +112,7 @@ cd cmd/csi-sanity
make clean install || exit 1
cd ../..

runTest "${TCP_SERVER}" "${TCP_CLIENT}" &&
runTest "${UDS}" "${UDS}" &&
runTestWithCreds "${UDS}" "${UDS}" &&
runTestAPI "${UDS}" &&
Expand Down
16 changes: 14 additions & 2 deletions pkg/sanity/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ type TestConfig struct {
// is empty, it must provide both the controller and node service.
Address string

// DialOptions specifies the options that are to be used
// when connecting to Address. The default is grpc.WithInsecure().
// A dialer will be added for Unix Domain Sockets.
DialOptions []grpc.DialOption

// ControllerAddress optionally provides the gRPC endpoint of
// the controller service.
ControllerAddress string

// ControllerDialOptions specifies the options that are to be used
// for ControllerAddress.
ControllerDialOptions []grpc.DialOption

// SecretsFile is the filename of a .yaml file which is used
// to populate CSISecrets which are then used for calls to the
// CSI driver.
Expand Down Expand Up @@ -175,6 +184,9 @@ func NewTestConfig() TestConfig {
RemovePathCmdTimeout: 10 * time.Second,
TestVolumeSize: 10 * 1024 * 1024 * 1024, // 10 GiB
IDGen: &DefaultIDGenerator{},

DialOptions: []grpc.DialOption{grpc.WithInsecure()},
ControllerDialOptions: []grpc.DialOption{grpc.WithInsecure()},
}
}

Expand Down Expand Up @@ -240,7 +252,7 @@ func (sc *TestContext) Setup() {
sc.Conn.Close()
}
By("connecting to CSI driver")
sc.Conn, err = utils.Connect(sc.Config.Address)
sc.Conn, err = utils.Connect(sc.Config.Address, sc.Config.DialOptions...)
Expect(err).NotTo(HaveOccurred())
sc.connAddress = sc.Config.Address
} else {
Expand All @@ -253,7 +265,7 @@ func (sc *TestContext) Setup() {
sc.ControllerConn = sc.Conn
sc.controllerConnAddress = sc.Config.Address
} else {
sc.ControllerConn, err = utils.Connect(sc.Config.ControllerAddress)
sc.ControllerConn, err = utils.Connect(sc.Config.ControllerAddress, sc.Config.ControllerDialOptions...)
Expect(err).NotTo(HaveOccurred())
sc.controllerConnAddress = sc.Config.ControllerAddress
}
Expand Down
2 changes: 1 addition & 1 deletion test/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSimpleDriver(t *testing.T) {
defer s.Stop()

// Setup a connection to the driver
conn, err := utils.Connect(s.Address())
conn, err := utils.Connect(s.Address(), grpc.WithInsecure())
if err != nil {
t.Errorf("Error: %s", err.Error())
}
Expand Down
11 changes: 1 addition & 10 deletions utils/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
)

// Connect address by grpc
func Connect(address string) (*grpc.ClientConn, error) {
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
}
func Connect(address string, dialOptions ...grpc.DialOption) (*grpc.ClientConn, error) {
u, err := url.Parse(address)
if err == nil && (!u.IsAbs() || u.Scheme == "unix") {
dialOptions = append(dialOptions,
Expand All @@ -41,11 +37,6 @@ func Connect(address string) (*grpc.ClientConn, error) {
return net.DialTimeout("unix", u.Path, timeout)
}))
}
// This is necessary when connecting via TCP and does not hurt
// when using Unix domain sockets. It ensures that gRPC detects a dead connection
// in a timely manner.
dialOptions = append(dialOptions,
grpc.WithKeepaliveParams(keepalive.ClientParameters{PermitWithoutStream: true}))

conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ google.golang.org/grpc/codes
google.golang.org/grpc/reflection
google.golang.org/grpc/status
google.golang.org/grpc/connectivity
google.golang.org/grpc/keepalive
google.golang.org/grpc/balancer
google.golang.org/grpc/balancer/roundrobin
google.golang.org/grpc/credentials
Expand All @@ -112,6 +111,7 @@ google.golang.org/grpc/internal/envconfig
google.golang.org/grpc/internal/grpcrand
google.golang.org/grpc/internal/grpcsync
google.golang.org/grpc/internal/transport
google.golang.org/grpc/keepalive
google.golang.org/grpc/metadata
google.golang.org/grpc/naming
google.golang.org/grpc/peer
Expand Down