Skip to content

Commit 757330d

Browse files
author
Daniel Jimenez
committed
Add Kafka notifier
1 parent 2f03b1a commit 757330d

File tree

11 files changed

+522
-87
lines changed

11 files changed

+522
-87
lines changed

Diff for: cmd/vulnerability-db-consumer/config.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type config struct {
1919
DB dbConfig
2020
SQS sqsConfig
2121
SNS snsConfig
22+
Kafka kafkaConfig
2223
Report reportConfig
2324
Maintenance maintenanceConfig
2425
}
@@ -46,11 +47,19 @@ type sqsConfig struct {
4647
}
4748

4849
type snsConfig struct {
49-
TopicARN string `toml:"topic_arn"`
5050
Enabled bool
51+
TopicARN string `toml:"topic_arn"`
5152
Endpoint string `toml:"endpoint"`
5253
}
5354

55+
type kafkaConfig struct {
56+
Enabled bool `toml:"enabled"`
57+
User string `toml:"user"`
58+
Pass string `toml:"password"`
59+
BrokerURL string `toml:"broker_url"`
60+
Topic string `toml:"topic"`
61+
}
62+
5463
type reportConfig struct {
5564
URLReplace string `toml:"url_replace"`
5665
}

Diff for: cmd/vulnerability-db-consumer/main.go

+46-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"sync"
1313

14+
"github.com/adevinta/vulnerability-db/pkg/asyncapi/kafka"
1415
"github.com/adevinta/vulnerability-db/pkg/maintenance"
1516
"github.com/adevinta/vulnerability-db/pkg/notify"
1617
"github.com/adevinta/vulnerability-db/pkg/processor"
@@ -43,14 +44,9 @@ func main() {
4344
}
4445

4546
// Build notifier.
46-
snsConf := notify.SNSConfig{
47-
TopicArn: conf.SNS.TopicARN,
48-
Enabled: conf.SNS.Enabled,
49-
Endpoint: conf.SNS.Endpoint,
50-
}
51-
snsNotifier, err := notify.NewSNSNotifier(snsConf, logger)
47+
notifier, err := buildNotifier(conf, logger)
5248
if err != nil {
53-
log.Fatalf("Error creating notifier: %v", err)
49+
log.Fatalf("Error building notifier: %v", err)
5450
}
5551

5652
// Build processor.
@@ -59,7 +55,7 @@ func main() {
5955
log.Fatalf("Error creating results client: %v", err)
6056
}
6157

62-
processor, err := processor.NewCheckProcessor(snsNotifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
58+
processor, err := processor.NewCheckProcessor(notifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
6359
if err != nil {
6460
log.Fatalf("Error creating queue processor: %v", err)
6561
}
@@ -97,6 +93,48 @@ func main() {
9793
wg.Wait()
9894
}
9995

96+
// buildNotifier builds the appropiate notifier given the defined configuration.
97+
// TODO: Once the integrations dependent on the old notification format have been
98+
// deprecated or updated to comply with the new format through Kafka topic channel
99+
// we can get rid of SNS and multi implementations of notifier and just use Kafka.
100+
func buildNotifier(conf *config, logger *log.Logger) (notify.Notifier, error) {
101+
if !conf.SNS.Enabled && !conf.Kafka.Enabled {
102+
return notify.NewNoopNotifier(), nil
103+
}
104+
if conf.SNS.Enabled && !conf.Kafka.Enabled {
105+
return buildSNSNotifier(conf, logger)
106+
}
107+
if !conf.SNS.Enabled && conf.Kafka.Enabled {
108+
return buildKafkaNotifier(conf, logger)
109+
}
110+
// Multi Notifier
111+
k, err := buildKafkaNotifier(conf, logger)
112+
if err != nil {
113+
return nil, err
114+
}
115+
s, err := buildSNSNotifier(conf, logger)
116+
if err != nil {
117+
return nil, err
118+
}
119+
return notify.NewMultiNotifier(k, s), nil
120+
}
121+
122+
func buildSNSNotifier(conf *config, logger *log.Logger) (*notify.SNSNotifier, error) {
123+
return notify.NewSNSNotifier(notify.SNSConfig{
124+
TopicArn: conf.SNS.TopicARN,
125+
Endpoint: conf.SNS.Endpoint,
126+
}, logger)
127+
}
128+
129+
func buildKafkaNotifier(conf *config, logger *log.Logger) (*notify.KafkaNotifier, error) {
130+
kafkaCli, err := kafka.NewClient(conf.Kafka.User, conf.Kafka.Pass,
131+
conf.Kafka.BrokerURL, conf.Kafka.Topic)
132+
if err != nil {
133+
return nil, err
134+
}
135+
return notify.NewKafkaNotifier(&kafkaCli, logger), nil
136+
}
137+
100138
func setupLogger(cfg config) *log.Logger {
101139
var logger = log.New()
102140

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/BurntSushi/toml v0.4.1
77
github.com/adevinta/vulcan-report v1.0.0
88
github.com/aws/aws-sdk-go v1.44.98
9+
github.com/confluentinc/confluent-kafka-go v1.9.2
910
github.com/google/go-cmp v0.5.9
1011
github.com/jmoiron/sqlx v1.3.4
1112
github.com/lib/pq v1.10.3
@@ -14,7 +15,6 @@ require (
1415

1516
require (
1617
github.com/jmespath/go-jmespath v0.4.0 // indirect
17-
github.com/stretchr/testify v1.7.0 // indirect
1818
golang.org/x/sys v0.0.0-20220913175220-63ea55921009 // indirect
1919
gopkg.in/yaml.v2 v2.4.0 // indirect
2020
)

Diff for: go.sum

+204-2
Large diffs are not rendered by default.

Diff for: pkg/asyncapi/kafka/client.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2022 Adevinta
3+
*/
4+
5+
package kafka
6+
7+
import (
8+
"errors"
9+
"fmt"
10+
11+
"github.com/confluentinc/confluent-kafka-go/kafka"
12+
)
13+
14+
var (
15+
// ErrEmptyPayload is returned by the Push method of the [Client] when the
16+
// given payload is empty.
17+
ErrEmptyPayload = errors.New("payload can't be empty")
18+
)
19+
20+
const (
21+
kafkaSecurityProtocol = "sasl_ssl"
22+
kafkaSaslMechanisms = "SCRAM-SHA-256"
23+
)
24+
25+
// Client implements an EventStreamClient using Kafka as the event stream
26+
// system.
27+
type Client struct {
28+
producer *kafka.Producer
29+
topic string
30+
}
31+
32+
// NewClient creates a new Kafka client connected to the a broker using the
33+
// given credentials and setting the mapping between all the entities and their
34+
// corresponding topics.
35+
func NewClient(user string, password string, broker string, topic string) (Client, error) {
36+
config := kafka.ConfigMap{
37+
"bootstrap.servers": broker,
38+
}
39+
if password != "" {
40+
config.SetKey("security.protocol", kafkaSecurityProtocol)
41+
config.SetKey("sasl.mechanisms", kafkaSaslMechanisms)
42+
config.SetKey("sasl.username", user)
43+
config.SetKey("sasl.password", password)
44+
}
45+
p, err := kafka.NewProducer(&config)
46+
if err != nil {
47+
return Client{}, err
48+
}
49+
return Client{p, topic}, nil
50+
}
51+
52+
// Push sends the payload of an entity, with the specified id, to corresponding
53+
// topic according to the specified entity, using the kafka broker the client
54+
// is connected to. The method waits until kafka confirms the message has been
55+
// stored in the topic.
56+
func (c *Client) Push(id string, payload []byte, metadata map[string][]byte) error {
57+
delivered := make(chan kafka.Event)
58+
defer close(delivered)
59+
var headers []kafka.Header
60+
for k, v := range metadata {
61+
headers = append(headers, kafka.Header{
62+
Key: k,
63+
Value: v,
64+
})
65+
}
66+
msg := kafka.Message{
67+
TopicPartition: kafka.TopicPartition{
68+
Topic: &c.topic,
69+
Partition: kafka.PartitionAny,
70+
},
71+
Key: []byte(id),
72+
Value: []byte(payload),
73+
Headers: headers,
74+
}
75+
err := c.producer.Produce(&msg, delivered)
76+
if err != nil {
77+
return fmt.Errorf("error producing message: %w", err)
78+
}
79+
e := <-delivered
80+
m := e.(*kafka.Message)
81+
if m.TopicPartition.Error != nil {
82+
return fmt.Errorf("error delivering message: %w", m.TopicPartition.Error)
83+
}
84+
return nil
85+
}

Diff for: pkg/notify/kafka.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright 2022 Adevinta
3+
*/
4+
5+
package notify
6+
7+
import (
8+
"encoding/json"
9+
10+
log "github.com/sirupsen/logrus"
11+
)
12+
13+
const (
14+
version = "v0.0.1" // Defines the notification schema version
15+
)
16+
17+
// EventStreamClient represent a client of an event stream system, like Kafka
18+
// or AWS FIFO SQS queues.
19+
type EventStreamClient interface {
20+
Push(id string, payload []byte, metadata map[string][]byte) error
21+
}
22+
23+
// KafkaNotifier represents a Notifier implementation to send notifications to
24+
// a Kafka topic.
25+
type KafkaNotifier struct {
26+
c EventStreamClient
27+
l *log.Logger
28+
}
29+
30+
// NewKafkaNotifier creates a new KafkaNotifier.
31+
func NewKafkaNotifier(c EventStreamClient, l *log.Logger) *KafkaNotifier {
32+
return &KafkaNotifier{
33+
c,
34+
l,
35+
}
36+
}
37+
38+
// PushFinding sends the given FindingNotification to the configured Kafka topic
39+
// in the wrapped EventStreamClient.
40+
func (k *KafkaNotifier) PushFinding(f FindingNotification) error {
41+
k.l.WithFields(log.Fields{
42+
"notifier": "kafka",
43+
"id": f.ID,
44+
}).Debug("pushing finding notification")
45+
46+
payload, err := json.Marshal(f)
47+
if err != nil {
48+
return err
49+
}
50+
meta := map[string][]byte{
51+
"version": []byte(version),
52+
}
53+
return k.c.Push(f.ID, payload, meta)
54+
}

Diff for: pkg/notify/multi.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright 2022 Adevinta
3+
*/
4+
5+
package notify
6+
7+
// MultiNotifier represents a Notifier which delegates the
8+
// notification delivery into multiple notifier implementations.
9+
type MultiNotifier struct {
10+
notifiers []Notifier
11+
}
12+
13+
// NewMultiNotifier creates a new MultiNotifier.
14+
func NewMultiNotifier(notifiers ...Notifier) *MultiNotifier {
15+
return &MultiNotifier{
16+
notifiers: notifiers,
17+
}
18+
}
19+
20+
func (m *MultiNotifier) PushFinding(f FindingNotification) error {
21+
// For every notifier wrapped by the multi implementation, push
22+
// the given FindingNotification. Return an error on the first
23+
// notifier that fails to deliver the notification, so possible
24+
// repeated sending events might happen. This should be handled
25+
// by the consumers complying with at-least-once semantics.
26+
for iN := range m.notifiers {
27+
if err := m.notifiers[iN].PushFinding(f); err != nil {
28+
return err
29+
}
30+
}
31+
return nil
32+
}

Diff for: pkg/notify/noop.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
Copyright 2022 Adevinta
3+
*/
4+
5+
package notify
6+
7+
// NoopNotifier represents a notifier which performs no action.
8+
type NoopNotifier struct{}
9+
10+
// NoopNotifier creates a new NoopNotifier.
11+
func NewNoopNotifier() *NoopNotifier {
12+
return &NoopNotifier{}
13+
}
14+
15+
func (n *NoopNotifier) PushFinding(f FindingNotification) error {
16+
return nil // Do nothing
17+
}

0 commit comments

Comments
 (0)