From ea27b727ce97ff10c0d929fc721104743f10327a Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Wed, 25 Dec 2019 12:20:18 +0200 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=91=8C=20feat(message:deduplication)?= =?UTF-8?q?=20beginign=20of=20work?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gbus/abstractions.go | 10 ++ gbus/builder/builder.go | 61 +++++++---- gbus/bus.go | 28 +++-- gbus/deduplicator/implementation/tx.go | 138 +++++++++++++++++++++++++ gbus/deduplicator/store.go | 14 +++ gbus/metrics/message_metrics.go | 57 +++++++++- gbus/tx/mysql/migrations.go | 20 ++++ gbus/worker.go | 83 +++++++++++++-- go.mod | 5 - go.sum | 14 +++ tests/metrics_test.go | 2 +- 11 files changed, 384 insertions(+), 48 deletions(-) create mode 100644 gbus/deduplicator/implementation/tx.go create mode 100644 gbus/deduplicator/store.go diff --git a/gbus/abstractions.go b/gbus/abstractions.go index aa3ebe6..29944e8 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -22,6 +22,14 @@ const ( REPLY Semantics = "reply" ) +type DeduplicationPolicy int + +const ( + DeduplicationPolicyNone DeduplicationPolicy = iota + DeduplicationPolicyReject + DeduplicationPolicyAck +) + //Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus type Bus interface { HandlerRegister @@ -213,6 +221,8 @@ type Builder interface { //WithLogger set custom logger instance WithLogger(logger logrus.FieldLogger) Builder + + WithDeduplicationPolicy(method DeduplicationPolicy, age time.Duration) Builder } //Invocation context for a specific processed message diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 7021326..4de72c7 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -5,31 +5,35 @@ import ( "sync" "time" + "emperror.dev/errors" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/deduplicator/implementation" "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/serialization" "github.com/wework/grabbit/gbus/tx/mysql" ) type defaultBuilder struct { - PrefetchCount uint - connStr string - purgeOnStartup bool - sagaStoreConnStr string - txnl bool - txConnStr string - txnlProvider string - workerNum uint - serializer gbus.Serializer - dlx string - defaultPolicies []gbus.MessagePolicy - confirm bool - dbPingTimeout time.Duration - usingPingTimeout bool - logger logrus.FieldLogger - busCfg gbus.BusConfiguration + PrefetchCount uint + connStr string + purgeOnStartup bool + sagaStoreConnStr string + txnl bool + txConnStr string + txnlProvider string + workerNum uint + serializer gbus.Serializer + dlx string + defaultPolicies []gbus.MessagePolicy + confirm bool + dbPingTimeout time.Duration + usingPingTimeout bool + logger logrus.FieldLogger + busCfg gbus.BusConfiguration + deduplicationPolicy gbus.DeduplicationPolicy + deduplicationRetentionAge time.Duration } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { @@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { DefaultPolicies: builder.defaultPolicies, DbPingTimeout: 3, Confirm: builder.confirm, + DeduplicationPolicy: builder.deduplicationPolicy, } var finalLogger logrus.FieldLogger @@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } + gb.Deduplicator = implementation.NewDeduplicator(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge, gb.Log()) //TODO move this into the NewSagaStore factory methods if builder.purgeOnStartup { @@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err) panic(errMsg) } + err = gb.Deduplicator.Purge() + if err != nil { + errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator") + panic(errMsg) + } } glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) glue.SetLogger(gb.Log()) @@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde return builder } +func (builder *defaultBuilder) WithDeduplicationPolicy(policy gbus.DeduplicationPolicy, age time.Duration) gbus.Builder { + builder.deduplicationPolicy = policy + builder.deduplicationRetentionAge = age + return builder +} + //New :) func New() Nu { return Nu{} @@ -218,9 +235,11 @@ type Nu struct { //Bus inits a new BusBuilder func (Nu) Bus(brokerConnStr string) gbus.Builder { return &defaultBuilder{ - busCfg: gbus.BusConfiguration{}, - PrefetchCount: 1, - connStr: brokerConnStr, - serializer: serialization.NewGobSerializer(), - defaultPolicies: make([]gbus.MessagePolicy, 0)} + busCfg: gbus.BusConfiguration{}, + PrefetchCount: 1, + connStr: brokerConnStr, + serializer: serialization.NewGobSerializer(), + defaultPolicies: make([]gbus.MessagePolicy, 0), + deduplicationPolicy: gbus.DeduplicationPolicyNone, + } } diff --git a/gbus/bus.go b/gbus/bus.go index 68f169c..d94278f 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/wework/grabbit/gbus/deduplicator" "github.com/wework/grabbit/gbus/metrics" "github.com/opentracing-contrib/go-amqp/amqptracer" @@ -56,15 +57,17 @@ type DefaultBus struct { Glue SagaGlue TxProvider TxProvider - WorkerNum uint - Serializer Serializer - DLX string - DefaultPolicies []MessagePolicy - Confirm bool - healthChan chan error - backpressure bool - DbPingTimeout time.Duration - amqpConnected bool + WorkerNum uint + Serializer Serializer + DLX string + DeduplicationPolicy DeduplicationPolicy + Deduplicator deduplicator.DeduplicatorStore + DefaultPolicies []MessagePolicy + Confirm bool + healthChan chan error + backpressure bool + DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -222,6 +225,8 @@ func (b *DefaultBus) Start() error { return startErr } + b.Deduplicator.Start() + //declare queue var q amqp.Queue if q, e = b.createServiceQueue(); e != nil { @@ -294,7 +299,10 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { registrations: b.Registrations, serializer: b.Serializer, b: b, - amqpErrors: b.amqpErrors} + amqpErrors: b.amqpErrors, + delicatePolicy: b.DeduplicationPolicy, + duplicateStore: b.Deduplicator, + } err := w.Start() if err != nil { diff --git a/gbus/deduplicator/implementation/tx.go b/gbus/deduplicator/implementation/tx.go new file mode 100644 index 0000000..2acb6e8 --- /dev/null +++ b/gbus/deduplicator/implementation/tx.go @@ -0,0 +1,138 @@ +package implementation + +import ( + "database/sql" + "time" + + "emperror.dev/errors" + "github.com/sirupsen/logrus" + + "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/deduplicator" + "github.com/wework/grabbit/gbus/tx" +) + +var _ deduplicator.DeduplicatorStore = &deduper{} + +type deduper struct { + *gbus.Glogged + svcName string + policy gbus.DeduplicationPolicy + txProvider gbus.TxProvider + age time.Duration + ticker *time.Ticker + done chan bool + tableName string +} + +func (d *deduper) Purge() (err error) { + truncateSQL := "TRUNCATE TABLE " + d.tableName + txp, err := d.txProvider.New() + if err != nil { + return err + } + defer func() { + if err != nil { + serr := txp.Rollback() + err = errors.Append(err, serr) + } + err = txp.Commit() + }() + _, err = txp.Exec(truncateSQL) + if err != nil { + return err + } + return nil +} + +func (d *deduper) Start() { + d.ticker = time.NewTicker(time.Minute) + d.done = make(chan bool) + deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?" + go func() { + for { + select { + case <-d.done: + return + case <-d.ticker.C: + oldest := time.Now().Add(-1 * d.age) + tx, err := d.txProvider.New() + if err != nil { + d.Log().WithError(err).Error("failed to acquire a tx") + continue + } + result, err := tx.Exec(deleteQuery, oldest) + if err != nil && err != sql.ErrNoRows { + d.Log().WithError(err).Error("failed executing delete query") + } + n, err := result.RowsAffected() + if err != nil { + d.Log().WithError(err).Error("failed to get count of affected rows") + } else { + d.Log().WithField("table_name", d.tableName).WithField("rows_deleted", n). + Info("successfully cleanup duplicates table") + } + } + } + }() +} + +func (d *deduper) Stop() { + d.Log().Info("shutting down deduplicator") + d.ticker.Stop() + close(d.done) +} + +// +func (d *deduper) StoreMessageId(tx *sql.Tx, id string) error { + insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)" + _, err := tx.Exec(insertSQL, id) + if err != nil { + d.Log().WithError(err).Error("failed to insert the id of the message into the dedup table") + return err + } + return nil +} + +// MessageExists checks if a message id is in the deduplication table and returns an error if it fails +func (d *deduper) MessageExists(id string) (bool, error) { + if d.policy == gbus.DeduplicationPolicyNone { + return false, nil + } + tx, err := d.txProvider.New() + if err != nil { + return true, err + } + defer func() { + err = tx.Rollback() + if err != nil { + d.Log().WithError(err).Error("could not commit tx for query MessageExists") + } + }() + selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)" + + var exists bool + err = tx.QueryRow(selectSQL, id).Scan(&exists) + if err != nil && err == sql.ErrNoRows { + return false, nil + } + + if err != nil { + return true, err + } + + return true, nil +} + +func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.DeduplicatorStore { + d := &deduper{ + svcName: svcName, + policy: policy, + txProvider: txProvider, + age: age, + tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"), + } + l := logger.WithField("grabbit", "deduplicator") + d.SetLogger(l) + return d +} diff --git a/gbus/deduplicator/store.go b/gbus/deduplicator/store.go new file mode 100644 index 0000000..10c0860 --- /dev/null +++ b/gbus/deduplicator/store.go @@ -0,0 +1,14 @@ +package deduplicator + +import ( + "database/sql" +) + +// DeduplicatorStore abtracts the way deduplicateor manages the +type DeduplicatorStore interface { + StoreMessageId(tx *sql.Tx, id string) error + MessageExists(id string) (bool, error) + Purge() error + Start() + Stop() +} diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go index 9458b47..8679a63 100644 --- a/gbus/metrics/message_metrics.go +++ b/gbus/metrics/message_metrics.go @@ -7,7 +7,9 @@ import ( ) var ( - rejectedMessages = newRejectedMessagesCounter() + rejectedMessages = newRejectedMessagesCounter() + duplicateMessageAck = newDuplicateMessageCounter("ack") + duplicateMessageReject = newDuplicateMessageCounter("reject") ) //ResetRejectedMessagesCounter resets the counter intended to be used in tests only @@ -35,7 +37,6 @@ func GetRejectedMessagesValue() (float64, error) { } func newRejectedMessagesCounter() prometheus.Counter { - return promauto.NewCounter(prometheus.CounterOpts{ Namespace: grabbitPrefix, Subsystem: "messages", @@ -43,3 +44,55 @@ func newRejectedMessagesCounter() prometheus.Counter { Help: "counting the rejected messages", }) } + +// ResetDuplicateMessagesCounters resets the counter intended to be used in tests only +func ResetDuplicateMessagesCounters() { + prometheus.Unregister(duplicateMessageAck) + duplicateMessageAck = newDuplicateMessageCounter("ack") + prometheus.Unregister(duplicateMessageReject) + duplicateMessageReject = newDuplicateMessageCounter("reject") +} + +// ReportDuplicateMessageAck reports a message is a duplicate and Acked +func ReportDuplicateMessageAck() { + duplicateMessageAck.Inc() +} + +// ReportDuplicateMessageReject reports a message is a duplicate and Rejected +func ReportDuplicateMessageReject() { + duplicateMessageReject.Inc() +} + +// GetDuplicateMessageRejectValue gets the value of the rejected message counter +func GetDuplicateMessageRejectValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := duplicateMessageReject.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +// GetDuplicateMessageAckValue gets the value of the rejected message counter +func GetDuplicateMessageAckValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := duplicateMessageAck.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +func newDuplicateMessageCounter(rabbitAnswer string) prometheus.Counter { + return prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: "messages", + Name: "duplicate_messages", + Help: "counting the duplicate messages", + ConstLabels: prometheus.Labels{"type": rabbitAnswer}, + }) +} diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go index 2ae78ae..2092a9c 100644 --- a/gbus/tx/mysql/migrations.go +++ b/gbus/tx/mysql/migrations.go @@ -175,6 +175,7 @@ func EnsureSchema(db *sql.DB, svcName string) { sagaStoreAddSagaCreatorDetails(svcName), sagaStoreAddRPCIDDetails(svcName), sagaStoreAddCreatedAtDetails(svcName), + createDeduplicationTable(svcName), )) if err != nil { panic(err) @@ -185,6 +186,25 @@ func EnsureSchema(db *sql.DB, svcName string) { } } +func createDeduplicationTable(svcName string) *migrator.Migration { + tblName := tx.GrabbitTableNameTemplate(svcName, "duplicates") + + createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + id VARCHAR(256) NOT NULL PRIMARY KEY, + created_at timestamp DEFAULT CURRENT_TIMESTAMP, + INDEX ` + tblName + `_created_at_idx (created_at))` + + return &migrator.Migration{ + Name: "create a table to manage duplicate messages", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(createTableQuery); err != nil { + return err + } + return nil + }, + } +} + func sanitizeSvcName(svcName string) string { sanitized := tx.SanitizeTableName(svcName) diff --git a/gbus/worker.go b/gbus/worker.go index 093a7bd..4eab27f 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/wework/grabbit/gbus/deduplicator" "github.com/wework/grabbit/gbus/metrics" "emperror.dev/errors" @@ -45,6 +46,8 @@ type worker struct { amqpErrors chan *amqp.Error stop chan bool span opentracing.Span + duplicateStore deduplicator.DeduplicatorStore + delicatePolicy DeduplicationPolicy } func (worker *worker) Start() error { @@ -238,7 +241,7 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery, msgSpecifi return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), fmt.Sprintf("deadletter_%s", delivery.Type), worker.log()) } - err := worker.withTx(txWrapper, msgSpecificLogEntry) + err := worker.withTx(worker.withDeduplicator(txWrapper, &delivery, msgSpecificLogEntry)) if err != nil { //we reject the deelivery but requeue it so the message will not be lost and recovered to the dlq _ = worker.reject(true, delivery, msgSpecificLogEntry) @@ -271,7 +274,7 @@ func (worker *worker) runGlobalHandler(delivery *amqp.Delivery, msgSpecificLogEn return worker.globalRawHandler(tx, delivery) } //run the global handler inside a transactions - return worker.withTx(txWrapper, msgSpecificLogEntry) + return worker.withTx(worker.withDeduplicator(txWrapper, delivery, msgSpecificLogEntry)) } //run the global handler with metrics return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, delivery.Type, worker.log()) @@ -304,8 +307,20 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { }() msgSpecificLogEntry.Info("GOT MSG") + isDuplicate, err := worker.handleDuplicates(delivery, msgSpecificLogEntry) + if err != nil { + worker.span.LogFields(slog.Error(err)) + msgSpecificLogEntry.WithError(err).Info("failed getting information about message duplication") + _ = worker.reject(true, delivery, msgSpecificLogEntry) + return + } + if isDuplicate { + msgSpecificLogEntry.Warn("message is a duplicate") + worker.span.LogFields(slog.String("grabbit", "message is a duplicate")) + return + } - //handle a message that originated from a deadletter exchange + // handle a message that originated from a deadletter exchange if worker.isDead(delivery) && worker.deadletterHandler != nil { worker.span.LogFields(slog.Error(errors.New("handling dead-letter delivery"))) msgSpecificLogEntry.Info("invoking deadletter handler") @@ -314,18 +329,16 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } if err := worker.runGlobalHandler(&delivery, msgSpecificLogEntry); err != nil { - //when the global handler fails terminate executation and reject the message + // when the global handler fails terminate executation and reject the message _ = worker.reject(false, delivery, msgSpecificLogEntry) return } - //TODO:Dedup message - handlers := worker.resolveHandlers(isRPCreply, delivery, msgSpecificLogEntry) if len(handlers) == 0 { msgSpecificLogEntry.Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) - //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue + // remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue _ = worker.ack(delivery, msgSpecificLogEntry) return } @@ -346,7 +359,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { bm, err := worker.extractBusMessage(delivery, msgSpecificLogEntry) if err != nil { worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) - //reject poison message + // reject poison message _ = worker.reject(false, delivery, msgSpecificLogEntry) return } @@ -453,7 +466,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return handlerErr } - err := worker.withTx(handlerWrapper, msgSpecificLogEntry) + err := worker.withTx(worker.withDeduplicator(handlerWrapper, delivery, msgSpecificLogEntry)) if err != nil { return err } @@ -473,6 +486,58 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } +func (worker *worker) handleDuplicates(delivery amqp.Delivery, msgSpecificLogEntry *logrus.Entry) (bool, error) { + if worker.delicatePolicy == DeduplicationPolicyNone { + return false, nil + } + duplicate, err := worker.duplicateStore.MessageExists(delivery.MessageId) + if err != nil { + worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) + worker.log().WithError(err).Error("failed checking for existing message") + return true, err + } + if duplicate { + msgSpecificLogEntry.Error("message is a duplicate") + err = worker.duplicatePolicyApply(false, delivery, msgSpecificLogEntry) + if err != nil { + msgSpecificLogEntry.WithError(err).Error("failed handling duplicate delivery") + worker.span.LogFields(slog.Error(errors.New("failed handling duplicate delivery"))) + } + return true, err + } + return false, nil +} + +func (worker *worker) duplicatePolicyApply(requeue bool, delivery amqp.Delivery, msgSpecificLogEntry logrus.FieldLogger) error { + switch worker.delicatePolicy { + case DeduplicationPolicyReject: + msgSpecificLogEntry.Info("rejecting message on duplicate") + worker.span.LogFields(slog.String("grabbit", "rejecting duplicate")) + metrics.ReportDuplicateMessageReject() + return worker.reject(false, delivery, msgSpecificLogEntry) + case DeduplicationPolicyAck: + msgSpecificLogEntry.Info("acknowledging duplicate") + worker.span.LogFields(slog.String("grabbit", "acknowledging duplicate")) + metrics.ReportDuplicateMessageAck() + return worker.ack(delivery, msgSpecificLogEntry) + default: + return errors.NewWithDetails("invalid deduplication policy", "policy", worker.b.DeduplicationPolicy) + } +} + +func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, delivery *amqp.Delivery, logger logrus.FieldLogger) (func(tx *sql.Tx) error, logrus.FieldLogger) { + return func(tx *sql.Tx) error { + if worker.delicatePolicy == DeduplicationPolicyNone { + return txWrapper(tx) + } + err := worker.duplicateStore.StoreMessageId(tx, delivery.MessageId) + if err != nil { + return err + } + return txWrapper(tx) + }, logger +} + func (worker *worker) log() logrus.FieldLogger { return worker.b.Log().WithField("worker", worker.consumerTag) } diff --git a/go.mod b/go.mod index ff5a3f4..ac661d2 100644 --- a/go.mod +++ b/go.mod @@ -13,11 +13,8 @@ require ( github.com/jcmturner/gofork v1.0.0 // indirect github.com/klauspost/compress v1.8.6 // indirect github.com/klauspost/cpuid v1.2.1 // indirect - github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/lopezator/migrator v0.2.0 - github.com/onsi/ginkgo v1.8.0 // indirect - github.com/onsi/gomega v1.5.0 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.3.0+incompatible // indirect @@ -34,9 +31,7 @@ require ( golang.org/x/net v0.0.0-20191009170851-d66e71096ffb // indirect golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f // indirect google.golang.org/appengine v1.6.5 // indirect - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect - gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index f73ddfa..8b9aaca 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,10 @@ emperror.dev/errors v0.4.3 h1:yfhVxX1vzHgCDXh0KL+gVKfKhXlJCabmc79jS6QQuus= emperror.dev/errors v0.4.3/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= emperror.dev/handler/logrus v0.1.0 h1:hiBAmANpRKU4/RxARem/P5r0c8fa1Hx/BqX0mpplflg= emperror.dev/handler/logrus v0.1.0/go.mod h1:aulFiWRpZEHT8+r2DUgd/IniD/dyNS9Quxn//jU5aYA= +github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= +github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ= github.com/Shopify/sarama v1.24.0 h1:99vo5VAgQybHwZwiOy/RX/S3i0somjGxur3pLeheqzI= github.com/Shopify/sarama v1.24.0/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= @@ -49,6 +51,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= @@ -76,6 +79,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= @@ -100,6 +104,7 @@ github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:Oht github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.3.0+incompatible h1:CZzRn4Ut9GbUkHlQ7jqBXeZQV41ZSKWFc302ZU6lUTk= github.com/pierrec/lz4 v2.3.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -128,11 +133,15 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3 h1:yxIAU7hTyjCJeX5B/ydcwre0LcPTaHge5DU2pqND+PA= +github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3/go.mod h1:enxXXxSAurM+2jWHMDQYCUS9Tjpjs1J2C4xbZcBo8IU= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/streadway/amqp v0.0.0-20190312223743-14f78b41ce6d/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -149,12 +158,14 @@ go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc h1:c0o/qxkaO2LF5t6fQrT4b5hzyggAkLLlCUjqfRxd8Q4= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190322120337-addf6b3196f6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -163,10 +174,12 @@ golang.org/x/net v0.0.0-20191009170851-d66e71096ffb/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -178,6 +191,7 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/tests/metrics_test.go b/tests/metrics_test.go index 9d1b10e..d858ea5 100644 --- a/tests/metrics_test.go +++ b/tests/metrics_test.go @@ -130,7 +130,7 @@ func TestRunHandlerWithMetric_Latency(t *testing.T) { } } -func checkLatency(t *testing.T, sc *uint64, expected uint64, err error) { +func checkLatency(t *testing.T, sc *uint64, expected uint64, err error) { if err != nil { t.Errorf("Failed to get latency value: %e", err) } From 4a79cabd9418c7322cb5f6097e2e40c4d4b54ba8 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Wed, 25 Dec 2019 13:19:14 +0200 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=90=9B=20fix(linting:issues)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gbus/bus.go | 2 +- gbus/deduplicator/implementation/tx.go | 6 +++--- gbus/deduplicator/store.go | 6 +++--- gbus/worker.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index d94278f..32c7eb7 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -61,7 +61,7 @@ type DefaultBus struct { Serializer Serializer DLX string DeduplicationPolicy DeduplicationPolicy - Deduplicator deduplicator.DeduplicatorStore + Deduplicator deduplicator.Store DefaultPolicies []MessagePolicy Confirm bool healthChan chan error diff --git a/gbus/deduplicator/implementation/tx.go b/gbus/deduplicator/implementation/tx.go index 2acb6e8..9643bb3 100644 --- a/gbus/deduplicator/implementation/tx.go +++ b/gbus/deduplicator/implementation/tx.go @@ -12,7 +12,7 @@ import ( "github.com/wework/grabbit/gbus/tx" ) -var _ deduplicator.DeduplicatorStore = &deduper{} +var _ deduplicator.Store = &deduper{} type deduper struct { *gbus.Glogged @@ -84,7 +84,7 @@ func (d *deduper) Stop() { } // -func (d *deduper) StoreMessageId(tx *sql.Tx, id string) error { +func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error { insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)" _, err := tx.Exec(insertSQL, id) if err != nil { @@ -124,7 +124,7 @@ func (d *deduper) MessageExists(id string) (bool, error) { return true, nil } -func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.DeduplicatorStore { +func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.Store { d := &deduper{ svcName: svcName, policy: policy, diff --git a/gbus/deduplicator/store.go b/gbus/deduplicator/store.go index 10c0860..efd6769 100644 --- a/gbus/deduplicator/store.go +++ b/gbus/deduplicator/store.go @@ -4,9 +4,9 @@ import ( "database/sql" ) -// DeduplicatorStore abtracts the way deduplicateor manages the -type DeduplicatorStore interface { - StoreMessageId(tx *sql.Tx, id string) error +// Store abstracts the way deduplicateor manages the +type Store interface { + StoreMessageID(tx *sql.Tx, id string) error MessageExists(id string) (bool, error) Purge() error Start() diff --git a/gbus/worker.go b/gbus/worker.go index 4eab27f..761f3ff 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -46,7 +46,7 @@ type worker struct { amqpErrors chan *amqp.Error stop chan bool span opentracing.Span - duplicateStore deduplicator.DeduplicatorStore + duplicateStore deduplicator.Store delicatePolicy DeduplicationPolicy } @@ -530,7 +530,7 @@ func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, deliver if worker.delicatePolicy == DeduplicationPolicyNone { return txWrapper(tx) } - err := worker.duplicateStore.StoreMessageId(tx, delivery.MessageId) + err := worker.duplicateStore.StoreMessageID(tx, delivery.MessageId) if err != nil { return err } From 979554bad3a6c8efc884324c00e945ace2a9ee9c Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Wed, 25 Dec 2019 14:20:40 +0200 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=90=9B=20fix(tests:deduplication)=20n?= =?UTF-8?q?ow=20checking=20both=20acked=20and=20rejects?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gbus/deduplicator/implementation/tx.go | 2 +- gbus/worker.go | 39 ++++--- tests/bus_test.go | 136 +++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 21 deletions(-) diff --git a/gbus/deduplicator/implementation/tx.go b/gbus/deduplicator/implementation/tx.go index 9643bb3..35eb66a 100644 --- a/gbus/deduplicator/implementation/tx.go +++ b/gbus/deduplicator/implementation/tx.go @@ -121,7 +121,7 @@ func (d *deduper) MessageExists(id string) (bool, error) { return true, err } - return true, nil + return exists, nil } func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.Store { diff --git a/gbus/worker.go b/gbus/worker.go index 761f3ff..d892416 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -241,7 +241,7 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery, msgSpecifi return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), fmt.Sprintf("deadletter_%s", delivery.Type), worker.log()) } - err := worker.withTx(worker.withDeduplicator(txWrapper, &delivery, msgSpecificLogEntry)) + err := worker.withTx(txWrapper, msgSpecificLogEntry) if err != nil { //we reject the deelivery but requeue it so the message will not be lost and recovered to the dlq _ = worker.reject(true, delivery, msgSpecificLogEntry) @@ -274,7 +274,7 @@ func (worker *worker) runGlobalHandler(delivery *amqp.Delivery, msgSpecificLogEn return worker.globalRawHandler(tx, delivery) } //run the global handler inside a transactions - return worker.withTx(worker.withDeduplicator(txWrapper, delivery, msgSpecificLogEntry)) + return worker.withTx(txWrapper, msgSpecificLogEntry) } //run the global handler with metrics return metrics.RunHandlerWithMetric(metricsWrapper, handlerName, delivery.Type, worker.log()) @@ -307,18 +307,6 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { }() msgSpecificLogEntry.Info("GOT MSG") - isDuplicate, err := worker.handleDuplicates(delivery, msgSpecificLogEntry) - if err != nil { - worker.span.LogFields(slog.Error(err)) - msgSpecificLogEntry.WithError(err).Info("failed getting information about message duplication") - _ = worker.reject(true, delivery, msgSpecificLogEntry) - return - } - if isDuplicate { - msgSpecificLogEntry.Warn("message is a duplicate") - worker.span.LogFields(slog.String("grabbit", "message is a duplicate")) - return - } // handle a message that originated from a deadletter exchange if worker.isDead(delivery) && worker.deadletterHandler != nil { @@ -363,7 +351,18 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { _ = worker.reject(false, delivery, msgSpecificLogEntry) return } - + isDuplicate, err := worker.handleDuplicates(bm, delivery, msgSpecificLogEntry) + if err != nil { + worker.span.LogFields(slog.Error(err)) + msgSpecificLogEntry.WithError(err).Info("failed getting information about message duplication") + _ = worker.reject(true, delivery, msgSpecificLogEntry) + return + } + if isDuplicate { + msgSpecificLogEntry.Warn("message is a duplicate") + worker.span.LogFields(slog.String("grabbit", "message is a duplicate")) + return + } err = worker.invokeHandlers(ctx, handlers, bm, &delivery, msgSpecificLogEntry) if err == nil { _ = worker.ack(delivery, msgSpecificLogEntry) @@ -466,7 +465,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return handlerErr } - err := worker.withTx(worker.withDeduplicator(handlerWrapper, delivery, msgSpecificLogEntry)) + err := worker.withTx(worker.withDeduplicator(handlerWrapper, message, msgSpecificLogEntry)) if err != nil { return err } @@ -486,11 +485,11 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } -func (worker *worker) handleDuplicates(delivery amqp.Delivery, msgSpecificLogEntry *logrus.Entry) (bool, error) { +func (worker *worker) handleDuplicates(message *BusMessage, delivery amqp.Delivery, msgSpecificLogEntry *logrus.Entry) (bool, error) { if worker.delicatePolicy == DeduplicationPolicyNone { return false, nil } - duplicate, err := worker.duplicateStore.MessageExists(delivery.MessageId) + duplicate, err := worker.duplicateStore.MessageExists(message.IdempotencyKey) if err != nil { worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) worker.log().WithError(err).Error("failed checking for existing message") @@ -525,12 +524,12 @@ func (worker *worker) duplicatePolicyApply(requeue bool, delivery amqp.Delivery, } } -func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, delivery *amqp.Delivery, logger logrus.FieldLogger) (func(tx *sql.Tx) error, logrus.FieldLogger) { +func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, message *BusMessage, logger logrus.FieldLogger) (func(tx *sql.Tx) error, logrus.FieldLogger) { return func(tx *sql.Tx) error { if worker.delicatePolicy == DeduplicationPolicyNone { return txWrapper(tx) } - err := worker.duplicateStore.StoreMessageID(tx, delivery.MessageId) + err := worker.duplicateStore.StoreMessageID(tx, message.IdempotencyKey) if err != nil { return err } diff --git a/tests/bus_test.go b/tests/bus_test.go index ded3a78..f3a624a 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -854,3 +854,139 @@ func proceedOrTimeout(timeout time.Duration, p chan bool, onProceed func(), t *t t.Fatal("timeout") } } + +func TestDeduplicationReject(t *testing.T) { + metrics.ResetDuplicateMessagesCounters() + cmd := Command1{ + Data: "Command1", + } + proceed := make(chan bool) + b := createBusWithConfig(testSvc1, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}, func(builder gbus.Builder) { + builder.WithDeduplicationPolicy(gbus.DeduplicationPolicyReject, 1*time.Hour) + }) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, ok := message.Payload.(*Command1) + if !ok { + t.Errorf("handler invoced with wrong message type\r\nexpeted:%v\r\nactual:%v", reflect.TypeOf(Command1{}), reflect.TypeOf(message.Payload)) + } + proceed <- true + + return nil + } + + err := b.HandleMessage(cmd, handler) + if err != nil { + t.Errorf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer assertBusShutdown(b, t) + + bm := gbus.NewBusMessage(cmd) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + proceedOrTimeout(2, proceed, nil, t) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + done := make(chan bool) + ticker := time.NewTicker(500 * time.Millisecond) + rejected := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + rejectedValue, err := metrics.GetDuplicateMessageRejectValue() + if err != nil { + return + } + if rejectedValue == 1 { + rejected <- true + } + } + } + + }() + proceedOrTimeout(2, rejected, nil, t) + done <- true +} + +func TestDeduplicationAcked(t *testing.T) { + metrics.ResetDuplicateMessagesCounters() + cmd := Command1{ + Data: "Command1", + } + proceed := make(chan bool) + b := createBusWithConfig(testSvc1, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}, func(builder gbus.Builder) { + builder.WithDeduplicationPolicy(gbus.DeduplicationPolicyAck, 1*time.Hour) + }) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, ok := message.Payload.(*Command1) + if !ok { + t.Errorf("handler invoced with wrong message type\r\nexpeted:%v\r\nactual:%v", reflect.TypeOf(Command1{}), reflect.TypeOf(message.Payload)) + } + proceed <- true + + return nil + } + + err := b.HandleMessage(cmd, handler) + if err != nil { + t.Errorf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer assertBusShutdown(b, t) + + bm := gbus.NewBusMessage(cmd) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + proceedOrTimeout(2, proceed, nil, t) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + done := make(chan bool) + ticker := time.NewTicker(500 * time.Millisecond) + acked := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + ackedValue, err := metrics.GetDuplicateMessageAckValue() + if err != nil { + return + } + if ackedValue == 1 { + acked <- true + } + } + } + + }() + proceedOrTimeout(2, acked, nil, t) + done <- true +} From c2d8b8c5ecf9c89113b786fbabe92fd7daf93ed7 Mon Sep 17 00:00:00 2001 From: Vladislav Shub <1196390+vladshub@users.noreply.github.com> Date: Mon, 30 Dec 2019 12:58:30 +0200 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=90=9B=20fix(codereview:changes)=20sm?= =?UTF-8?q?all=20things=20like=20nameing=20and=20the=20correct=20way=20to?= =?UTF-8?q?=20close=20the=20deduplicator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gbus/abstractions.go | 14 +++ gbus/builder/builder.go | 6 +- gbus/bus.go | 39 +++--- gbus/deduplicator/deduper.go | 162 +++++++++++++++++++++++++ gbus/deduplicator/implementation/tx.go | 138 --------------------- gbus/deduplicator/store.go | 14 --- gbus/worker.go | 84 +++++++------ go.mod | 4 + go.sum | 34 ++---- 9 files changed, 259 insertions(+), 236 deletions(-) create mode 100644 gbus/deduplicator/deduper.go delete mode 100644 gbus/deduplicator/implementation/tx.go delete mode 100644 gbus/deduplicator/store.go diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 29944e8..5a81b31 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -294,3 +294,17 @@ type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger } + +// Deduplicator abstracts the way to manages the duplications +type Deduplicator interface { + // StoreMessageID stores the message id in the storage + StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error + // MessageIDExists checks if message exists in storage + MessageIDExists(logger logrus.FieldLogger, id string) (bool, error) + // Deletes all data from the storage of the duplicator + Purge(logger logrus.FieldLogger) error + // Starts the background process which cleans the storage of the duplicator + Start(logger logrus.FieldLogger) + // Stops the background process of cleaning + Stop(logger logrus.FieldLogger) +} diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 4de72c7..64b0aa4 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -9,7 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" - "github.com/wework/grabbit/gbus/deduplicator/implementation" + "github.com/wework/grabbit/gbus/deduplicator" "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/serialization" "github.com/wework/grabbit/gbus/tx/mysql" @@ -112,7 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } - gb.Deduplicator = implementation.NewDeduplicator(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge, gb.Log()) + gb.Deduplicator = deduplicator.New(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge) //TODO move this into the NewSagaStore factory methods if builder.purgeOnStartup { @@ -121,7 +121,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err) panic(errMsg) } - err = gb.Deduplicator.Purge() + err = gb.Deduplicator.Purge(gb.Log()) if err != nil { errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator") panic(errMsg) diff --git a/gbus/bus.go b/gbus/bus.go index 32c7eb7..a43621e 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/wework/grabbit/gbus/deduplicator" "github.com/wework/grabbit/gbus/metrics" "github.com/opentracing-contrib/go-amqp/amqptracer" @@ -61,7 +60,7 @@ type DefaultBus struct { Serializer Serializer DLX string DeduplicationPolicy DeduplicationPolicy - Deduplicator deduplicator.Store + Deduplicator Deduplicator DefaultPolicies []MessagePolicy Confirm bool healthChan chan error @@ -225,7 +224,7 @@ func (b *DefaultBus) Start() error { return startErr } - b.Deduplicator.Start() + b.Deduplicator.Start(b.Log()) //declare queue var q amqp.Queue @@ -285,23 +284,23 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i) w := &worker{ - consumerTag: tag, - channel: amqpChan, - q: b.serviceQueue, - rpcq: b.rpcQueue, - svcName: b.SvcName, - txProvider: b.TxProvider, - rpcLock: b.RPCLock, - rpcHandlers: b.RPCHandlers, - deadletterHandler: b.deadletterHandler, - globalRawHandler: b.globalRawHandler, - handlersLock: &sync.Mutex{}, - registrations: b.Registrations, - serializer: b.Serializer, - b: b, - amqpErrors: b.amqpErrors, - delicatePolicy: b.DeduplicationPolicy, - duplicateStore: b.Deduplicator, + consumerTag: tag, + channel: amqpChan, + q: b.serviceQueue, + rpcq: b.rpcQueue, + svcName: b.SvcName, + txProvider: b.TxProvider, + rpcLock: b.RPCLock, + rpcHandlers: b.RPCHandlers, + deadletterHandler: b.deadletterHandler, + globalRawHandler: b.globalRawHandler, + handlersLock: &sync.Mutex{}, + registrations: b.Registrations, + serializer: b.Serializer, + b: b, + amqpErrors: b.amqpErrors, + deduplicationPolicy: b.DeduplicationPolicy, + deduplicator: b.Deduplicator, } err := w.Start() diff --git a/gbus/deduplicator/deduper.go b/gbus/deduplicator/deduper.go new file mode 100644 index 0000000..a4aca2c --- /dev/null +++ b/gbus/deduplicator/deduper.go @@ -0,0 +1,162 @@ +package deduplicator + +import ( + "database/sql" + "sync" + "time" + + "emperror.dev/errors" + "github.com/sirupsen/logrus" + + "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/tx" +) + +var _ gbus.Deduplicator = &dedup{} + +type dedup struct { + svcName string + policy gbus.DeduplicationPolicy + txProvider gbus.TxProvider + age time.Duration + ticker *time.Ticker + done chan bool + tableName string + started bool + startStopMutex sync.Mutex +} + +func (d *dedup) Purge(logger logrus.FieldLogger) (err error) { + truncateSQL := "TRUNCATE TABLE " + d.tableName + txp, err := d.txProvider.New() + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed purging duplicates table") + return err + } + defer func() { + if err != nil { + serr := txp.Rollback() + logger.WithError(serr).Error("failed rolling back transaction after purge") + err = errors.Append(err, serr) + } + err = txp.Commit() + }() + _, err = txp.Exec(truncateSQL) + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing truncate on table") + return err + } + logger.WithField("table_name", d.tableName).Info("successfully truncated table") + return nil +} + +func (d *dedup) Start(l logrus.FieldLogger) { + d.startStopMutex.Lock() + defer d.startStopMutex.Unlock() + logger := d.decoratedLog(l) + d.ticker = time.NewTicker(time.Minute) + d.done = make(chan bool) + deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?" + go func() { + for { + select { + case <-d.done: + return + case <-d.ticker.C: + oldest := time.Now().Add(-1 * d.age) + tx, err := d.txProvider.New() + if err != nil { + logger.WithError(err).Error("failed to acquire a tx") + continue + } + result, err := tx.Exec(deleteQuery, oldest) + if err != nil && err != sql.ErrNoRows { + logger.WithError(err).Error("failed executing delete query") + continue + } + n, err := result.RowsAffected() + if err != nil { + logger.WithError(err).Error("failed to get count of affected rows") + } else { + logger.WithField("table_name", d.tableName).WithField("rows_deleted", n). + Info("successfully cleanup duplicates table") + } + } + } + }() + d.started = true +} + +func (d *dedup) decoratedLog(l logrus.FieldLogger) logrus.FieldLogger { + logger := l.WithField("grabbit", "dedup") + return logger +} + +func (d *dedup) Stop(logger logrus.FieldLogger) { + d.decoratedLog(logger).Info("shutting down deduplicator") + d.startStopMutex.Lock() + defer d.startStopMutex.Unlock() + if d.started { + d.ticker.Stop() + close(d.done) + d.started = false + } +} + +// +func (d *dedup) StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error { + insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)" + _, err := tx.Exec(insertSQL, id) + if err != nil { + d.decoratedLog(logger).WithError(err).Error("failed to insert the id of the message into the dedup table") + return err + } + return nil +} + +// MessageIDExists checks if a message id is in the deduplication table and returns an error if it fails +func (d *dedup) MessageIDExists(l logrus.FieldLogger, id string) (bool, error) { + logger := d.decoratedLog(l) + if d.policy == gbus.DeduplicationPolicyNone { + logger.Debug("duplication policy is none") + return false, nil + } + tx, err := d.txProvider.New() + if err != nil { + logger.WithError(err).Error("failed getting tx from txProvider") + return true, err + } + defer func() { + err = tx.Rollback() + if err != nil { + logger.WithError(err).Error("could not commit tx for query MessageIDExists") + } + }() + selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)" + + var exists bool + err = tx.QueryRow(selectSQL, id).Scan(&exists) + if err != nil && err == sql.ErrNoRows { + logger.WithField("table_name", d.tableName).Debug("no rows in result set when looking for messages in duplicates table") + return false, nil + } + + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing lookup query in duplicates table") + return true, err + } + + return exists, nil +} + +func New(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration) gbus.Deduplicator { + d := &dedup{ + svcName: svcName, + policy: policy, + txProvider: txProvider, + age: age, + tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"), + started: false, + } + return d +} diff --git a/gbus/deduplicator/implementation/tx.go b/gbus/deduplicator/implementation/tx.go deleted file mode 100644 index 35eb66a..0000000 --- a/gbus/deduplicator/implementation/tx.go +++ /dev/null @@ -1,138 +0,0 @@ -package implementation - -import ( - "database/sql" - "time" - - "emperror.dev/errors" - "github.com/sirupsen/logrus" - - "github.com/wework/grabbit/gbus" - "github.com/wework/grabbit/gbus/deduplicator" - "github.com/wework/grabbit/gbus/tx" -) - -var _ deduplicator.Store = &deduper{} - -type deduper struct { - *gbus.Glogged - svcName string - policy gbus.DeduplicationPolicy - txProvider gbus.TxProvider - age time.Duration - ticker *time.Ticker - done chan bool - tableName string -} - -func (d *deduper) Purge() (err error) { - truncateSQL := "TRUNCATE TABLE " + d.tableName - txp, err := d.txProvider.New() - if err != nil { - return err - } - defer func() { - if err != nil { - serr := txp.Rollback() - err = errors.Append(err, serr) - } - err = txp.Commit() - }() - _, err = txp.Exec(truncateSQL) - if err != nil { - return err - } - return nil -} - -func (d *deduper) Start() { - d.ticker = time.NewTicker(time.Minute) - d.done = make(chan bool) - deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?" - go func() { - for { - select { - case <-d.done: - return - case <-d.ticker.C: - oldest := time.Now().Add(-1 * d.age) - tx, err := d.txProvider.New() - if err != nil { - d.Log().WithError(err).Error("failed to acquire a tx") - continue - } - result, err := tx.Exec(deleteQuery, oldest) - if err != nil && err != sql.ErrNoRows { - d.Log().WithError(err).Error("failed executing delete query") - } - n, err := result.RowsAffected() - if err != nil { - d.Log().WithError(err).Error("failed to get count of affected rows") - } else { - d.Log().WithField("table_name", d.tableName).WithField("rows_deleted", n). - Info("successfully cleanup duplicates table") - } - } - } - }() -} - -func (d *deduper) Stop() { - d.Log().Info("shutting down deduplicator") - d.ticker.Stop() - close(d.done) -} - -// -func (d *deduper) StoreMessageID(tx *sql.Tx, id string) error { - insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)" - _, err := tx.Exec(insertSQL, id) - if err != nil { - d.Log().WithError(err).Error("failed to insert the id of the message into the dedup table") - return err - } - return nil -} - -// MessageExists checks if a message id is in the deduplication table and returns an error if it fails -func (d *deduper) MessageExists(id string) (bool, error) { - if d.policy == gbus.DeduplicationPolicyNone { - return false, nil - } - tx, err := d.txProvider.New() - if err != nil { - return true, err - } - defer func() { - err = tx.Rollback() - if err != nil { - d.Log().WithError(err).Error("could not commit tx for query MessageExists") - } - }() - selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)" - - var exists bool - err = tx.QueryRow(selectSQL, id).Scan(&exists) - if err != nil && err == sql.ErrNoRows { - return false, nil - } - - if err != nil { - return true, err - } - - return exists, nil -} - -func NewDeduplicator(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration, logger logrus.FieldLogger) deduplicator.Store { - d := &deduper{ - svcName: svcName, - policy: policy, - txProvider: txProvider, - age: age, - tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"), - } - l := logger.WithField("grabbit", "deduplicator") - d.SetLogger(l) - return d -} diff --git a/gbus/deduplicator/store.go b/gbus/deduplicator/store.go deleted file mode 100644 index efd6769..0000000 --- a/gbus/deduplicator/store.go +++ /dev/null @@ -1,14 +0,0 @@ -package deduplicator - -import ( - "database/sql" -) - -// Store abstracts the way deduplicateor manages the -type Store interface { - StoreMessageID(tx *sql.Tx, id string) error - MessageExists(id string) (bool, error) - Purge() error - Start() - Stop() -} diff --git a/gbus/worker.go b/gbus/worker.go index d892416..0907eeb 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -9,7 +9,8 @@ import ( "sync" "time" - "github.com/wework/grabbit/gbus/deduplicator" + "emperror.dev/emperror" + "github.com/wework/grabbit/gbus/metrics" "emperror.dev/errors" @@ -27,27 +28,27 @@ import ( type worker struct { *Safety - channel *amqp.Channel - messages <-chan amqp.Delivery - rpcMessages <-chan amqp.Delivery - q amqp.Queue - rpcq amqp.Queue - consumerTag string - svcName string - rpcLock *sync.Mutex - handlersLock *sync.Mutex - registrations []*Registration - rpcHandlers map[string]MessageHandler - deadletterHandler RawMessageHandler - globalRawHandler RawMessageHandler - b *DefaultBus - serializer Serializer - txProvider TxProvider - amqpErrors chan *amqp.Error - stop chan bool - span opentracing.Span - duplicateStore deduplicator.Store - delicatePolicy DeduplicationPolicy + channel *amqp.Channel + messages <-chan amqp.Delivery + rpcMessages <-chan amqp.Delivery + q amqp.Queue + rpcq amqp.Queue + consumerTag string + svcName string + rpcLock *sync.Mutex + handlersLock *sync.Mutex + registrations []*Registration + rpcHandlers map[string]MessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler + b *DefaultBus + serializer Serializer + txProvider TxProvider + amqpErrors chan *amqp.Error + stop chan bool + span opentracing.Span + deduplicator Deduplicator + deduplicationPolicy DeduplicationPolicy } func (worker *worker) Start() error { @@ -77,15 +78,11 @@ func (worker *worker) Start() error { func (worker *worker) Stop() error { worker.log().Info("stopping worker") - e1 := worker.channel.Cancel(worker.consumerTag, false) - e2 := worker.channel.Cancel(worker.consumerTag+"_rpc", false) - if e1 != nil { - return e1 - } - if e2 != nil { - return e2 - } - return nil + err := emperror.NewMultiErrorBuilder() + err.Add(worker.channel.Cancel(worker.consumerTag, false)) + err.Add(worker.channel.Cancel(worker.consumerTag+"_rpc", false)) + worker.deduplicator.Stop(worker.log()) + return err.ErrOrNil() } func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<-chan amqp.Delivery, error) { @@ -355,7 +352,9 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { if err != nil { worker.span.LogFields(slog.Error(err)) msgSpecificLogEntry.WithError(err).Info("failed getting information about message duplication") - _ = worker.reject(true, delivery, msgSpecificLogEntry) + _ = worker.SafeWithRetries(func() error { + return worker.reject(true, delivery, msgSpecificLogEntry) + }, MaxRetryCount) return } if isDuplicate { @@ -486,18 +485,25 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan } func (worker *worker) handleDuplicates(message *BusMessage, delivery amqp.Delivery, msgSpecificLogEntry *logrus.Entry) (bool, error) { - if worker.delicatePolicy == DeduplicationPolicyNone { + if worker.deduplicationPolicy == DeduplicationPolicyNone { return false, nil } - duplicate, err := worker.duplicateStore.MessageExists(message.IdempotencyKey) + duplicate := true + err := worker.SafeWithRetries(func() error { + var ierr error + duplicate, ierr = worker.deduplicator.MessageIDExists(msgSpecificLogEntry, message.IdempotencyKey) + return ierr + }, MaxRetryCount) if err != nil { worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) worker.log().WithError(err).Error("failed checking for existing message") return true, err } if duplicate { - msgSpecificLogEntry.Error("message is a duplicate") - err = worker.duplicatePolicyApply(false, delivery, msgSpecificLogEntry) + msgSpecificLogEntry.Info("message is a duplicate") + err = worker.SafeWithRetries(func() error { + return worker.duplicatePolicyApply(false, delivery, msgSpecificLogEntry) + }, MaxRetryCount) if err != nil { msgSpecificLogEntry.WithError(err).Error("failed handling duplicate delivery") worker.span.LogFields(slog.Error(errors.New("failed handling duplicate delivery"))) @@ -508,7 +514,7 @@ func (worker *worker) handleDuplicates(message *BusMessage, delivery amqp.Delive } func (worker *worker) duplicatePolicyApply(requeue bool, delivery amqp.Delivery, msgSpecificLogEntry logrus.FieldLogger) error { - switch worker.delicatePolicy { + switch worker.deduplicationPolicy { case DeduplicationPolicyReject: msgSpecificLogEntry.Info("rejecting message on duplicate") worker.span.LogFields(slog.String("grabbit", "rejecting duplicate")) @@ -526,10 +532,10 @@ func (worker *worker) duplicatePolicyApply(requeue bool, delivery amqp.Delivery, func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, message *BusMessage, logger logrus.FieldLogger) (func(tx *sql.Tx) error, logrus.FieldLogger) { return func(tx *sql.Tx) error { - if worker.delicatePolicy == DeduplicationPolicyNone { + if worker.deduplicationPolicy == DeduplicationPolicyNone { return txWrapper(tx) } - err := worker.duplicateStore.StoreMessageID(tx, message.IdempotencyKey) + err := worker.deduplicator.StoreMessageID(logger, tx, message.IdempotencyKey) if err != nil { return err } diff --git a/go.mod b/go.mod index ac661d2..af83c66 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/wework/grabbit require ( + emperror.dev/emperror v0.21.3 emperror.dev/errors v0.4.3 emperror.dev/handler/logrus v0.1.0 github.com/Rican7/retry v0.1.0 @@ -15,6 +16,8 @@ require ( github.com/klauspost/cpuid v1.2.1 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/lopezator/migrator v0.2.0 + github.com/onsi/ginkgo v1.11.0 // indirect + github.com/onsi/gomega v1.8.1 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.3.0+incompatible // indirect @@ -32,6 +35,7 @@ require ( golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f // indirect google.golang.org/appengine v1.6.5 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect + gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 8b9aaca..eb68575 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,12 @@ +emperror.dev/emperror v0.21.3 h1:/S3xa/ljmXKTsrxN8ttCE/eq7fmY/4H4xyqbiunObss= +emperror.dev/emperror v0.21.3/go.mod h1:aeDoz3ERR3yJblyjfKojXoFFsXSd6K8Wfd4Zb1eEbZg= emperror.dev/errors v0.4.1/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= emperror.dev/errors v0.4.3 h1:yfhVxX1vzHgCDXh0KL+gVKfKhXlJCabmc79jS6QQuus= emperror.dev/errors v0.4.3/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= emperror.dev/handler/logrus v0.1.0 h1:hiBAmANpRKU4/RxARem/P5r0c8fa1Hx/BqX0mpplflg= emperror.dev/handler/logrus v0.1.0/go.mod h1:aulFiWRpZEHT8+r2DUgd/IniD/dyNS9Quxn//jU5aYA= -github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= -github.com/Shopify/sarama v1.21.0/go.mod h1:yuqtN/pe8cXRWG5zPaO7hCfNJp5MwmkoJEoLjkm5tCQ= github.com/Shopify/sarama v1.24.0 h1:99vo5VAgQybHwZwiOy/RX/S3i0somjGxur3pLeheqzI= github.com/Shopify/sarama v1.24.0/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= @@ -51,7 +51,6 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= @@ -72,14 +71,12 @@ github.com/klauspost/compress v1.8.6 h1:970MQcQdxX7hfgc/aqmB4a3grW0ivUVV6i1TLkP8 github.com/klauspost/compress v1.8.6/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= -github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= @@ -96,15 +93,14 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= -github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= -github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= +github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34= +github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:OhtbNVqXz6DuVXGvwPYXnNwQy1n2rI+2mID9CQOok9U= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.3.0+incompatible h1:CZzRn4Ut9GbUkHlQ7jqBXeZQV41ZSKWFc302ZU6lUTk= github.com/pierrec/lz4 v2.3.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -133,15 +129,11 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3 h1:yxIAU7hTyjCJeX5B/ydcwre0LcPTaHge5DU2pqND+PA= -github.com/rhinof/grabbit v0.0.0-20190411110638-a50e536d03e3/go.mod h1:enxXXxSAurM+2jWHMDQYCUS9Tjpjs1J2C4xbZcBo8IU= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/streadway/amqp v0.0.0-20190312223743-14f78b41ce6d/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -158,14 +150,12 @@ go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc h1:c0o/qxkaO2LF5t6fQrT4b5hzyggAkLLlCUjqfRxd8Q4= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190322120337-addf6b3196f6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -174,12 +164,10 @@ golang.org/x/net v0.0.0-20191009170851-d66e71096ffb/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190322080309-f49334f85ddc/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -190,14 +178,14 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= @@ -218,3 +206,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=