Skip to content

Commit fa6d9ab

Browse files
authored
cherry-pick 6610 and 6620 (#6627)
1 parent 467fbf2 commit fa6d9ab

File tree

4 files changed

+154
-92
lines changed

4 files changed

+154
-92
lines changed

call.go

-5
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,6 @@ import (
2727
//
2828
// All errors returned by Invoke are compatible with the status package.
2929
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
30-
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
31-
return err
32-
}
33-
defer cc.idlenessMgr.OnCallEnd()
34-
3530
// allow interceptor to see all applicable call options, which means those
3631
// configured as defaults from dial option as well as per-call options
3732
opts = combine(cc.dopts.callOptions, opts)

clientconn.go

+26-12
Original file line numberDiff line numberDiff line change
@@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
10911091
ac.cancel()
10921092
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
10931093

1094-
// We have to defer here because GracefulClose => Close => onClose, which
1095-
// requires locking ac.mu.
1094+
// We have to defer here because GracefulClose => onClose, which requires
1095+
// locking ac.mu.
10961096
if ac.transport != nil {
10971097
defer ac.transport.GracefulClose()
10981098
ac.transport = nil
@@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) {
16801680
ac.updateConnectivityState(connectivity.Shutdown, nil)
16811681
ac.cancel()
16821682
ac.curAddr = resolver.Address{}
1683-
if err == errConnDrain && curTr != nil {
1684-
// GracefulClose(...) may be executed multiple times when
1685-
// i) receiving multiple GoAway frames from the server; or
1686-
// ii) there are concurrent name resolver/Balancer triggered
1687-
// address removal and GoAway.
1688-
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1689-
ac.mu.Unlock()
1690-
curTr.GracefulClose()
1691-
ac.mu.Lock()
1692-
}
1683+
16931684
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
16941685
Desc: "Subchannel deleted",
16951686
Severity: channelz.CtInfo,
@@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) {
17031694
// being deleted right away.
17041695
channelz.RemoveEntry(ac.channelzID)
17051696
ac.mu.Unlock()
1697+
1698+
// We have to release the lock before the call to GracefulClose/Close here
1699+
// because both of them call onClose(), which requires locking ac.mu.
1700+
if curTr != nil {
1701+
if err == errConnDrain {
1702+
// Close the transport gracefully when the subConn is being shut down.
1703+
//
1704+
// GracefulClose() may be executed multiple times if:
1705+
// - multiple GoAway frames are received from the server
1706+
// - there are concurrent name resolver or balancer triggered
1707+
// address removal and GoAway
1708+
curTr.GracefulClose()
1709+
} else {
1710+
// Hard close the transport when the channel is entering idle or is
1711+
// being shut down. In the case where the channel is being shut down,
1712+
// closing of transports is also taken care of by cancelation of cc.ctx.
1713+
// But in the case where the channel is entering idle, we need to
1714+
// explicitly close the transports here. Instead of distinguishing
1715+
// between these two cases, it is simpler to close the transport
1716+
// unconditionally here.
1717+
curTr.Close(err)
1718+
}
1719+
}
17061720
}
17071721

17081722
func (ac *addrConn) getState() connectivity.State {

internal/idle/idle_e2e_test.go

+118-70
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"errors"
2424
"fmt"
25+
"io"
2526
"strings"
2627
"testing"
2728
"time"
@@ -141,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
141142
}
142143

143144
// Tests the case where channel idleness is enabled by passing a small value for
144-
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
145+
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
146+
// the connection to the backend is closed.
145147
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
146148
// Create a ClientConn with a short idle_timeout.
147149
r := manual.NewBuilderWithScheme("whatever")
@@ -158,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
158160
t.Cleanup(func() { cc.Close() })
159161

160162
// Start a test backend and push an address update via the resolver.
161-
backend := stubserver.StartTestService(t, nil)
163+
lis := testutils.NewListenerWrapper(t, nil)
164+
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
162165
t.Cleanup(backend.Stop)
163166
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
164167

@@ -167,92 +170,137 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
167170
defer cancel()
168171
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
169172

173+
// Retrieve the wrapped conn from the listener.
174+
v, err := lis.NewConnCh.Receive(ctx)
175+
if err != nil {
176+
t.Fatalf("Failed to retrieve conn from test listener: %v", err)
177+
}
178+
conn := v.(*testutils.ConnWrapper)
179+
170180
// Verify that the ClientConn moves to IDLE as there is no activity.
171181
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
172182

173183
// Verify idleness related channelz events.
174184
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
175185
t.Fatal(err)
176186
}
187+
188+
// Verify that the previously open connection is closed.
189+
if _, err := conn.CloseCh.Receive(ctx); err != nil {
190+
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
191+
}
177192
}
178193

179194
// Tests the case where channel idleness is enabled by passing a small value for
180195
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
181196
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
182-
// Create a ClientConn with a short idle_timeout.
183-
r := manual.NewBuilderWithScheme("whatever")
184-
dopts := []grpc.DialOption{
185-
grpc.WithTransportCredentials(insecure.NewCredentials()),
186-
grpc.WithResolvers(r),
187-
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
188-
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
189-
}
190-
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
191-
if err != nil {
192-
t.Fatalf("grpc.Dial() failed: %v", err)
193-
}
194-
t.Cleanup(func() { cc.Close() })
195-
196-
// Start a test backend which keeps a unary RPC call active by blocking on a
197-
// channel that is closed by the test later on. Also push an address update
198-
// via the resolver.
199-
blockCh := make(chan struct{})
200-
backend := &stubserver.StubServer{
201-
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
202-
<-blockCh
203-
return &testpb.Empty{}, nil
197+
tests := []struct {
198+
name string
199+
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
200+
}{
201+
{
202+
name: "unary",
203+
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
204+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
205+
return fmt.Errorf("EmptyCall RPC failed: %v", err)
206+
}
207+
return nil
208+
},
209+
},
210+
{
211+
name: "streaming",
212+
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
213+
stream, err := client.FullDuplexCall(ctx)
214+
if err != nil {
215+
t.Fatalf("FullDuplexCall RPC failed: %v", err)
216+
}
217+
if _, err := stream.Recv(); err != nil && err != io.EOF {
218+
t.Fatalf("stream.Recv() failed: %v", err)
219+
}
220+
return nil
221+
},
204222
},
205223
}
206-
if err := backend.StartServer(); err != nil {
207-
t.Fatalf("Failed to start backend: %v", err)
208-
}
209-
t.Cleanup(backend.Stop)
210-
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
211-
212-
// Verify that the ClientConn moves to READY.
213-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
214-
defer cancel()
215-
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
216224

217-
// Spawn a goroutine which checks expected state transitions and idleness
218-
// channelz trace events. It eventually closes `blockCh`, thereby unblocking
219-
// the server RPC handler and the unary call below.
220-
errCh := make(chan error, 1)
221-
go func() {
222-
defer close(blockCh)
223-
// Verify that the ClientConn stays in READY.
224-
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
225-
defer sCancel()
226-
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
227-
228-
// Verify that there are no idleness related channelz events.
229-
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
230-
errCh <- err
231-
return
232-
}
233-
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
234-
errCh <- err
235-
return
236-
}
225+
for _, test := range tests {
226+
t.Run(test.name, func(t *testing.T) {
227+
// Create a ClientConn with a short idle_timeout.
228+
r := manual.NewBuilderWithScheme("whatever")
229+
dopts := []grpc.DialOption{
230+
grpc.WithTransportCredentials(insecure.NewCredentials()),
231+
grpc.WithResolvers(r),
232+
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
233+
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
234+
}
235+
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
236+
if err != nil {
237+
t.Fatalf("grpc.Dial() failed: %v", err)
238+
}
239+
t.Cleanup(func() { cc.Close() })
240+
241+
// Start a test backend which keeps the RPC (unary or streaming) active by
242+
// blocking on a channel that is closed by the test later on. Also push an
243+
// address update via the resolver.
244+
blockCh := make(chan struct{})
245+
backend := &stubserver.StubServer{
246+
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
247+
<-blockCh
248+
return &testpb.Empty{}, nil
249+
},
250+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
251+
<-blockCh
252+
return nil
253+
},
254+
}
255+
if err := backend.StartServer(); err != nil {
256+
t.Fatalf("Failed to start backend: %v", err)
257+
}
258+
t.Cleanup(backend.Stop)
259+
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
260+
261+
// Verify that the ClientConn moves to READY.
262+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
263+
defer cancel()
264+
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
265+
266+
// Spawn a goroutine which checks expected state transitions and idleness
267+
// channelz trace events.
268+
errCh := make(chan error, 1)
269+
go func() {
270+
defer close(blockCh)
271+
272+
// Verify that the ClientConn stays in READY.
273+
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
274+
defer sCancel()
275+
if cc.WaitForStateChange(sCtx, connectivity.Ready) {
276+
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
277+
return
278+
}
237279

238-
// Unblock the unary RPC on the server.
239-
errCh <- nil
240-
}()
280+
// Verify that there are no idleness related channelz events.
281+
//
282+
// TODO: Improve the checks here. If these log strings are
283+
// changed in the code, these checks will continue to pass.
284+
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
285+
errCh <- err
286+
return
287+
}
288+
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode")
289+
}()
241290

242-
// Make a unary RPC that blocks on the server, thereby ensuring that the
243-
// count of active RPCs on the client is non-zero.
244-
client := testgrpc.NewTestServiceClient(cc)
245-
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
246-
t.Errorf("EmptyCall RPC failed: %v", err)
247-
}
291+
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
292+
t.Fatalf("%s rpc failed: %v", test.name, err)
293+
}
248294

249-
select {
250-
case err := <-errCh:
251-
if err != nil {
252-
t.Fatal(err)
253-
}
254-
case <-ctx.Done():
255-
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
295+
select {
296+
case err := <-errCh:
297+
if err != nil {
298+
t.Fatal(err)
299+
}
300+
case <-ctx.Done():
301+
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
302+
}
303+
})
256304
}
257305
}
258306

stream.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,6 @@ type ClientStream interface {
158158
// If none of the above happen, a goroutine and a context will be leaked, and grpc
159159
// will not call the optionally-configured stats handler with a stats.End message.
160160
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
161-
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
162-
return nil, err
163-
}
164-
defer cc.idlenessMgr.OnCallEnd()
165-
166161
// allow interceptor to see all applicable call options, which means those
167162
// configured as defaults from dial option as well as per-call options
168163
opts = combine(cc.dopts.callOptions, opts)
@@ -179,6 +174,16 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
179174
}
180175

181176
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
177+
// Start tracking the RPC for idleness purposes. This is where a stream is
178+
// created for both streaming and unary RPCs, and hence is a good place to
179+
// track active RPC count.
180+
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
181+
return nil, err
182+
}
183+
// Add a CallOption that decrements the active call count when the RPC
184+
// completes.
185+
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
186+
182187
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
183188
// validate md
184189
if err := imetadata.Validate(md); err != nil {

0 commit comments

Comments
 (0)