Skip to content

Commit 82d76c0

Browse files
committed
enable sending checks to asynq
1 parent 89687fc commit 82d76c0

File tree

8 files changed

+111
-10
lines changed

8 files changed

+111
-10
lines changed

Diff for: cmd/vulcan-scan-engine/commands/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type config struct {
9191
DB dbConfig
9292
Vulcan checktypesInformer
9393
SQS queue.Config
94+
Redis queue.RedisConfig
9495
ScansSNS notify.Config `mapstructure:"scans_sns"`
9596
ChecksSNS notify.Config `mapstructure:"checks_sns"`
9697
Metrics metricsConfig

Diff for: cmd/vulcan-scan-engine/commands/root.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,13 @@ func startServer() error {
166166
)
167167

168168
streamClient := stream.NewClient(cfg.Stream.URL)
169+
redisProducer := queue.NewRedisProducer(cfg.Redis, logger)
169170

170171
producer, err := queue.NewMultiSQSProducer(cfg.CTQueues.ARNs(), cfg.SQS.Endpoint, logger)
171172
if err != nil {
172173
return err
173174
}
174-
jobsSender, err := scans.NewJobQueueSender(producer, cfg.CTQueues.Names())
175+
jobsSender, err := scans.NewJobQueueSender(producer, cfg.CTQueues.Names(), redisProducer)
175176
if err != nil {
176177
return err
177178
}

Diff for: config.toml

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ timeout = $CHECKS_SQS_TIMEOUT
2424
queue_arn = "$CHECKS_SQS_ARN"
2525
endpoint = "$AWS_SQS_ENDPOINT"
2626

27+
[redis]
28+
Host = "$REDIS_HOST"
29+
Port = $REDIS_PORT
30+
Usr = "$REDIS_USR"
31+
Pwd = "$REDIS_PWD"
32+
DB = $REDIS_DB
33+
2734
[scans_sns]
2835
topic_arn = "$SCANS_SNS_ARN"
2936
endpoint = "$AWS_SNS_ENDPOINT"

Diff for: go.mod

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module github.com/adevinta/vulcan-scan-engine
22

3-
go 1.21.0
3+
go 1.22
44

5-
toolchain go1.21.6
5+
toolchain go1.23.3
66

77
require (
88
github.com/adevinta/errors v1.0.0
@@ -14,6 +14,7 @@ require (
1414
github.com/goadesign/goa v1.4.3
1515
github.com/google/go-cmp v0.6.0
1616
github.com/gorilla/mux v1.8.1
17+
github.com/hibiken/asynq v0.25.1
1718
github.com/jinzhu/gorm v1.9.16
1819
github.com/jmoiron/sqlx v1.4.0
1920
github.com/lib/pq v1.10.9
@@ -28,12 +29,15 @@ require (
2829
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
2930
github.com/Microsoft/go-winio v0.5.2 // indirect
3031
github.com/armon/go-metrics v0.4.1 // indirect
32+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
33+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
3134
github.com/dimfeld/httptreemux v5.0.1+incompatible // indirect
3235
github.com/fsnotify/fsnotify v1.7.0 // indirect
3336
github.com/go-logfmt/logfmt v0.5.1 // indirect
3437
github.com/go-playground/locales v0.12.1 // indirect
3538
github.com/go-playground/universal-translator v0.16.0 // indirect
3639
github.com/gofrs/uuid v3.2.0+incompatible // indirect
40+
github.com/google/uuid v1.6.0 // indirect
3741
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
3842
github.com/hashicorp/golang-lru v0.5.4 // indirect
3943
github.com/hashicorp/hcl v1.0.0 // indirect
@@ -46,19 +50,23 @@ require (
4650
github.com/mattn/go-sqlite3 v2.0.2+incompatible // indirect
4751
github.com/mitchellh/mapstructure v1.5.0 // indirect
4852
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
53+
github.com/redis/go-redis/v9 v9.7.0 // indirect
54+
github.com/robfig/cron/v3 v3.0.1 // indirect
4955
github.com/sagikazarmark/locafero v0.4.0 // indirect
5056
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
5157
github.com/sourcegraph/conc v0.3.0 // indirect
5258
github.com/spf13/afero v1.11.0 // indirect
53-
github.com/spf13/cast v1.6.0 // indirect
59+
github.com/spf13/cast v1.7.0 // indirect
5460
github.com/spf13/pflag v1.0.5 // indirect
5561
github.com/subosito/gotenv v1.6.0 // indirect
5662
go.uber.org/atomic v1.9.0 // indirect
5763
go.uber.org/multierr v1.9.0 // indirect
5864
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
5965
golang.org/x/net v0.23.0 // indirect
60-
golang.org/x/sys v0.18.0 // indirect
66+
golang.org/x/sys v0.27.0 // indirect
6167
golang.org/x/text v0.14.0 // indirect
68+
golang.org/x/time v0.8.0 // indirect
69+
google.golang.org/protobuf v1.35.2 // indirect
6270
gopkg.in/fsnotify.v1 v1.4.7 // indirect
6371
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
6472
gopkg.in/ini.v1 v1.67.0 // indirect

Diff for: go.sum

+26-4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ github.com/aws/aws-sdk-go v1.55.0/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQ
2525
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
2626
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
2727
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
28+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
29+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
30+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
31+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
2832
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
33+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
34+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
2935
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
3036
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
3137
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
@@ -36,6 +42,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
3642
github.com/denisenkom/go-mssqldb v0.0.0-20181014144952-4e0d7dc8888f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
3743
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM=
3844
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
45+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
46+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
3947
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
4048
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
4149
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
@@ -78,6 +86,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
7886
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
7987
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
8088
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
89+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
90+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
8191
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
8292
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
8393
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
@@ -92,6 +102,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
92102
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
93103
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
94104
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
105+
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
106+
github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg=
95107
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
96108
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
97109
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -168,6 +180,10 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
168180
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
169181
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
170182
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
183+
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
184+
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
185+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
186+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
171187
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
172188
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
173189
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -184,8 +200,8 @@ github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9yS
184200
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
185201
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
186202
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
187-
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
188-
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
203+
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
204+
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
189205
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
190206
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
191207
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
@@ -211,6 +227,8 @@ github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSW
211227
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
212228
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
213229
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
230+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
231+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
214232
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
215233
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
216234
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -243,13 +261,17 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
243261
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
244262
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
245263
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
246-
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
247-
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
264+
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
265+
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
248266
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
249267
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
250268
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
269+
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
270+
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
251271
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
252272
google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
273+
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
274+
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
253275
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
254276
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
255277
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=

Diff for: pkg/queue/redisproducer.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2021 Adevinta
3+
*/
4+
5+
package queue
6+
7+
import (
8+
"fmt"
9+
"time"
10+
11+
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
13+
"github.com/hibiken/asynq"
14+
)
15+
16+
type RedisConfig struct {
17+
Host string
18+
Port int
19+
Usr string
20+
Pwd string
21+
DB int
22+
}
23+
24+
type RedisProducer struct {
25+
client *asynq.Client
26+
log log.Logger
27+
}
28+
29+
// NewSQSProducer creates a new SQSProducer that allows to send messages to
30+
// the given queueARN.
31+
func NewRedisProducer(config RedisConfig, log log.Logger) *RedisProducer {
32+
if config.Host == "" {
33+
return nil
34+
}
35+
return &RedisProducer{
36+
client: asynq.NewClient(&asynq.RedisClientOpt{
37+
Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),
38+
DB: config.DB,
39+
Username: config.Usr,
40+
Password: config.Pwd}),
41+
log: log,
42+
}
43+
}
44+
45+
// SendMessage sends a message to the producer defined queue.
46+
func (s *RedisProducer) SendMessage(queueName string, body []byte) error {
47+
task := asynq.NewTask("checks", body)
48+
m, err := s.client.Enqueue(task, asynq.Queue(queueName), asynq.Timeout(10*time.Hour), asynq.MaxRetry(3))
49+
level.Debug(s.log).Log("asynqId", m.ID, "queue", m.Queue)
50+
return err
51+
}

Diff for: pkg/scans/jobqueuesender.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package scans
77
import (
88
"encoding/json"
99
"errors"
10+
11+
"github.com/adevinta/vulcan-scan-engine/pkg/queue"
1012
)
1113

1214
const defaultQueueName = "default"
@@ -25,17 +27,20 @@ type NamedQueuesSender interface {
2527
type JobsQueueSender struct {
2628
sender NamedQueuesSender
2729
defCTQueues map[string]string
30+
redisSender *queue.RedisProducer
2831
}
2932

3033
// NewJobQueueSender creates a new JobQueueSender given the corresponder named
3134
// queues message sender and the default queue names for checktypes.
32-
func NewJobQueueSender(sender NamedQueuesSender, defaultCTQueues map[string]string) (*JobsQueueSender, error) {
35+
func NewJobQueueSender(sender NamedQueuesSender, defaultCTQueues map[string]string, redis *queue.RedisProducer) (*JobsQueueSender, error) {
3336
if _, ok := defaultCTQueues[defaultQueueName]; !ok {
3437
return nil, ErrNoDefaultQueueDefined
3538
}
39+
3640
return &JobsQueueSender{
3741
sender: sender,
3842
defCTQueues: defaultCTQueues,
43+
redisSender: redis,
3944
}, nil
4045
}
4146

@@ -55,5 +60,9 @@ func (j *JobsQueueSender) Send(queueName string, checktypeName string, job Job)
5560
if err != nil {
5661
return err
5762
}
63+
64+
if j.redisSender != nil {
65+
j.redisSender.SendMessage(checktypeName, content)
66+
}
5867
return j.sender.Send(queueName, string(content))
5968
}

Diff for: run.sh

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export CHECKS_SQS_WAIT=${CHECKS_SQS_WAIT:-20}
1313
export CHECKS_SQS_TIMEOUT=${CHECKS_SQS_TIMEOUT:-30}
1414
export CHECKS_CREATOR_CHECKPOINT=${CHECKS_CREATOR_CHECKPOINT:-100}
1515
export PERSISTENCE_CACHE=${PERSISTENCE_CACHE:-120}
16+
export REDIS_PORT=${REDIS_PORT:-6379}
17+
export REDIS_DB=${REDIS_DB:-0}
1618

1719
# Nessus section will be deprecated,
1820
# We add this for compatibility using the new dynamic method.

0 commit comments

Comments
 (0)