Skip to content

Commit 91437d1

Browse files
committed
core/connpool: tcp keepalive & max ttl
1 parent 6aaafd7 commit 91437d1

File tree

6 files changed

+100
-52
lines changed

6 files changed

+100
-52
lines changed

intra/core/connpool.go

+46-21
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ const poolcapacity = 8 // default capacity
2626
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
2727
const Nobody = uintptr(0) // nobody
2828
const scrubinterval = 5 * time.Minute // interval between subsequent scrubs
29+
const maxttl = 8 * time.Minute // close unused pooled conns after this period
2930

31+
// go.dev/play/p/ig2Zpk-LTSv
32+
var (
33+
kaidle = int(maxttl / 5 / time.Second) // 8m / 5 => 96s
34+
kainterval = int(maxttl / 10 / time.Second) // 8m / 10 => 48s
35+
)
3036
var errUnexpectedRead error = errors.New("pool: unexpected read")
3137

3238
type superpool[T comparable] struct {
@@ -124,19 +130,24 @@ func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool {
124130
return super.pool.Put(conn)
125131
}
126132

133+
type timedconn struct {
134+
c net.Conn
135+
dob time.Time
136+
}
137+
127138
// github.com/redis/go-redis/blob/d9eeed13/internal/pool/pool.go
128139
type ConnPool[T comparable] struct {
129140
ctx context.Context
130141
id T
131-
p chan net.Conn // never closed
142+
p chan timedconn // never closed
132143
closed atomic.Bool
133144
}
134145

135146
func NewConnPool[T comparable](ctx context.Context, id T) *ConnPool[T] {
136147
c := &ConnPool[T]{
137148
ctx: ctx,
138149
id: id,
139-
p: make(chan net.Conn, poolcapacity),
150+
p: make(chan timedconn, poolcapacity),
140151
}
141152

142153
context.AfterFunc(ctx, c.clean)
@@ -157,13 +168,13 @@ func (c *ConnPool[T]) Get() (zz net.Conn) {
157168
for i < maxattempts {
158169
i++
159170
select {
160-
case conn := <-c.p:
161-
if readable(conn) {
162-
// reset previous timeout
163-
_ = conn.SetDeadline(time.Time{})
164-
return conn
171+
case tconn := <-c.p:
172+
// if readable, return conn regardless of its freshness
173+
if readable(tconn.c) {
174+
nokeepalive(tconn.c)
175+
return tconn.c
165176
}
166-
clos(conn)
177+
CloseConn(tconn.c)
167178
case <-ctx.Done():
168179
return // signal stop
169180
default:
@@ -189,8 +200,11 @@ func (c *ConnPool[T]) Put(conn net.Conn) (ok bool) {
189200
return
190201
}
191202

203+
tconn := timedconn{conn, time.Now()}
192204
select {
193-
case c.p <- conn:
205+
case c.p <- tconn:
206+
cleardeadline(conn) // reset any previous timeout
207+
keepalive(conn)
194208
return true
195209
case <-c.ctx.Done(): // stop
196210
return false
@@ -214,8 +228,8 @@ func (c *ConnPool[T]) clean() {
214228
log.I("pool: %v closed? %t", c.id, ok)
215229
for {
216230
select {
217-
case conn := <-c.p:
218-
clos(conn)
231+
case tconn := <-c.p:
232+
CloseConn(tconn.c)
219233
default:
220234
return
221235
}
@@ -229,18 +243,18 @@ func (c *ConnPool[T]) scrub() {
229243
}
230244

231245
select {
232-
case conn := <-c.p:
233-
if readable(conn) {
246+
case tconn := <-c.p:
247+
if fresh(tconn.dob) && readable(tconn.c) {
234248
select {
235-
case c.p <- conn:
249+
case c.p <- tconn: // update dob only on Put()
236250
case <-c.ctx.Done(): // stop
237-
clos(conn)
251+
CloseConn(tconn.c)
238252
return
239253
default: // full
240-
clos(conn)
254+
CloseConn(tconn.c)
241255
}
242256
} else {
243-
clos(conn)
257+
CloseConn(tconn.c)
244258
}
245259
case <-c.ctx.Done():
246260
return
@@ -250,6 +264,10 @@ func (c *ConnPool[T]) scrub() {
250264
}
251265
}
252266

267+
func fresh(t time.Time) bool {
268+
return time.Since(t) < maxttl
269+
}
270+
253271
// github.com/golang/go/issues/15735
254272
func readable(c net.Conn) bool {
255273
var err error
@@ -264,10 +282,6 @@ func readable(c net.Conn) bool {
264282
return err == nil
265283
}
266284

267-
func clos(c net.Conn) {
268-
CloseConn(c)
269-
}
270-
271285
// github.com/go-sql-driver/mysql/blob/f20b28636/conncheck.go
272286
// github.com/redis/go-redis/blob/cc9bcb0c0/internal/pool/conn_check.go
273287
func canread(sc syscall.Conn) error {
@@ -317,6 +331,17 @@ func canread(sc syscall.Conn) error {
317331
return errors.Join(ctlErr, checkErr) // may return nil
318332
}
319333

334+
func keepalive(c net.Conn) bool {
335+
return SetKeepAliveConfigSockOpt(c, kaidle, kainterval)
336+
}
337+
338+
func nokeepalive(c net.Conn) bool {
339+
if tc, ok := c.(*net.TCPConn); ok {
340+
return tc.SetKeepAlive(false) == nil
341+
}
342+
return false
343+
}
344+
320345
func logev(err error) log.LogFn {
321346
return logevif(err != nil)
322347
}

intra/core/ping.go

+6
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ func extend(c MinConn) {
180180
}
181181
}
182182

183+
func cleardeadline(c MinConn) {
184+
if c != nil {
185+
_ = c.SetDeadline(time.Time{})
186+
}
187+
}
188+
183189
func payload() (t []byte, tslen int, err error) {
184190
randomPayload := make([]byte, 16)
185191
_, err = rand.Read(randomPayload[:])

intra/protect/sockopt.go renamed to intra/core/sockopt.go

+36-24
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
// License, v. 2.0. If a copy of the MPL was not distributed with this
55
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
66

7-
package protect
7+
package core
88

99
import (
1010
"net"
1111
"syscall"
1212

1313
"github.com/celzero/firestack/intra/log"
14-
"github.com/celzero/firestack/intra/settings"
1514
"golang.org/x/sys/unix"
1615
)
1716

@@ -34,59 +33,72 @@ var (
3433
}
3534
)
3635

37-
func SetKeepAliveConfig(c Conn) bool {
38-
if !settings.GetDialerOpts().LowerKeepAlive {
39-
return false
40-
}
41-
36+
func SetKeepAliveConfig(c MinConn) bool {
4237
if tc, ok := c.(*net.TCPConn); ok {
4338
return tc.SetKeepAliveConfig(kacfg) == nil
4439
}
4540
return false
4641
}
4742

48-
func SetKeepAliveConfigSockOpt(c Conn) bool {
49-
if !settings.GetDialerOpts().LowerKeepAlive {
50-
return false
51-
}
43+
// SetKeepAliveConfigSockOpt sets for a TCP connection, SO_KEEPALIVE,
44+
// TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT, TCP_USER_TIMEOUT.
45+
// args is optional, and should be in the order of idle, interval, count.
46+
func SetKeepAliveConfigSockOpt(c MinConn, args ...int) (ok bool) {
47+
var tc *net.TCPConn
48+
if tc, ok = c.(*net.TCPConn); ok {
49+
id := conn2str(tc)
5250

53-
if tc, ok := c.(*net.TCPConn); ok {
5451
rawConn, err := tc.SyscallConn()
5552
if err != nil || rawConn == nil {
5653
ok = false
5754
return ok
5855
}
56+
57+
idle := defaultIdle // secs
58+
interval := defaultInterval // secs
59+
count := defaultCount
60+
if len(args) >= 1 && args[0] > 0 {
61+
idle = args[0]
62+
}
63+
if len(args) >= 2 && args[1] > 0 {
64+
interval = args[1]
65+
}
66+
if len(args) >= 3 && args[2] > 0 {
67+
count = args[2]
68+
}
69+
usertimeoutms := idle*1000 + (interval * count) // millis
70+
71+
ok = true
5972
err = rawConn.Control(func(fd uintptr) {
6073
sock := int(fd)
6174
if err := syscall.SetsockoptInt(sock, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(true)); err != nil {
62-
log.D("set SO_KEEPALIVE failed: %v", err)
75+
log.D("set SO_KEEPALIVE %s failed: %v", id, err)
6376
ok = false
6477
}
65-
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, defaultIdle); err != nil {
66-
log.D("set TCP_KEEPIDLE failed: %v", err)
78+
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, idle); err != nil {
79+
log.D("set TCP_KEEPIDLE %s failed: %ds, %v", id, idle, err)
6780
ok = false
6881
}
69-
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, defaultInterval); err != nil {
70-
log.D("set TCP_KEEPINTVL failed: %v", err)
82+
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, interval); err != nil {
83+
log.D("set TCP_KEEPINTVL %s failed: %ds, %v", id, interval, err)
7184
ok = false
7285
}
73-
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, defaultCount); err != nil {
74-
log.D("set TCP_KEEPCNT failed: %v", err)
86+
if err := syscall.SetsockoptInt(sock, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, count); err != nil {
87+
log.D("set TCP_KEEPCNT %s failed: #%d, %v", id, count, err)
7588
ok = false
7689
}
7790
// code.googlesource.com/google-api-go-client/+/master/transport/grpc/dial_socketopt.go#30
78-
if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usrTimeoutMillis); err != nil {
79-
log.D("set TCP_USER_TIMEOUT failed: %v", err)
91+
if err := unix.SetsockoptInt(sock, unix.SOL_TCP, unix.TCP_USER_TIMEOUT, usertimeoutms); err != nil {
92+
log.D("set TCP_USER_TIMEOUT %s failed: %dms, %v", id, usertimeoutms, err)
8093
ok = false
8194
}
8295
})
8396
if err != nil {
84-
log.E("RawConn.Control() failed: %v", err)
97+
log.E("dialers: sockopt: %s RawConn.Control() err: %v", id, err)
8598
ok = false
8699
}
87-
return ok
88100
}
89-
return false
101+
return ok
90102
}
91103

92104
func boolint(b bool) int {

intra/ipn/auto.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ package ipn
99
import (
1010
"context"
1111
"math/rand"
12+
"net"
1213
"net/netip"
1314
"time"
1415

1516
x "github.com/celzero/firestack/intra/backend"
1617
"github.com/celzero/firestack/intra/core"
1718
"github.com/celzero/firestack/intra/log"
1819
"github.com/celzero/firestack/intra/protect"
20+
"github.com/celzero/firestack/intra/settings"
1921
)
2022

2123
var (
@@ -114,8 +116,7 @@ func (h *auto) Dial(network, addr string) (protect.Conn, error) {
114116
h.exp.K(addr, who, ttl30s)
115117
h.status.Store(TOK)
116118
}
117-
// adjust TCP keepalive config if c is a TCPConn
118-
protect.SetKeepAliveConfigSockOpt(c)
119+
maybeKeepAlive(c)
119120
log.I("proxy: auto: w(%d) pin(%t/%d), dial(%s) %s; err? %v",
120121
who, recent, previdx, network, addr, err)
121122
return c, err
@@ -263,3 +264,10 @@ func ip4to6(prefix96 netip.Prefix, ip4 netip.Addr) netip.Addr {
263264
}
264265
return netip.AddrFrom16(s6)
265266
}
267+
268+
func maybeKeepAlive(c net.Conn) {
269+
if settings.GetDialerOpts().LowerKeepAlive {
270+
// adjust TCP keepalive config if c is a TCPConn
271+
core.SetKeepAliveConfigSockOpt(c)
272+
}
273+
}

intra/ipn/base.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,7 @@ func (h *base) Dial(network, addr string) (c protect.Conn, err error) {
5555
}
5656
defer localDialStatus(h.status, err)
5757

58-
//Adjust TCP keepalive config if c is a TCPConn
59-
protect.SetKeepAliveConfigSockOpt(c)
60-
58+
maybeKeepAlive(c)
6159
log.I("proxy: base: dial(%s) to %s; err? %v", network, addr, err)
6260
return
6361
}

intra/ipn/exit.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ func (h *exit) Dial(network, addr string) (protect.Conn, error) {
4949
// exit always splits
5050
c, err := localDialStrat(h.outbound, network, addr)
5151
defer localDialStatus(h.status, err)
52-
// adjust TCP keepalive config if c is a TCPConn
53-
protect.SetKeepAliveConfigSockOpt(c)
52+
maybeKeepAlive(c)
5453
log.I("proxy: exit: dial(%s) to %s; err? %v", network, addr, err)
5554
return c, err
5655
}

0 commit comments

Comments
 (0)