Skip to content

Commit 690c4fe

Browse files
committed
refactor
1 parent eb498c0 commit 690c4fe

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

pkg/store/findings.go

+49
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/jmoiron/sqlx"
15+
"github.com/lib/pq"
1516
log "github.com/sirupsen/logrus"
1617
)
1718

@@ -417,6 +418,54 @@ func findingsStates(ctx context.Context, tx *sqlx.Tx, log *log.Logger, s SourceF
417418
return states, nil
418419
}
419420

421+
func findingsStatesV2(ctx context.Context, tx *sqlx.Tx, log *log.Logger, source string, components []string) ([]findingState, error) {
422+
findingsQ := `
423+
WITH relevant_sources AS(
424+
SELECT *
425+
FROM sources
426+
WHERE target_id = $1 AND component = ANY($2::TEXT[])
427+
)
428+
SELECT fe.finding_id as finding_id, fe.score as score, fe.fingerprint as fingerprint,
429+
fe.affected_resource_string as affected_resource_string, fe.details as details,
430+
fe.resources as resources, s.id as source_id, s.time as source_time,
431+
f.status as status
432+
FROM relevant_sources s
433+
LEFT JOIN finding_events fe ON fe.source_id=s.id
434+
LEFT JOIN findings f ON f.id=fe.finding_id
435+
ORDER BY fe.finding_id, s.time;
436+
`
437+
438+
// The query returns all the sources of any family that are capable of detecting any issue
439+
// ever found for the target by a source belonging to the family of the source we are processing.
440+
// Along with the source data, it retrieves the finding events, if any, sorted by finding_id and time.
441+
//
442+
// The 'relevant_sources' query gets all the sources that have been executed against the specified
443+
// target which are in the list of all the sources that have ever detected an issue from the whole
444+
// list of issues ever detected by the source we are processing.
445+
//
446+
// Example of data returned:
447+
// finding_id | score | source_id | source_time
448+
// --------------------------------------+-------+--------------------------------------+---------------------
449+
// 509b7663-7467-4bf4-95df-495aadcf70ac | 3.9 | ee51f3eb-cb59-474e-b6bd-449bf08990c8 | 2019-06-08 09:32:08
450+
// 509b7663-7467-4bf4-95df-495aadcf70ac | 3.9 | 8b912b11-b66b-4b79-9acd-d1a47305162b | 2019-10-08 08:22:30
451+
// 85ccdcc8-edbb-4691-be7b-32d9dd24b696 | 3.9 | 8b912b11-b66b-4b79-9acd-d1a47305162b | 2019-10-08 08:22:30
452+
// 9ce52ab3-35ff-4c54-ad1c-e8b2b9d73e71 | 3.9 | ee51f3eb-cb59-474e-b6bd-449bf08990c8 | 2019-06-08 09:32:08
453+
// 9ce52ab3-35ff-4c54-ad1c-e8b2b9d73e71 | 3.9 | 8b912b11-b66b-4b79-9acd-d1a47305162b | 2019-10-08 08:22:30
454+
// | | 8b912b11-b66b-4b79-9acd-d1a47305162b | 2019-10-08 08:22:30
455+
// | | ee51f3eb-cb59-474e-b6bd-449bf08990c8 | 2019-06-08 09:32:08
456+
457+
args := []interface{}{source, pq.Array(components)}
458+
logQuery(log, "FindingStates", findingsQ, args...)
459+
sourceF := []sourceFindings{}
460+
err := tx.Select(&sourceF, findingsQ, args...)
461+
if err != nil && !IsNotFoundErr(err) {
462+
return nil, err
463+
}
464+
465+
states := buildFindingStates(sourceF)
466+
return states, nil
467+
}
468+
420469
func buildFindingStates(sourceF []sourceFindings) []findingState {
421470
if len(sourceF) == 0 {
422471
return nil

pkg/store/sources.go

+56-3
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,12 @@ func (db *psqlxStore) ProcessSourceExecution(s Source, sourceFindings []SourceFi
126126
// conditions from other possible concurrent sources being processed.
127127
// To do that we have to lock based on all related sources that can
128128
// have an effect on the current source being processed and its findings.
129-
relatedFamilies, err := db.relatedFamilies(s.SourceFamily)
129+
relatedComponents, err := db.relatedComponents(s.SourceFamily)
130130
if err != nil {
131131
return Source{}, err
132132
}
133-
err = db.lockTxBySources(tx, relatedFamilies)
133+
// err = db.lockTxBySources(tx, relatedFamilies)
134+
err = db.lockTxByComponent(tx, s.Target, relatedComponents)
134135
if err != nil {
135136
return Source{}, err
136137
}
@@ -166,7 +167,7 @@ func (db *psqlxStore) ProcessSourceExecution(s Source, sourceFindings []SourceFi
166167
}
167168

168169
tFindings := time.Now()
169-
findingStates, err := findingsStates(ctx, tx, db.logger, screated.SourceFamily)
170+
findingStates, err := findingsStatesV2(ctx, tx, db.logger, s.Target, relatedComponents)
170171
if err != nil {
171172
tx.Rollback()
172173
return Source{}, err
@@ -257,6 +258,31 @@ func (db *psqlxStore) relatedFamilies(sf SourceFamily) (SourceFamilies, error) {
257258
return families, nil
258259
}
259260

261+
// RelatedFamilies returns the related families for the input SourceFamily.
262+
// That is the source families that are capable of detecting at least one
263+
// of the issues that the input source has ever detected for the specified target.
264+
func (db *psqlxStore) relatedComponents(sf SourceFamily) ([]string, error) {
265+
q := `
266+
SELECT UNNEST(comps)
267+
FROM (
268+
SELECT DISTINCT ARRAY_AGG(DISTINCT s.component) comps
269+
FROM sources s INNER JOIN source_issues si ON si.source_id = s.id
270+
WHERE s.target_id = $3 AND s.name=$1
271+
GROUP BY issue_id
272+
HAVING $2 = ANY(ARRAY_AGG(DISTINCT s.component))
273+
) C
274+
`
275+
276+
args := []interface{}{sf.Name, sf.Component, sf.Target}
277+
logQuery(db.logger, "relatedFamilies", q, args...)
278+
components := []string{}
279+
err := db.DB.Select(&components, q, args...)
280+
if err != nil && !IsNotFoundErr(err) {
281+
return nil, err
282+
}
283+
return components, nil
284+
}
285+
260286
// lockTxBySources performs an exclusive advisory lock on the given
261287
// transaction genereting a lock for each specified source family.
262288
func (db *psqlxStore) lockTxBySources(tx *sqlx.Tx, sff SourceFamilies) error {
@@ -285,6 +311,33 @@ func (db *psqlxStore) lockTxBySources(tx *sqlx.Tx, sff SourceFamilies) error {
285311
return nil
286312
}
287313

314+
// lockTxBySources performs an exclusive advisory lock on the given
315+
// transaction genereting a lock for each specified source family.
316+
func (db *psqlxStore) lockTxByComponent(tx *sqlx.Tx, target string, components []string) error {
317+
// We use advisory locks and transactions to avoid race conditions
318+
// and inconsistent DB state, for instance, with a source without its
319+
// finding events, that would lead to wrong findind exposures calculation.
320+
321+
for _, sf := range components {
322+
// A lock is obtained for every source family in scope.
323+
// The key for the lock is built by concatenating the
324+
// Name, Component and Target for each source family.
325+
fkey := fmt.Sprintf("%s%s", sf, target)
326+
327+
h := fnv.New32()
328+
h.Write([]byte(fkey))
329+
k := h.Sum32()
330+
331+
_, err := tx.Exec("SELECT pg_advisory_xact_lock($1)", k)
332+
if err != nil {
333+
tx.Rollback()
334+
return err
335+
}
336+
}
337+
338+
return nil
339+
}
340+
288341
func createSource(ctx context.Context, tx *sqlx.Tx, s Source) (*Source, error) {
289342
r := tx.QueryRowxContext(ctx, `INSERT INTO sources (name, component, instance, options, time, target_id)
290343
VALUES ($1, $2, $3, $4, $5, $6) RETURNING *`, s.Name, s.Component, s.Instance, s.Options, s.Time, s.Target)

0 commit comments

Comments
 (0)