File tree 2 files changed +20
-2
lines changed
2 files changed +20
-2
lines changed Original file line number Diff line number Diff line change @@ -84,6 +84,10 @@ type RawExchanger interface {
84
84
RawExchange (rw io.ReadWriter ) (Message , error )
85
85
}
86
86
87
+ type DependentMessage interface {
88
+ DependsOnVersion () ApiKey
89
+ }
90
+
87
91
func (c * Conn ) RoundTrip (msg Message ) (Message , error ) {
88
92
correlationID := atomic .AddInt32 (& c .idgen , + 1 )
89
93
versions , _ := c .versions .Load ().(map [ApiKey ]int16 )
@@ -93,7 +97,14 @@ func (c *Conn) RoundTrip(msg Message) (Message, error) {
93
97
p .Prepare (apiVersion )
94
98
}
95
99
96
- if raw , ok := msg .(RawExchanger ); ok && raw .Required (int (apiVersion )) {
100
+ dp , isDependent := msg .(DependentMessage )
101
+
102
+ requiredVersion := apiVersion
103
+ if isDependent {
104
+ requiredVersion = versions [dp .DependsOnVersion ()]
105
+ }
106
+
107
+ if raw , ok := msg .(RawExchanger ); ok && raw .Required (int (requiredVersion )) {
97
108
return raw .RawExchange (c )
98
109
}
99
110
Original file line number Diff line number Diff line change @@ -15,6 +15,10 @@ type Request struct {
15
15
AuthBytes []byte `kafka:"min=v0,max=v1"`
16
16
}
17
17
18
+ func (* Request ) DependsOnVersion () protocol.ApiKey {
19
+ return protocol .SaslHandshake
20
+ }
21
+
18
22
func (r * Request ) RawExchange (rw io.ReadWriter ) (protocol.Message , error ) {
19
23
if err := r .writeTo (rw ); err != nil {
20
24
return nil , err
@@ -63,4 +67,7 @@ type Response struct {
63
67
64
68
func (r * Response ) ApiKey () protocol.ApiKey { return protocol .SaslAuthenticate }
65
69
66
- var _ protocol.RawExchanger = (* Request )(nil )
70
+ var (
71
+ _ protocol.RawExchanger = (* Request )(nil )
72
+ _ protocol.DependentMessage = (* Request )(nil )
73
+ )
You can’t perform that action at this time.
0 commit comments