Refactor vulndb events #41


Open · wants to merge 29 commits into base: master
Commits (29)
c28398a
Refactor finding notifications
Nov 15, 2022
c80c752
Refactor test utils
Nov 16, 2022
2f03b1a
Refactor notify pkg
Nov 16, 2022
757330d
Add Kafka notifier
Nov 17, 2022
9f79899
Update config example
Nov 18, 2022
0514cb6
Avoid sending void payloads to Kafka
Nov 18, 2022
8d115f8
Include required Kafka client build tag into Dockerfile build
Nov 18, 2022
47de3c4
Remove CGO disabled flag from Travis file
Nov 18, 2022
dba6bd7
Add comment on Docker build arguments required for Kafka client on Al…
Nov 18, 2022
d58c71c
Add notify pkg tests for Kafka and Multi implementations
Nov 21, 2022
3e4dfc6
Add Kafka to processor's integration tests
Nov 21, 2022
fedec9d
Refactor CI tests set up
Nov 22, 2022
d3e8b27
Update configs and readme
Nov 22, 2022
a64e31f
Add debug log for notifier set up during start up
Nov 23, 2022
d58881c
Add gcc to Dockerfile builder stage (kafka lib req)
Nov 23, 2022
5d61955
Fix missing err handling
Nov 24, 2022
bd67b0f
Test propagation of errors on notifications delivery
Nov 24, 2022
e6bb1b4
Improve debug log
Nov 24, 2022
25600b7
Merge branch 'master' into vulndb-events
Nov 25, 2022
c26b21f
Set concurrent workers to 1
Nov 25, 2022
cd445df
Set SQS visibility TO to 5min
Nov 25, 2022
26eebcf
Fix checks processing idempotency
Nov 25, 2022
54f1577
Add integrations test for checks processing idempotency
Nov 25, 2022
7cb1abf
Ignore FIXED finding notifications for legacy SNS integrations
Nov 28, 2022
e77b0c9
Add comment on time serialization fmt
Nov 28, 2022
b0f2409
Add AsyncAPI doc and generation script
Nov 29, 2022
e105e96
Revert SQS number of processors
Nov 30, 2022
7a1bf23
Change initial SQS processing log mssg level
Dec 1, 2022
d7b10eb
Add Kafka to local deployment
Dec 1, 2022
5 changes: 1 addition & 4 deletions .travis.yml
@@ -6,20 +6,17 @@ services:
- docker
env:
global:
- CGO_ENABLED=0
- FLYWAY_VERSION=9.4.0
- INPUT_BUILDARGS=FLYWAY_VERSION=$FLYWAY_VERSION
before_install:
# Requirement for 'test-local-deployment'
- pip install --user awscli
- export PATH=$PATH:$HOME/.local/bin
before_script:
- _script/start-pg
gobuild_args: -a -tags netgo -ldflags '-w'
go_import_path: github.com/adevinta/vulnerability-db
script:
- go install ./...
- go test -v -tags integration $(go list ./... | grep -v /vendor/) ./test
- _script/test
after_success:
- bash -c 'source <(curl -s https://raw.githubusercontent.com/adevinta/vulcan-cicd/master/docker.sh)'
- cd local_deployment
8 changes: 7 additions & 1 deletion Dockerfile
@@ -2,6 +2,10 @@

FROM golang:1.19.3-alpine3.15 as builder

# Required because the dependency
# https://github.com/confluentinc/confluent-kafka-go requires the gcc compiler.
RUN apk add gcc libc-dev

WORKDIR /app

COPY go.mod .
@@ -11,7 +15,9 @@ RUN go mod download

COPY . .

RUN cd cmd/vulnerability-db-consumer/ && GOOS=linux GOARCH=amd64 go build . && cd -
# -tags musl argument is required for dependency github.com/confluentinc/confluent-kafka-go.
# see documentation: https://github.com/confluentinc/confluent-kafka-go#using-go-modules
RUN cd cmd/vulnerability-db-consumer/ && GOOS=linux GOARCH=amd64 go build -tags musl . && cd -

FROM alpine:3.16.3

18 changes: 15 additions & 3 deletions README.md
@@ -13,6 +13,12 @@ cd db && source flyway-migrate.sh && cd -
vulnerability-db-consumer -c _resources/config/local.toml
```

## How to generate AsyncAPI documentation
Generated [AsyncAPI documentation](https://www.asyncapi.com/) can be found in the `./docs` directory.
```
cd pkg/asyncapi/_gen && ./gen.sh && cd -
```

## How to run the Vulnerability DB in development mode

You can test the Vulnerability DB Consumer locally in your machine.
@@ -78,13 +84,19 @@ Those are the variables you have to use:
|PG_PORT|Database port|5432|
|PG_SSLMODE|One of these (disable,allow,prefer,require,verify-ca,verify-full)|disable|
|PG_CA_B64|A base64 encoded CA certificate||
|SQS_NUMBER_OF_PROCESSORS|Number of concurrent SQS processors|Default: 10|
|SQS_QUEUE_ARN|Checks queue ARN|arn:aws:sqs:xxx:123456789012:yyy|
|SNS_TOPIC_ARN|ARN of topic to publish new vulnerabilities|arn:aws:sns:xxx:123456789012:yyy|
|RESULTS_URL|External vulcan-results URL|https://results.vulcan.com|
|RESULTS_INTERNAL_URL|Internal vulcan-results URL|http://vulcan-results|
|SQS_QUEUE_ARN|Checks queue ARN|arn:aws:sqs:xxx:123456789012:yyy|
|SQS_NUMBER_OF_PROCESSORS|Number of concurrent SQS processors|Default: 10|
|AWS_SQS_ENDPOINT|Endpoint for SQS creation queue (optional)|http://custom-aws-endpoint|
|SNS_ENABLED|Enables/Disables notifications sent to SNS|false|
|SNS_TOPIC_ARN|ARN of topic to publish new vulnerabilities|arn:aws:sns:xxx:123456789012:yyy|
|AWS_SNS_ENDPOINT|Endpoint for SNS topic (optional)|http://custom-aws-endpoint|
|KAFKA_ENABLED|Enables/Disables notifications sent to Kafka|false|
|KAFKA_USER|Kafka user||
|KAFKA_PASSWORD|Kafka password||
|KAFKA_BROKER_URL|Kafka Broker URL|localhost:9092|
|KAFKA_TOPIC|Kafka topic|findings|

```bash
docker build . -t vdb
9 changes: 8 additions & 1 deletion _resources/config/local.toml.example
@@ -18,8 +18,15 @@ timeout = 30
queue_arn = "arn:aws:sqs:xxx:123456789012:yyy"

[sns]
enabled = false
topic_arn = "arn:aws:sns:xxx:123456789012:yyy"
enabled = true

[kafka]
enabled = false
user = "user"
password = "password"
broker_url = "localhost:9092"
topic = "findings"

[report]
url_replace = "https://results.vulcan.example.com|http://localhost:8081"
11 changes: 0 additions & 11 deletions _script/start-pg

This file was deleted.

File renamed without changes.
17 changes: 13 additions & 4 deletions cmd/vulnerability-db-consumer/config.go
@@ -19,6 +19,7 @@ type config struct {
DB dbConfig
SQS sqsConfig
SNS snsConfig
Kafka kafkaConfig
Report reportConfig
Maintenance maintenanceConfig
}
@@ -38,19 +39,27 @@ type dbConfig struct {
}

type sqsConfig struct {
NProcessors uint8 `toml:"number_of_processors"`
WaitTime uint8 `toml:"wait_time"`
Timeout uint8
NProcessors uint `toml:"number_of_processors"`
WaitTime uint `toml:"wait_time"`
Timeout uint
QueueARN string `toml:"queue_arn"`
Endpoint string `toml:"endpoint"`
}

type snsConfig struct {
TopicARN string `toml:"topic_arn"`
Enabled bool
TopicARN string `toml:"topic_arn"`
Endpoint string `toml:"endpoint"`
}

type kafkaConfig struct {
Enabled bool `toml:"enabled"`
User string `toml:"user"`
Pass string `toml:"password"`
BrokerURL string `toml:"broker_url"`
Topic string `toml:"topic"`
}

type reportConfig struct {
URLReplace string `toml:"url_replace"`
}
58 changes: 50 additions & 8 deletions cmd/vulnerability-db-consumer/main.go
@@ -11,6 +11,7 @@ import (
"os"
"sync"

"github.com/adevinta/vulnerability-db/pkg/asyncapi/kafka"
"github.com/adevinta/vulnerability-db/pkg/maintenance"
"github.com/adevinta/vulnerability-db/pkg/notify"
"github.com/adevinta/vulnerability-db/pkg/processor"
@@ -43,14 +44,9 @@ func main() {
}

// Build notifier.
snsConf := notify.SNSConfig{
TopicArn: conf.SNS.TopicARN,
Enabled: conf.SNS.Enabled,
Endpoint: conf.SNS.Endpoint,
}
snsNotifier, err := notify.NewSNSNotifier(snsConf, logger)
notifier, err := buildNotifier(conf, logger)
if err != nil {
log.Fatalf("Error creating notifier: %v", err)
log.Fatalf("Error building notifier: %v", err)
}

// Build processor.
@@ -59,7 +55,7 @@ func main() {
log.Fatalf("Error creating results client: %v", err)
}

processor, err := processor.NewCheckProcessor(snsNotifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
processor, err := processor.NewCheckProcessor(notifier, db, resultsClient, conf.Report.URLReplace, conf.MaxEventAge, logger)
if err != nil {
log.Fatalf("Error creating queue processor: %v", err)
}
@@ -97,6 +93,52 @@
wg.Wait()
}

// buildNotifier builds the appropriate notifier given the defined configuration.
// TODO: Once the integrations dependent on the old notification format have been
// deprecated or updated to comply with the new format through Kafka topic channel
// we can get rid of SNS and multi implementations of notifier and just use Kafka.
func buildNotifier(conf *config, logger *log.Logger) (notify.Notifier, error) {
if !conf.SNS.Enabled && !conf.Kafka.Enabled {
logger.Info("using noop notifier")
return notify.NewNoopNotifier(), nil
}
if conf.SNS.Enabled && !conf.Kafka.Enabled {
logger.Info("using SNS notifier")
return buildSNSNotifier(conf, logger)
}
if !conf.SNS.Enabled && conf.Kafka.Enabled {
logger.Info("using Kafka notifier")
return buildKafkaNotifier(conf, logger)
}
// Multi Notifier
logger.Info("using multi notifier")
k, err := buildKafkaNotifier(conf, logger)
if err != nil {
return nil, err
}
s, err := buildSNSNotifier(conf, logger)
if err != nil {
return nil, err
}
return notify.NewMultiNotifier(k, s), nil
}

func buildSNSNotifier(conf *config, logger *log.Logger) (*notify.SNSNotifier, error) {
return notify.NewSNSNotifier(notify.SNSConfig{
TopicArn: conf.SNS.TopicARN,
Endpoint: conf.SNS.Endpoint,
}, logger)
}

func buildKafkaNotifier(conf *config, logger *log.Logger) (*notify.KafkaNotifier, error) {
kafkaCli, err := kafka.NewClient(conf.Kafka.User, conf.Kafka.Pass,
conf.Kafka.BrokerURL, conf.Kafka.Topic)
if err != nil {
return nil, err
}
return notify.NewKafkaNotifier(kafkaCli, logger), nil
}

func setupLogger(cfg config) *log.Logger {
var logger = log.New()

11 changes: 9 additions & 2 deletions config.toml
@@ -17,15 +17,22 @@ name = "$PG_NAME"
[sqs]
number_of_processors = $SQS_NUMBER_OF_PROCESSORS
wait_time = 20
timeout = 30
timeout = 60
queue_arn = "$SQS_QUEUE_ARN"
endpoint = "$AWS_SQS_ENDPOINT"

[sns]
enabled = $SNS_ENABLED
topic_arn = "$SNS_TOPIC_ARN"
enabled = true
endpoint = "$AWS_SNS_ENDPOINT"

[kafka]
enabled = $KAFKA_ENABLED
user = "$KAFKA_USER"
password = "$KAFKA_PASSWORD"
broker_url = "$KAFKA_BROKER_URL"
topic = "$KAFKA_TOPIC"

[report]
url_replace = "$RESULTS_URL|$RESULTS_INTERNAL_URL"
