Skip to content

Commit b5e5436

Browse files
authored
Merge pull request #343 from kinvolk/rata/misc-small-fixes
Simple konn-client cleanups
2 parents 4271084 + 9f2918c commit b5e5436

File tree

1 file changed

+40
-39
lines changed

1 file changed

+40
-39
lines changed

konnectivity-client/pkg/client/client.go

+40-39
Original file line numberDiff line numberDiff line change
@@ -224,25 +224,25 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
224224
// In either scenario, we should return here and close the tunnel as it is no longer needed.
225225
klog.V(1).InfoS("DialResp not recognized; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
226226
return
227-
} else {
228-
result := dialResult{connid: resp.ConnectID}
229-
if resp.Error != "" {
230-
result.err = &dialFailure{resp.Error, DialFailureEndpoint}
231-
}
232-
select {
233-
// try to send to the result channel
234-
case pendingDial.resultCh <- result:
235-
// unblock if the cancel channel is closed
236-
case <-pendingDial.cancelCh:
237-
// Note: this condition can only be hit by a race condition where the
238-
// DialContext() returns early (timeout) after the pendingDial is already
239-
// fetched here, but before the result is sent.
240-
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
241-
return
242-
case <-tunnelCtx.Done():
243-
klog.V(1).InfoS("Tunnel has been closed; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
244-
return
245-
}
227+
}
228+
229+
result := dialResult{connid: resp.ConnectID}
230+
if resp.Error != "" {
231+
result.err = &dialFailure{resp.Error, DialFailureEndpoint}
232+
}
233+
select {
234+
// try to send to the result channel
235+
case pendingDial.resultCh <- result:
236+
// unblock if the cancel channel is closed
237+
case <-pendingDial.cancelCh:
238+
// Note: this condition can only be hit by a race condition where the
239+
// DialContext() returns early (timeout) after the pendingDial is already
240+
// fetched here, but before the result is sent.
241+
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
242+
return
243+
case <-tunnelCtx.Done():
244+
klog.V(1).InfoS("Tunnel has been closed; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
245+
return
246246
}
247247

248248
if resp.Error != "" {
@@ -281,33 +281,34 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
281281
// TODO: flow control
282282
conn, ok := t.conns.get(resp.ConnectID)
283283

284-
if ok {
285-
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
286-
select {
287-
case conn.readCh <- resp.Data:
288-
timer.Stop()
289-
case <-timer.C:
290-
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
291-
return
292-
case <-tunnelCtx.Done():
293-
klog.V(1).InfoS("Tunnel has been closed, the grpc connection to the proxy server will be closed", "connectionID", conn.connID)
294-
}
295-
} else {
296-
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
284+
if !ok {
285+
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
286+
continue
287+
}
288+
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
289+
select {
290+
case conn.readCh <- resp.Data:
291+
timer.Stop()
292+
case <-timer.C:
293+
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
294+
return
295+
case <-tunnelCtx.Done():
296+
klog.V(1).InfoS("Tunnel has been closed, the grpc connection to the proxy server will be closed", "connectionID", conn.connID)
297297
}
298298

299299
case client.PacketType_CLOSE_RSP:
300300
resp := pkt.GetCloseResponse()
301301
conn, ok := t.conns.get(resp.ConnectID)
302302

303-
if ok {
304-
close(conn.readCh)
305-
conn.closeCh <- resp.Error
306-
close(conn.closeCh)
307-
t.conns.remove(resp.ConnectID)
308-
return
303+
if !ok {
304+
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
305+
continue
309306
}
310-
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
307+
close(conn.readCh)
308+
conn.closeCh <- resp.Error
309+
close(conn.closeCh)
310+
t.conns.remove(resp.ConnectID)
311+
return
311312
}
312313
}
313314
}

0 commit comments

Comments
 (0)