Skip to content

Commit 6e40bd3

Browse files
author
Daniel Jimenez
committed
Add Kafka notifier
1 parent 2f03b1a commit 6e40bd3

File tree

10 files changed

+425
-87
lines changed

10 files changed

+425
-87
lines changed

Diff for: cmd/vulnerability-db-consumer/config.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type config struct {
1919
DB dbConfig
2020
SQS sqsConfig
2121
SNS snsConfig
22+
Kafka kafkaConfig
2223
Report reportConfig
2324
Maintenance maintenanceConfig
2425
}
@@ -46,11 +47,19 @@ type sqsConfig struct {
4647
}
4748

4849
type snsConfig struct {
49-
TopicARN string `toml:"topic_arn"`
5050
Enabled bool
51+
TopicARN string `toml:"topic_arn"`
5152
Endpoint string `toml:"endpoint"`
5253
}
5354

55+
type kafkaConfig struct {
56+
Enabled bool `toml:"enabled"`
57+
User string `toml:"user"`
58+
Pass string `toml:"password"`
59+
BrokerURL string `toml:"broker_url"`
60+
Topic string `toml:"topic"`
61+
}
62+
5463
type reportConfig struct {
5564
URLReplace string `toml:"url_replace"`
5665
}

Diff for: cmd/vulnerability-db-consumer/main.go

+46-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"sync"
1313

14+
"github.com/adevinta/vulnerability-db/pkg/asyncapi/kafka"
1415
"github.com/adevinta/vulnerability-db/pkg/maintenance"
1516
"github.com/adevinta/vulnerability-db/pkg/notify"
1617
"github.com/adevinta/vulnerability-db/pkg/processor"
@@ -43,14 +44,9 @@ func main() {
4344
}
4445

4546
// Build notifier.
46-
snsConf := notify.SNSConfig{
47-
TopicArn: conf.SNS.TopicARN,
48-
Enabled: conf.SNS.Enabled,
49-
Endpoint: conf.SNS.Endpoint,
50-
}
51-
snsNotifier, err := notify.NewSNSNotifier(snsConf, logger)
47+
notifier, err := buildNotifier(conf, logger)
5248
if err != nil {
53-
log.Fatalf("Error creating notifier: %v", err)
49+
log.Fatalf("Error building notifier: %v", err)
5450
}
5551

5652
// Build processor.
@@ -59,7 +55,7 @@ func main() {
5955
log.Fatalf("Error creating results client: %v", err)
6056
}
6157

62-
processor, err := processor.NewCheckProcessor(snsNotifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
58+
processor, err := processor.NewCheckProcessor(notifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
6359
if err != nil {
6460
log.Fatalf("Error creating queue processor: %v", err)
6561
}
@@ -97,6 +93,48 @@ func main() {
9793
wg.Wait()
9894
}
9995

96+
// buildNotifier builds the appropiate notifier given the defined configuration.
97+
// TODO: Once the integrations dependent on the old notification format have been
98+
// deprecated or updated to comply with the new format through Kafka topic channel
99+
// we can get rid of SNS and multi implementations of notifier and just use Kafka.
100+
func buildNotifier(conf *config, logger *log.Logger) (notify.Notifier, error) {
101+
if !conf.SNS.Enabled && !conf.Kafka.Enabled {
102+
return notify.NewNoopNotifier(), nil
103+
}
104+
if conf.SNS.Enabled && !conf.Kafka.Enabled {
105+
return buildSNSNotifier(conf, logger)
106+
}
107+
if !conf.SNS.Enabled && conf.Kafka.Enabled {
108+
return buildKafkaNotifier(conf, logger)
109+
}
110+
// Multi Notifier
111+
k, err := buildKafkaNotifier(conf, logger)
112+
if err != nil {
113+
return nil, err
114+
}
115+
s, err := buildSNSNotifier(conf, logger)
116+
if err != nil {
117+
return nil, err
118+
}
119+
return notify.NewMultiNotifier(k, s), nil
120+
}
121+
122+
func buildSNSNotifier(conf *config, logger *log.Logger) (*notify.SNSNotifier, error) {
123+
return notify.NewSNSNotifier(notify.SNSConfig{
124+
TopicArn: conf.SNS.TopicARN,
125+
Endpoint: conf.SNS.Endpoint,
126+
}, logger)
127+
}
128+
129+
func buildKafkaNotifier(conf *config, logger *log.Logger) (*notify.KafkaNotifier, error) {
130+
kafkaCli, err := kafka.NewClient(conf.Kafka.User, conf.Kafka.Pass,
131+
conf.Kafka.BrokerURL, conf.Kafka.Topic)
132+
if err != nil {
133+
return nil, err
134+
}
135+
return notify.NewKafkaNotifier(&kafkaCli, logger), nil
136+
}
137+
100138
func setupLogger(cfg config) *log.Logger {
101139
var logger = log.New()
102140

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/BurntSushi/toml v0.4.1
77
github.com/adevinta/vulcan-report v1.0.0
88
github.com/aws/aws-sdk-go v1.44.98
9+
github.com/confluentinc/confluent-kafka-go v1.9.2
910
github.com/google/go-cmp v0.5.9
1011
github.com/jmoiron/sqlx v1.3.4
1112
github.com/lib/pq v1.10.3
@@ -14,7 +15,6 @@ require (
1415

1516
require (
1617
github.com/jmespath/go-jmespath v0.4.0 // indirect
17-
github.com/stretchr/testify v1.7.0 // indirect
1818
golang.org/x/sys v0.0.0-20220913175220-63ea55921009 // indirect
1919
gopkg.in/yaml.v2 v2.4.0 // indirect
2020
)

Diff for: go.sum

+204-2
Large diffs are not rendered by default.

Diff for: pkg/notify/kafka.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package notify
2+
3+
import (
4+
"encoding/json"
5+
6+
log "github.com/sirupsen/logrus"
7+
)
8+
9+
const (
10+
version = "v0.0.1" // Defines the notification schema version
11+
)
12+
13+
// EventStreamClient represent a client of an event stream system, like Kafka
14+
// or AWS FIFO SQS queues.
15+
type EventStreamClient interface {
16+
Push(id string, payload []byte, metadata map[string][]byte) error
17+
}
18+
19+
// KafkaNotifier represents a Notifier implementation to send notifications to
20+
// a Kafka topic.
21+
type KafkaNotifier struct {
22+
c EventStreamClient
23+
l *log.Logger
24+
}
25+
26+
// NewKafkaNotifier creates a new KafkaNotifier.
27+
func NewKafkaNotifier(c EventStreamClient, l *log.Logger) *KafkaNotifier {
28+
return &KafkaNotifier{
29+
c,
30+
l,
31+
}
32+
}
33+
34+
// PushFinding sends the given FindingNotification to the configured Kafka topic
35+
// in the wrapped EventStreamClient.
36+
func (k *KafkaNotifier) PushFinding(f FindingNotification) error {
37+
k.l.WithFields(log.Fields{
38+
"notifier": "kafka",
39+
"id": f.ID,
40+
}).Debug("pushing finding notification")
41+
42+
payload, err := json.Marshal(f)
43+
if err != nil {
44+
return err
45+
}
46+
meta := map[string][]byte{
47+
"version": []byte(version),
48+
}
49+
return k.c.Push(f.ID, payload, meta)
50+
}

Diff for: pkg/notify/multi.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package notify
2+
3+
// MultiNotifier represents a Notifier which delegates the
4+
// notification delivery into multiple notifier implementations.
5+
type MultiNotifier struct {
6+
notifiers []Notifier
7+
}
8+
9+
// NewMultiNotifier creates a new MultiNotifier.
10+
func NewMultiNotifier(notifiers ...Notifier) *MultiNotifier {
11+
return &MultiNotifier{
12+
notifiers: notifiers,
13+
}
14+
}
15+
16+
func (m *MultiNotifier) PushFinding(f FindingNotification) error {
17+
// For every notifier wrapped by the multi implementation, push
18+
// the given FindingNotification. Return an error on the first
19+
// notifier that fails to deliver the notification, so possible
20+
// repeated sending events might happen. This should be handled
21+
// by the consumers complying with at-least-once semantics.
22+
for iN := range m.notifiers {
23+
if err := m.notifiers[iN].PushFinding(f); err != nil {
24+
return err
25+
}
26+
}
27+
return nil
28+
}

Diff for: pkg/notify/noop.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package notify
2+
3+
// NoopNotifier represents a notifier which performs no action.
4+
type NoopNotifier struct{}
5+
6+
// NoopNotifier creates a new NoopNotifier.
7+
func NewNoopNotifier() *NoopNotifier {
8+
return &NoopNotifier{}
9+
}
10+
11+
func (n *NoopNotifier) PushFinding(f FindingNotification) error {
12+
return nil // Do nothing
13+
}

Diff for: pkg/notify/sns.go

+58-10
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"fmt"
1010
"strings"
11+
"time"
1112

1213
"github.com/aws/aws-sdk-go/aws"
1314
"github.com/aws/aws-sdk-go/aws/session"
@@ -19,17 +20,43 @@ import (
1920
// SNSConfig holds the required SNS config information.
2021
type SNSConfig struct {
2122
TopicArn string `mapstructure:"topic_arn"`
22-
Enabled bool `mapstructure:"enabled"`
2323
Endpoint string `mapstructure:"endpoint"`
2424
}
2525

26-
// SNSNotifier sends push events to an SNS topic.
26+
// SNSNotifier sends notifications to an SNS topic.
2727
type SNSNotifier struct {
2828
conf SNSConfig
2929
sns snsiface.SNSAPI
3030
logger *log.Logger
3131
}
3232

33+
// FindingNotification is the data that's notified when a new finding occurs.
34+
// TODO: This struct has been moved from processor types so it's isolated in
35+
// the SNS notifier implementation and it's easier to deprecate in the future
36+
// in favor of Kafka implementation. This SNS implementation only exists in
37+
// order to maintain compatibility with old integrations that expects this
38+
// notification format.
39+
type findingNotification struct {
40+
TargetID string `json:"target_id"`
41+
Target string `json:"target"`
42+
IssueID string `json:"issue_id"`
43+
FindingID string `json:"finding_id"`
44+
CheckID string `json:"check_id"`
45+
ChecktypeName string `json:"checktype_name"`
46+
CheckTypeOptions string `json:"checktype_options"`
47+
Tag string `json:"tag"`
48+
Time time.Time `json:"time"`
49+
Vulnerability vulnerability `json:"vulnerability"`
50+
}
51+
52+
type vulnerability struct {
53+
ID string `json:"id"`
54+
Summary string `json:"summary"`
55+
Score float64 `json:"score"`
56+
CWEID uint32 `json:"cwe_id"`
57+
Description string `json:"description"`
58+
}
59+
3360
// NewSNSNotifier creates a new SNSNotifier with the given configuration.
3461
func NewSNSNotifier(conf SNSConfig, logger *log.Logger) (*SNSNotifier, error) {
3562
sess, err := session.NewSession()
@@ -57,25 +84,23 @@ func NewSNSNotifier(conf SNSConfig, logger *log.Logger) (*SNSNotifier, error) {
5784

5885
// PushFinding pushes a finding notification to the configured sns topic.
5986
func (n *SNSNotifier) PushFinding(f FindingNotification) error {
60-
if !n.conf.Enabled {
61-
return nil
62-
}
63-
64-
n.logger.Info("Pushing notification to SNS")
65-
content, err := json.Marshal(f)
87+
n.logger.WithFields(log.Fields{
88+
"notifier": "sns",
89+
"id": f.ID,
90+
}).Info("pushing finding notification")
91+
content, err := json.Marshal(toOldFmt(f))
6692
if err != nil {
6793
return err
6894
}
6995
input := &sns.PublishInput{
7096
Message: aws.String(string(content)),
7197
TopicArn: aws.String(n.conf.TopicArn),
7298
}
99+
73100
_, err = n.sns.Publish(input)
74101
if err != nil {
75102
return err
76103
}
77-
n.logger.Info("Notification pushed to SNS successfully")
78-
79104
return nil
80105
}
81106

@@ -96,3 +121,26 @@ func parseSNSARN(snsARN string) snsData {
96121
endpoint: fmt.Sprintf("https://sns.%v.amazonaws.com/%v/%v", region, accountID, name),
97122
}
98123
}
124+
125+
// toOldFmt translates a FindingNotification into the old
126+
// findingNotification format.
127+
func toOldFmt(f FindingNotification) findingNotification {
128+
return findingNotification{
129+
TargetID: f.Target.ID,
130+
Target: f.Target.Identifier,
131+
IssueID: f.Issue.ID,
132+
FindingID: f.ID,
133+
CheckID: f.Source.Instance,
134+
ChecktypeName: f.Source.Component,
135+
CheckTypeOptions: f.Source.Options,
136+
Tag: f.Tag,
137+
Time: f.Source.Time,
138+
Vulnerability: vulnerability{
139+
ID: f.Issue.ID,
140+
Summary: f.Issue.Summary,
141+
Score: f.Finding.Score,
142+
CWEID: f.Issue.CWEID,
143+
Description: f.Issue.Description,
144+
},
145+
}
146+
}

0 commit comments

Comments
 (0)