@@ -2,14 +2,19 @@ package sinks
2
2
3
3
import (
4
4
"context"
5
+ "crypto/sha256"
6
+ "crypto/sha512"
5
7
"crypto/tls"
6
8
"crypto/x509"
7
9
"encoding/json"
10
+ "fmt"
8
11
"os"
9
12
10
13
"github.com/Shopify/sarama"
11
14
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
12
15
"github.com/rs/zerolog/log"
16
+
17
+ "github.com/xdg-go/scram"
13
18
)
14
19
15
20
// KafkaConfig is the Kafka producer configuration
@@ -28,9 +33,10 @@ type KafkaConfig struct {
28
33
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
29
34
} `yaml:"tls"`
30
35
SASL struct {
31
- Enable bool `yaml:"enable"`
32
- Username string `yaml:"username"`
33
- Password string `yaml:"password"`
36
+ Enable bool `yaml:"enable"`
37
+ Username string `yaml:"username"`
38
+ Password string `yaml:"password"`
39
+ Mechanism string `yaml:"mechanism" default:"plain"`
34
40
} `yaml:"sasl"`
35
41
KafkaEncode Avro `yaml:"avro"`
36
42
}
@@ -151,7 +157,7 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) {
151
157
152
158
caCert , err := os .ReadFile (cfg .TLS .CaFile )
153
159
if err != nil {
154
- return nil , err
160
+ return nil , fmt . Errorf ( "error loading ca file: %w" , err )
155
161
}
156
162
157
163
caCertPool := x509 .NewCertPool ()
@@ -178,6 +184,17 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) {
178
184
saramaConfig .Net .SASL .Enable = true
179
185
saramaConfig .Net .SASL .User = cfg .SASL .Username
180
186
saramaConfig .Net .SASL .Password = cfg .SASL .Password
187
+ if cfg .SASL .Mechanism == "sha512" {
188
+ saramaConfig .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient { return & XDGSCRAMClient {HashGeneratorFcn : SHA512 } }
189
+ saramaConfig .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA512
190
+ } else if cfg .SASL .Mechanism == "sha256" {
191
+ saramaConfig .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient { return & XDGSCRAMClient {HashGeneratorFcn : SHA256 } }
192
+ saramaConfig .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA256
193
+ } else if cfg .SASL .Mechanism == "plain" || cfg .SASL .Mechanism == "" {
194
+ saramaConfig .Net .SASL .Mechanism = sarama .SASLTypePlaintext
195
+ } else {
196
+ return nil , fmt .Errorf ("invalid scram sha mechanism: %s: can be one of 'sha256', 'sha512' or 'plain'" , cfg .SASL .Mechanism )
197
+ }
181
198
}
182
199
183
200
// TODO: Find a generic way to override all other configs
@@ -190,3 +207,32 @@ func createSaramaProducer(cfg *KafkaConfig) (sarama.SyncProducer, error) {
190
207
191
208
return producer , nil
192
209
}
210
+
211
+ var (
212
+ SHA256 scram.HashGeneratorFcn = sha256 .New
213
+ SHA512 scram.HashGeneratorFcn = sha512 .New
214
+ )
215
+
216
+ type XDGSCRAMClient struct {
217
+ * scram.Client
218
+ * scram.ClientConversation
219
+ scram.HashGeneratorFcn
220
+ }
221
+
222
+ func (x * XDGSCRAMClient ) Begin (userName , password , authzID string ) (err error ) {
223
+ x .Client , err = x .HashGeneratorFcn .NewClient (userName , password , authzID )
224
+ if err != nil {
225
+ return err
226
+ }
227
+ x .ClientConversation = x .Client .NewConversation ()
228
+ return nil
229
+ }
230
+
231
+ func (x * XDGSCRAMClient ) Step (challenge string ) (response string , err error ) {
232
+ response , err = x .ClientConversation .Step (challenge )
233
+ return
234
+ }
235
+
236
+ func (x * XDGSCRAMClient ) Done () bool {
237
+ return x .ClientConversation .Done ()
238
+ }
0 commit comments