Skip to content

Commit cb3dd79

Browse files
author
Achille
authored
0.4 broker resolver (#526)
* 0.4: kafka.BrokerResolver * add kafka.Transport.Context * inline network and address fields in conn type
1 parent fd5a288 commit cb3dd79

File tree

5 files changed

+196
-28
lines changed

5 files changed

+196
-28
lines changed

client_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ func newClient(addr net.Addr) (*Client, func()) {
6666
}
6767

6868
transport := &Transport{
69-
Dial: conns.Dial,
69+
Dial: conns.Dial,
70+
Resolver: NewBrokerResolver(nil),
7071
}
7172

7273
client := &Client{

dialer.go

-8
Original file line numberDiff line numberDiff line change
@@ -407,14 +407,6 @@ func LookupPartitions(ctx context.Context, network string, address string, topic
407407
return DefaultDialer.LookupPartitions(ctx, network, address, topic)
408408
}
409409

410-
// The Resolver interface is used as an abstraction to provide service discovery
411-
// of the hosts of a kafka cluster.
412-
type Resolver interface {
413-
// LookupHost looks up the given host using the local resolver.
414-
// It returns a slice of that host's addresses.
415-
LookupHost(ctx context.Context, host string) (addrs []string, err error)
416-
}
417-
418410
func sleep(ctx context.Context, duration time.Duration) bool {
419411
if duration == 0 {
420412
select {

resolver.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"net"
6+
)
7+
8+
// The Resolver interface is used as an abstraction to provide service discovery
9+
// of the hosts of a kafka cluster.
10+
type Resolver interface {
11+
// LookupHost looks up the given host using the local resolver.
12+
// It returns a slice of that host's addresses.
13+
LookupHost(ctx context.Context, host string) (addrs []string, err error)
14+
}
15+
16+
// BrokerResolver is an interface implemented by types that translate host
17+
// names into a network address.
18+
//
19+
// This resolver is not intended to be a general purpose interface. Instead,
20+
// it is tailored to the particular needs of the kafka protocol, with the goal
21+
// being to provide a flexible mechanism for extending broker name resolution
22+
// while retaining context that is specific to interacting with a kafka cluster.
23+
//
24+
// Resolvers must be safe to use from multiple goroutines.
25+
type BrokerResolver interface {
26+
// Returns the IP addresses of the broker passed as argument.
27+
LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error)
28+
}
29+
30+
// NewBrokerResolver constructs a Resolver from r.
31+
//
32+
// If r is nil, net.DefaultResolver is used instead.
33+
func NewBrokerResolver(r *net.Resolver) BrokerResolver {
34+
return brokerResolver{r}
35+
}
36+
37+
type brokerResolver struct {
38+
*net.Resolver
39+
}
40+
41+
func (r brokerResolver) LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error) {
42+
rslv := r.Resolver
43+
if rslv == nil {
44+
rslv = net.DefaultResolver
45+
}
46+
47+
ipAddrs, err := r.LookupIPAddr(ctx, broker.Host)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
if len(ipAddrs) == 0 {
53+
return nil, &net.DNSError{
54+
Err: "no addresses were returned by the resolver",
55+
Name: broker.Host,
56+
IsTemporary: true,
57+
IsNotFound: true,
58+
}
59+
}
60+
61+
return ipAddrs, nil
62+
}

transport.go

+131-18
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net"
1111
"runtime/pprof"
1212
"sort"
13+
"strconv"
1314
"strings"
1415
"sync"
1516
"sync/atomic"
@@ -93,6 +94,25 @@ type Transport struct {
9394
// SASL configures the Transfer to use SASL authentication.
9495
SASL sasl.Mechanism
9596

97+
// An optional resolver used to translate broker host names into network
98+
// addresses.
99+
//
100+
// The resolver will be called for every request (not every connection),
101+
// making it possible to implement ACL policies by validating that the
102+
// program is allowed to connect to the kafka broker. This also means that
103+
// the resolver should probably provide a caching layer to avoid storming
104+
// the service discovery backend with requests.
105+
//
106+
// When set, the Dial function is not responsible for performing name
107+
// resolution, and is always called with a pre-resolved address.
108+
Resolver BrokerResolver
109+
110+
// The background context used to control goroutines started internally by
111+
// the transport.
112+
//
113+
// If nil, context.Background() is used instead.
114+
Context context.Context
115+
96116
mutex sync.RWMutex
97117
pools map[networkAddress]*connPool
98118
}
@@ -210,7 +230,7 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
210230
return p
211231
}
212232

213-
ctx, cancel := context.WithCancel(context.Background())
233+
ctx, cancel := context.WithCancel(t.context())
214234

215235
p = &connPool{
216236
refc: 2,
@@ -222,6 +242,7 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
222242
clientID: t.ClientID,
223243
tls: t.TLS,
224244
sasl: t.SASL,
245+
resolver: t.Resolver,
225246

226247
ready: make(event),
227248
wake: make(chan event),
@@ -239,6 +260,13 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
239260
return p
240261
}
241262

263+
func (t *Transport) context() context.Context {
264+
if t.Context != nil {
265+
return t.Context
266+
}
267+
return context.Background()
268+
}
269+
242270
type event chan struct{}
243271

244272
func (e event) trigger() { close(e) }
@@ -255,6 +283,7 @@ type connPool struct {
255283
clientID string
256284
tls *tls.Config
257285
sasl sasl.Mechanism
286+
resolver BrokerResolver
258287
// Signaling mechanisms to orchestrate communications between the pool and
259288
// the rest of the program.
260289
once sync.Once // ensure that `ready` is triggered only once
@@ -491,9 +520,11 @@ func (p *connPool) update(ctx context.Context, metadata *meta.Response, err erro
491520

492521
for id := range addBrokers {
493522
broker := layout.Brokers[id]
494-
p.conns[id] = p.newConnGroup(&networkAddress{
495-
network: "tcp",
496-
address: broker.String(),
523+
p.conns[id] = p.newBrokerConnGroup(Broker{
524+
Rack: broker.Rack,
525+
Host: broker.Host,
526+
Port: broker.Port,
527+
ID: broker.ID,
497528
})
498529
}
499530
}
@@ -559,12 +590,12 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) {
559590
// returned.
560591
func (p *connPool) grabBrokerConn(ctx context.Context, brokerID int) (*conn, error) {
561592
p.mutex.RLock()
562-
c := p.conns[brokerID]
593+
g := p.conns[brokerID]
563594
p.mutex.RUnlock()
564-
if c == nil {
595+
if g == nil {
565596
return nil, BrokerNotAvailable
566597
}
567-
return c.grabConnOrConnect(ctx)
598+
return g.grabConnOrConnect(ctx)
568599
}
569600

570601
// grabClusterConn returns the connection to the kafka cluster that the pool is
@@ -754,6 +785,20 @@ func (p *connPool) newConnGroup(a net.Addr) *connGroup {
754785
return &connGroup{
755786
addr: a,
756787
pool: p,
788+
broker: Broker{
789+
ID: -1,
790+
},
791+
}
792+
}
793+
794+
func (p *connPool) newBrokerConnGroup(broker Broker) *connGroup {
795+
return &connGroup{
796+
addr: &networkAddress{
797+
network: "tcp",
798+
address: net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)),
799+
},
800+
pool: p,
801+
broker: broker,
757802
}
758803
}
759804

@@ -849,7 +894,8 @@ var defaultDialer = net.Dialer{
849894
// actual network connections are lazily open before sending requests, and
850895
// closed if they are unused for longer than the idle timeout.
851896
type connGroup struct {
852-
addr net.Addr
897+
addr net.Addr
898+
broker Broker
853899
// Immutable state of the connection.
854900
pool *connPool
855901
// Shared state of the connection, this is synchronized on the mutex through
@@ -873,14 +919,50 @@ func (g *connGroup) closeIdleConns() {
873919
}
874920

875921
func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
876-
c := g.grabConn()
922+
var rslv = g.pool.resolver
923+
var addr = g.addr
924+
var c *conn
925+
926+
if rslv == nil {
927+
c = g.grabConn()
928+
} else {
929+
var err error
930+
var broker = g.broker
931+
932+
if broker.ID < 0 {
933+
host, port, err := net.SplitHostPort(addr.String())
934+
if err != nil {
935+
return nil, fmt.Errorf("%s: %w", addr, err)
936+
}
937+
portNumber, err := strconv.Atoi(port)
938+
if err != nil {
939+
return nil, fmt.Errorf("%s: %w", addr, err)
940+
}
941+
broker.Host = host
942+
broker.Port = portNumber
943+
}
944+
945+
ipAddrs, err := rslv.LookupBrokerIPAddr(ctx, broker)
946+
if err != nil {
947+
return nil, err
948+
}
949+
950+
for _, ipAddr := range ipAddrs {
951+
network := addr.Network()
952+
address := net.JoinHostPort(ipAddr.String(), strconv.Itoa(broker.Port))
953+
954+
if c = g.grabConnTo(network, address); c != nil {
955+
break
956+
}
957+
}
958+
}
877959

878960
if c == nil {
879961
connChan := make(chan *conn)
880962
errChan := make(chan error)
881963

882964
go func() {
883-
c, err := g.connect(ctx)
965+
c, err := g.connect(ctx, addr)
884966
if err != nil {
885967
select {
886968
case errChan <- err:
@@ -909,6 +991,30 @@ func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
909991
return c, nil
910992
}
911993

994+
func (g *connGroup) grabConnTo(network, address string) *conn {
995+
g.mutex.Lock()
996+
defer g.mutex.Unlock()
997+
998+
for i := len(g.idleConns) - 1; i >= 0; i-- {
999+
c := g.idleConns[i]
1000+
1001+
if c.network == network && c.address == address {
1002+
copy(g.idleConns[i:], g.idleConns[i+1:])
1003+
n := len(g.idleConns) - 1
1004+
g.idleConns[n] = nil
1005+
g.idleConns = g.idleConns[:n]
1006+
1007+
if c.timer != nil {
1008+
c.timer.Stop()
1009+
}
1010+
1011+
return c
1012+
}
1013+
}
1014+
1015+
return nil
1016+
}
1017+
9121018
func (g *connGroup) grabConn() *conn {
9131019
g.mutex.Lock()
9141020
defer g.mutex.Unlock()
@@ -974,14 +1080,14 @@ func (g *connGroup) releaseConn(c *conn) bool {
9741080
return true
9751081
}
9761082

977-
func (g *connGroup) connect(ctx context.Context) (*conn, error) {
1083+
func (g *connGroup) connect(ctx context.Context, addr net.Addr) (*conn, error) {
9781084
deadline := time.Now().Add(g.pool.dialTimeout)
9791085

9801086
ctx, cancel := context.WithDeadline(ctx, deadline)
9811087
defer cancel()
9821088

983-
var network = strings.Split(g.addr.Network(), ",")
984-
var address = strings.Split(g.addr.String(), ",")
1089+
var network = strings.Split(addr.Network(), ",")
1090+
var address = strings.Split(addr.String(), ",")
9851091
var netConn net.Conn
9861092
var netAddr net.Addr
9871093
var err error
@@ -1055,18 +1161,25 @@ func (g *connGroup) connect(ctx context.Context) (*conn, error) {
10551161
pc.SetDeadline(time.Time{})
10561162

10571163
reqs := make(chan connRequest)
1058-
c := &conn{reqs: reqs, group: g}
1164+
c := &conn{
1165+
network: netAddr.Network(),
1166+
address: netAddr.String(),
1167+
reqs: reqs,
1168+
group: g,
1169+
}
10591170
go c.run(pc, reqs)
10601171

10611172
netConn = nil
10621173
return c, nil
10631174
}
10641175

10651176
type conn struct {
1066-
reqs chan<- connRequest
1067-
once sync.Once
1068-
group *connGroup
1069-
timer *time.Timer
1177+
reqs chan<- connRequest
1178+
network string
1179+
address string
1180+
once sync.Once
1181+
group *connGroup
1182+
timer *time.Timer
10701183
}
10711184

10721185
func (c *conn) close() {

transport_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestIssue477(t *testing.T) {
2424
},
2525
}
2626

27-
if _, err := cg.connect(context.Background()); err != nil {
27+
if _, err := cg.connect(context.Background(), cg.addr); err != nil {
2828
// An error is expected here because we are not actually establishing
2929
// a TLS connection to a kafka broker.
3030
t.Log(err)

0 commit comments

Comments
 (0)