Skip to content

Commit 89dd3d3

Browse files
committed
Run integration tests against full agent & proxy-server apps
1 parent 6c63560 commit 89dd3d3

20 files changed

+605
-331
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ konnectivity.out
1818
konnectivity.html
1919
konnectivity-client/client.out
2020
konnectivity-client/client.html
21+
22+
tests.test

cmd/agent/app/server.go

+21-9
Original file line numberDiff line numberDiff line change
@@ -49,43 +49,50 @@ func NewAgentCommand(a *Agent, o *options.GrpcProxyAgentOptions) *cobra.Command
4949
Use: "agent",
5050
Long: `A gRPC agent, Connects to the proxy and then allows traffic to be forwarded to it.`,
5151
RunE: func(cmd *cobra.Command, args []string) error {
52-
return a.run(o)
52+
stopCh := make(chan struct{})
53+
return a.Run(o, stopCh)
5354
},
5455
}
5556

5657
return cmd
5758
}
5859

5960
type Agent struct {
61+
adminServer *http.Server
62+
healthServer *http.Server
63+
64+
cs *agent.ClientSet
6065
}
6166

62-
func (a *Agent) run(o *options.GrpcProxyAgentOptions) error {
67+
func (a *Agent) Run(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error {
6368
o.Print()
6469
if err := o.Validate(); err != nil {
6570
return fmt.Errorf("failed to validate agent options with %v", err)
6671
}
6772

68-
stopCh := make(chan struct{})
69-
7073
cs, err := a.runProxyConnection(o, stopCh)
7174
if err != nil {
7275
return fmt.Errorf("failed to run proxy connection with %v", err)
7376
}
77+
a.cs = cs
7478

7579
if err := a.runHealthServer(o, cs); err != nil {
7680
return fmt.Errorf("failed to run health server with %v", err)
7781
}
82+
defer a.healthServer.Close()
7883

7984
if err := a.runAdminServer(o); err != nil {
8085
return fmt.Errorf("failed to run admin server with %v", err)
8186
}
87+
defer a.adminServer.Close()
8288

8389
<-stopCh
90+
klog.V(1).Infoln("Shutting down agent.")
8491

8592
return nil
8693
}
8794

88-
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (agent.ReadinessManager, error) {
95+
func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) (*agent.ClientSet, error) {
8996
var tlsConfig *tls.Config
9097
var err error
9198
if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil {
@@ -149,7 +156,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi
149156
// "/ready" is deprecated but being maintained for backward compatibility
150157
muxHandler.HandleFunc("/ready", readinessHandler)
151158
muxHandler.HandleFunc("/readyz", readinessHandler)
152-
healthServer := &http.Server{
159+
a.healthServer = &http.Server{
153160
Addr: net.JoinHostPort(o.HealthServerHost, strconv.Itoa(o.HealthServerPort)),
154161
Handler: muxHandler,
155162
MaxHeaderBytes: 1 << 20,
@@ -160,7 +167,7 @@ func (a *Agent) runHealthServer(o *options.GrpcProxyAgentOptions, cs agent.Readi
160167
"core", "healthListener",
161168
"port", strconv.Itoa(o.HealthServerPort),
162169
)
163-
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(healthServer) })
170+
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveHealth(a.healthServer) })
164171

165172
return nil
166173
}
@@ -197,7 +204,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
197204
}
198205
}
199206

200-
adminServer := &http.Server{
207+
a.adminServer = &http.Server{
201208
Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminServerPort)),
202209
Handler: muxHandler,
203210
MaxHeaderBytes: 1 << 20,
@@ -208,7 +215,7 @@ func (a *Agent) runAdminServer(o *options.GrpcProxyAgentOptions) error {
208215
"core", "adminListener",
209216
"port", strconv.Itoa(o.AdminServerPort),
210217
)
211-
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(adminServer) })
218+
go runpprof.Do(context.Background(), labels, func(context.Context) { a.serveAdmin(a.adminServer) })
212219

213220
return nil
214221
}
@@ -220,3 +227,8 @@ func (a *Agent) serveAdmin(adminServer *http.Server) {
220227
}
221228
klog.V(0).Infoln("Admin server stopped listening")
222229
}
230+
231+
// ClientSet exposes internal state for testing.
232+
func (a *Agent) ClientSet() *agent.ClientSet {
233+
return a.cs
234+
}

cmd/server/app/options/options.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (o *ProxyRunOptions) Validate() error {
221221
return fmt.Errorf("error checking cluster CA cert %s, got %v", o.ClusterCaCert, err)
222222
}
223223
}
224-
if o.Mode != "grpc" && o.Mode != "http-connect" {
224+
if o.Mode != server.ModeGRPC && o.Mode != server.ModeHTTPConnect {
225225
return fmt.Errorf("mode must be set to either 'grpc' or 'http-connect' not %q", o.Mode)
226226
}
227227
if o.UdsName != "" {

cmd/server/app/server.go

+31-16
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ func NewProxyCommand(p *Proxy, o *options.ProxyRunOptions) *cobra.Command {
5959
Use: "proxy",
6060
Long: `A gRPC proxy server, receives requests from the API server and forwards to the agent.`,
6161
RunE: func(cmd *cobra.Command, args []string) error {
62-
return p.run(o)
62+
stopCh := SetupSignalHandler()
63+
return p.Run(o, stopCh)
6364
},
6465
}
6566

@@ -81,11 +82,16 @@ func tlsCipherSuites(cipherNames []string) []uint16 {
8182
}
8283

8384
type Proxy struct {
85+
agentServer *grpc.Server
86+
adminServer *http.Server
87+
healthServer *http.Server
88+
89+
server *server.ProxyServer
8490
}
8591

8692
type StopFunc func()
8793

88-
func (p *Proxy) run(o *options.ProxyRunOptions) error {
94+
func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
8995
o.Print()
9096
if err := o.Validate(); err != nil {
9197
return fmt.Errorf("failed to validate server options with %v", err)
@@ -126,37 +132,40 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error {
126132
if err != nil {
127133
return err
128134
}
129-
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
135+
p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
130136

131-
frontendStop, err := p.runFrontendServer(ctx, o, server)
137+
frontendStop, err := p.runFrontendServer(ctx, o, p.server)
132138
if err != nil {
133139
return fmt.Errorf("failed to run the frontend server: %v", err)
134140
}
141+
if frontendStop != nil {
142+
defer frontendStop()
143+
}
135144

136145
klog.V(1).Infoln("Starting agent server for tunnel connections.")
137-
err = p.runAgentServer(o, server)
146+
err = p.runAgentServer(o, p.server)
138147
if err != nil {
139148
return fmt.Errorf("failed to run the agent server: %v", err)
140149
}
150+
defer p.agentServer.Stop()
151+
141152
klog.V(1).Infoln("Starting admin server for debug connections.")
142-
err = p.runAdminServer(o, server)
153+
err = p.runAdminServer(o, p.server)
143154
if err != nil {
144155
return fmt.Errorf("failed to run the admin server: %v", err)
145156
}
157+
defer p.adminServer.Close()
158+
146159
klog.V(1).Infoln("Starting health server for healthchecks.")
147-
err = p.runHealthServer(o, server)
160+
err = p.runHealthServer(o, p.server)
148161
if err != nil {
149162
return fmt.Errorf("failed to run the health server: %v", err)
150163
}
164+
defer p.healthServer.Close()
151165

152-
stopCh := SetupSignalHandler()
153166
<-stopCh
154167
klog.V(1).Infoln("Shutting down server.")
155168

156-
if frontendStop != nil {
157-
frontendStop()
158-
}
159-
160169
return nil
161170
}
162171

@@ -379,6 +388,7 @@ func (p *Proxy) runAgentServer(o *options.ProxyRunOptions, server *server.ProxyS
379388
"port", strconv.FormatUint(uint64(o.AgentPort), 10),
380389
)
381390
go runpprof.Do(context.Background(), labels, func(context.Context) { grpcServer.Serve(lis) })
391+
p.agentServer = grpcServer
382392

383393
return nil
384394
}
@@ -396,7 +406,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
396406
runtime.SetBlockProfileRate(1)
397407
}
398408
}
399-
adminServer := &http.Server{
409+
p.adminServer = &http.Server{
400410
Addr: net.JoinHostPort(o.AdminBindAddress, strconv.Itoa(o.AdminPort)),
401411
Handler: muxHandler,
402412
MaxHeaderBytes: 1 << 20,
@@ -408,7 +418,7 @@ func (p *Proxy) runAdminServer(o *options.ProxyRunOptions, server *server.ProxyS
408418
"port", strconv.FormatUint(uint64(o.AdminPort), 10),
409419
)
410420
go runpprof.Do(context.Background(), labels, func(context.Context) {
411-
err := adminServer.ListenAndServe()
421+
err := p.adminServer.ListenAndServe()
412422
if err != nil {
413423
klog.ErrorS(err, "admin server could not listen")
414424
}
@@ -438,7 +448,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
438448
// "/ready" is deprecated but being maintained for backward compatibility
439449
muxHandler.HandleFunc("/ready", readinessHandler)
440450
muxHandler.HandleFunc("/readyz", readinessHandler)
441-
healthServer := &http.Server{
451+
p.healthServer = &http.Server{
442452
Addr: net.JoinHostPort(o.HealthBindAddress, strconv.Itoa(o.HealthPort)),
443453
Handler: muxHandler,
444454
MaxHeaderBytes: 1 << 20,
@@ -450,7 +460,7 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
450460
"port", strconv.FormatUint(uint64(o.HealthPort), 10),
451461
)
452462
go runpprof.Do(context.Background(), labels, func(context.Context) {
453-
err := healthServer.ListenAndServe()
463+
err := p.healthServer.ListenAndServe()
454464
if err != nil {
455465
klog.ErrorS(err, "health server could not listen")
456466
}
@@ -459,3 +469,8 @@ func (p *Proxy) runHealthServer(o *options.ProxyRunOptions, server *server.Proxy
459469

460470
return nil
461471
}
472+
473+
// ProxyServer exposes internal state for testing.
474+
func (p *Proxy) ProxyServer() *server.ProxyServer {
475+
return p.server
476+
}

cmd/server/main.go

-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
)
3030

3131
func main() {
32-
// flag.CommandLine.Parse(os.Args[1:])
3332
proxy := &app.Proxy{}
3433
o := options.NewProxyRunOptions()
3534
command := app.NewProxyCommand(proxy, o)

pkg/server/server.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ func (g *GrpcFrontend) Recv() (*client.Packet, error) {
8787
return pkt, nil
8888
}
8989

90+
const (
91+
ModeGRPC = "grpc"
92+
ModeHTTPConnect = "http-connect"
93+
)
94+
9095
type ProxyClientConnection struct {
9196
Mode string
9297
HTTP io.ReadWriter
@@ -107,10 +112,10 @@ const (
107112

108113
func (c *ProxyClientConnection) send(pkt *client.Packet) error {
109114
defer func(start time.Time) { metrics.Metrics.ObserveFrontendWriteLatency(time.Since(start)) }(time.Now())
110-
if c.Mode == "grpc" {
115+
if c.Mode == ModeGRPC {
111116
return c.frontend.Send(pkt)
112117
}
113-
if c.Mode == "http-connect" {
118+
if c.Mode == ModeHTTPConnect {
114119
if pkt.Type == client.PacketType_CLOSE_RSP {
115120
return c.CloseHTTP()
116121
} else if pkt.Type == client.PacketType_DIAL_CLS {

pkg/server/tunnel.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8484
closed := make(chan struct{})
8585
connected := make(chan struct{})
8686
connection := &ProxyClientConnection{
87-
Mode: "http-connect",
87+
Mode: ModeHTTPConnect,
8888
HTTP: io.ReadWriter(conn), // pass as ReadWriter so the caller must close with CloseHTTP
8989
CloseHTTP: func() error {
9090
closeOnce.Do(func() { conn.Close() })

tests/agent_disconnect_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ import (
2929
"testing"
3030
"time"
3131

32-
"google.golang.org/grpc"
33-
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
3432
"sigs.k8s.io/apiserver-network-proxy/tests/framework"
3533
)
3634

@@ -92,7 +90,7 @@ func TestProxy_Agent_Disconnect_Persistent_Connection(t *testing.T) {
9290
}
9391
}
9492

95-
func TestProxy_Agent_Reconnect(t *testing.T) {
93+
func TestAgentRestartReconnect(t *testing.T) {
9694
testcases := []struct {
9795
name string
9896
proxyServerFunction func(testing.TB) framework.ProxyServer
@@ -176,7 +174,7 @@ func clientRequest(c *http.Client, addr string) ([]byte, error) {
176174
}
177175

178176
func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) {
179-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, proxyAddr, grpc.WithInsecure())
177+
tunnel, err := createSingleUseGrpcTunnel(ctx, proxyAddr)
180178
if err != nil {
181179
return nil, err
182180
}
@@ -192,7 +190,7 @@ func createGrpcTunnelClient(ctx context.Context, proxyAddr, addr string) (*http.
192190
}
193191

194192
func createHTTPConnectClient(ctx context.Context, proxyAddr, addr string) (*http.Client, error) {
195-
conn, err := net.Dial("tcp", proxyAddr)
193+
conn, err := net.Dial("unix", proxyAddr)
196194
if err != nil {
197195
return nil, err
198196
}

tests/benchmarks_test.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ import (
2323
"net/http"
2424
"net/http/httptest"
2525
"testing"
26-
27-
"google.golang.org/grpc"
28-
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
2926
)
3027

3128
func BenchmarkLargeResponse_GRPC(b *testing.B) {
@@ -53,7 +50,7 @@ func BenchmarkLargeResponse_GRPC(b *testing.B) {
5350

5451
for n := 0; n < b.N; n++ {
5552
// run test client
56-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
53+
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
5754
if err != nil {
5855
b.Fatal(err)
5956
}
@@ -122,7 +119,7 @@ func BenchmarkLargeRequest_GRPC(b *testing.B) {
122119
req.Close = true
123120
for n := 0; n < b.N; n++ {
124121
// run test client
125-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
122+
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
126123
if err != nil {
127124
b.Fatal(err)
128125
}

tests/concurrent_client_request_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ import (
2727
"testing"
2828
"time"
2929

30-
"google.golang.org/grpc"
3130
"k8s.io/apimachinery/pkg/util/wait"
32-
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
3331
)
3432

3533
type simpleServer struct {
@@ -52,7 +50,7 @@ func (s *simpleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
5250
// TODO: test http-connect as well.
5351
func getTestClient(front string, t *testing.T) *http.Client {
5452
ctx := context.Background()
55-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, front, grpc.WithInsecure())
53+
tunnel, err := createSingleUseGrpcTunnel(ctx, front)
5654
if err != nil {
5755
t.Fatal(err)
5856
}

tests/concurrent_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import (
2424
"sync"
2525
"testing"
2626

27-
"google.golang.org/grpc"
28-
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
2927
"sigs.k8s.io/apiserver-network-proxy/tests/framework"
3028
)
3129

@@ -48,7 +46,7 @@ func TestProxy_ConcurrencyGRPC(t *testing.T) {
4846
defer wg.Done()
4947

5048
// run test client
51-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
49+
tunnel, err := createSingleUseGrpcTunnel(ctx, ps.FrontAddr())
5250
if err != nil {
5351
t.Error(err)
5452
return

0 commit comments

Comments
 (0)