Skip to content

Commit 95b079a

Browse files
committed
core: impl generic ConnPool
1 parent de0b86c commit 95b079a

File tree

1 file changed

+309
-0
lines changed

1 file changed

+309
-0
lines changed

intra/core/connpool.go

+309
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
// Copyright (c) 2024 RethinkDNS and its authors.
2+
//
3+
// This Source Code Form is subject to the terms of the Mozilla Public
4+
// License, v. 2.0. If a copy of the MPL was not distributed with this
5+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
7+
package core
8+
9+
import (
10+
"context"
11+
"errors"
12+
"fmt"
13+
"io"
14+
"net"
15+
"sync"
16+
"sync/atomic"
17+
"syscall"
18+
"time"
19+
20+
"github.com/celzero/firestack/intra/log"
21+
"golang.org/x/sys/unix"
22+
)
23+
24+
const useread = false // always false; here for doc purposes
25+
const poolcapacity = 8 // default capacity
26+
const maxattempts = poolcapacity / 2 // max attempts to retrieve a conn from pool
27+
const Nobody = uintptr(0) // nobody
28+
29+
var errUnexpectedRead error = errors.New("pool: unexpected read")
30+
31+
type superpool[T comparable] struct {
32+
quit context.CancelFunc
33+
pool *ConnPool[T]
34+
}
35+
36+
type MultConnPool[T comparable] struct {
37+
ctx context.Context
38+
mu sync.RWMutex
39+
m map[T]*superpool[T]
40+
}
41+
42+
func NewMultConnPool[T comparable](ctx context.Context) *MultConnPool[T] {
43+
mc := &MultConnPool[T]{
44+
ctx: ctx,
45+
m: make(map[T]*superpool[T]),
46+
}
47+
every10m := time.NewTicker(10 * time.Minute)
48+
go mc.scrub(ctx, every10m)
49+
return mc
50+
}
51+
52+
func (m *MultConnPool[T]) scrub(ctx context.Context, tick *time.Ticker) {
53+
defer tick.Stop()
54+
for {
55+
select {
56+
case <-ctx.Done():
57+
return
58+
case <-tick.C:
59+
m.mu.Lock()
60+
for id, super := range m.m {
61+
if !super.pool.closed.Load() {
62+
delete(m.m, id)
63+
} else if super.pool.empty() {
64+
super.quit()
65+
delete(m.m, id)
66+
} else {
67+
go super.pool.scrub()
68+
}
69+
}
70+
m.mu.Unlock()
71+
}
72+
}
73+
}
74+
75+
func (m *MultConnPool[T]) Get(id T) net.Conn {
76+
if IsZero(id) {
77+
return nil
78+
}
79+
80+
m.mu.RLock()
81+
super := m.m[id]
82+
m.mu.RUnlock()
83+
84+
if super != nil {
85+
return super.pool.Get()
86+
}
87+
return nil
88+
}
89+
90+
func (m *MultConnPool[T]) Put(id T, conn net.Conn) bool {
91+
if IsZero(id) || IsNil(conn) {
92+
return false
93+
}
94+
95+
m.mu.RLock() // read lock
96+
super := m.m[id]
97+
m.mu.RUnlock()
98+
99+
if super == nil {
100+
m.mu.Lock() // double check with write lock
101+
if super = m.m[id]; super == nil {
102+
child, sigstop := context.WithCancel(m.ctx)
103+
super = &superpool[T]{sigstop, NewConnPool(child, id)}
104+
m.m[id] = super
105+
}
106+
m.mu.Unlock()
107+
}
108+
109+
return super.pool.Put(conn)
110+
}
111+
112+
// github.com/redis/go-redis/blob/d9eeed13/internal/pool/pool.go
113+
type ConnPool[T comparable] struct {
114+
ctx context.Context
115+
id T
116+
p chan net.Conn // never closed
117+
closed atomic.Bool
118+
}
119+
120+
func NewConnPool[T comparable](ctx context.Context, id T) *ConnPool[T] {
121+
c := &ConnPool[T]{
122+
ctx: ctx,
123+
id: id,
124+
p: make(chan net.Conn, poolcapacity),
125+
}
126+
127+
context.AfterFunc(ctx, c.clean)
128+
return c
129+
}
130+
131+
func (c *ConnPool[T]) Get() (zz net.Conn) {
132+
if c.closed.Load() {
133+
return
134+
}
135+
136+
if len(c.p) == 0 {
137+
return
138+
}
139+
140+
pooled, complete := Grx("pool.get", func(ctx context.Context) (zz net.Conn) {
141+
i := 0
142+
for i < maxattempts {
143+
i++
144+
select {
145+
case conn := <-c.p:
146+
if readable(conn) {
147+
// reset previous timeout
148+
_ = conn.SetDeadline(time.Time{})
149+
return conn
150+
}
151+
clos(conn)
152+
case <-ctx.Done():
153+
return // signal stop
154+
default:
155+
return // empty
156+
}
157+
}
158+
return // maxattempts exceeded
159+
}, timeout)
160+
161+
empty := IsNil(pooled) // or maxattempts exceeded
162+
timedout := !complete
163+
logevif(timedout || empty)("pool: %v get: empty? %t, timedout? %t",
164+
c.id, empty, timedout)
165+
166+
return pooled
167+
}
168+
169+
func (c *ConnPool[T]) Put(conn net.Conn) (ok bool) {
170+
if c.closed.Load() {
171+
return
172+
}
173+
if c.full() {
174+
return
175+
}
176+
177+
select {
178+
case c.p <- conn:
179+
return true
180+
case <-c.ctx.Done(): // stop
181+
return false
182+
default: // pool full
183+
return false
184+
}
185+
}
186+
187+
func (c *ConnPool[T]) empty() bool {
188+
return len(c.p) == 0
189+
}
190+
191+
func (c *ConnPool[T]) full() bool {
192+
return len(c.p) >= poolcapacity
193+
}
194+
195+
func (c *ConnPool[T]) clean() {
196+
// defer close(c.p)
197+
198+
ok := c.closed.CompareAndSwap(false, true)
199+
log.I("pool: %v closed? %t", c.id, ok)
200+
for {
201+
select {
202+
case conn := <-c.p:
203+
clos(conn)
204+
default:
205+
return
206+
}
207+
}
208+
}
209+
210+
func (c *ConnPool[T]) scrub() {
211+
if c.closed.Load() {
212+
return
213+
}
214+
for {
215+
select {
216+
case conn := <-c.p:
217+
if readable(conn) {
218+
select {
219+
case c.p <- conn:
220+
case <-c.ctx.Done(): // stop
221+
clos(conn)
222+
return
223+
default: // full
224+
clos(conn)
225+
}
226+
} else {
227+
clos(conn)
228+
}
229+
case <-c.ctx.Done():
230+
default:
231+
return
232+
}
233+
}
234+
}
235+
236+
func readable(c net.Conn) bool {
237+
var err error
238+
id := conn2str(c)
239+
// must use syscall.Conn: github.com/golang/go/issues/65143
240+
switch x := c.(type) {
241+
case syscall.Conn:
242+
err = canread(x)
243+
default:
244+
}
245+
logev(err)("pool: %s readable? %t; err? %v", id, err == nil, err)
246+
return err == nil
247+
}
248+
249+
func clos(c net.Conn) {
250+
CloseConn(c)
251+
}
252+
253+
// github.com/go-sql-driver/mysql/blob/f20b28636/conncheck.go
254+
// github.com/redis/go-redis/blob/cc9bcb0c0/internal/pool/conn_check.go
255+
func canread(sc syscall.Conn) error {
256+
var checkErr error
257+
var ctlErr error
258+
259+
raw, err := sc.SyscallConn()
260+
if err != nil {
261+
return fmt.Errorf("pool: sysconn: %w", err)
262+
}
263+
264+
if useread {
265+
ctlErr = raw.Read(func(fd uintptr) bool {
266+
// pitfall: github.com/redis/go-redis/issues/3137
267+
var buf [1]byte
268+
n, err := syscall.Read(int(fd), buf[:])
269+
switch {
270+
case n == 0 && err == nil:
271+
checkErr = io.EOF
272+
case n > 0:
273+
// conn is supposed to be idle
274+
checkErr = errUnexpectedRead
275+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
276+
checkErr = nil
277+
default:
278+
checkErr = err
279+
}
280+
return true
281+
})
282+
} else {
283+
ctlErr = raw.Control(func(fd uintptr) {
284+
fds := []unix.PollFd{
285+
{Fd: int32(fd), Events: unix.POLLIN | unix.POLLERR},
286+
}
287+
n, err := unix.Poll(fds, 0)
288+
if err != nil {
289+
checkErr = fmt.Errorf("pool: poll: err: %v", err)
290+
}
291+
if n > 0 {
292+
checkErr = fmt.Errorf("pool: poll: sz: %d (must be 0), errno: %v",
293+
n, fds[0].Revents)
294+
}
295+
})
296+
}
297+
return errors.Join(ctlErr, checkErr) // may return nil
298+
}
299+
300+
func logev(err error) log.LogFn {
301+
return logevif(err != nil)
302+
}
303+
304+
func logevif(e bool) log.LogFn {
305+
if e {
306+
return log.E
307+
}
308+
return log.D
309+
}

0 commit comments

Comments
 (0)