diff --git a/gbus/abstractions.go b/gbus/abstractions.go index aa3ebe6..5a81b31 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -22,6 +22,14 @@ const ( REPLY Semantics = "reply" ) +type DeduplicationPolicy int + +const ( + DeduplicationPolicyNone DeduplicationPolicy = iota + DeduplicationPolicyReject + DeduplicationPolicyAck +) + //Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus type Bus interface { HandlerRegister @@ -213,6 +221,8 @@ type Builder interface { //WithLogger set custom logger instance WithLogger(logger logrus.FieldLogger) Builder + + WithDeduplicationPolicy(method DeduplicationPolicy, age time.Duration) Builder } //Invocation context for a specific processed message @@ -284,3 +294,17 @@ type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger } + +// Deduplicator abstracts the way to manages the duplications +type Deduplicator interface { + // StoreMessageID stores the message id in the storage + StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error + // MessageIDExists checks if message exists in storage + MessageIDExists(logger logrus.FieldLogger, id string) (bool, error) + // Deletes all data from the storage of the duplicator + Purge(logger logrus.FieldLogger) error + // Starts the background process which cleans the storage of the duplicator + Start(logger logrus.FieldLogger) + // Stops the background process of cleaning + Stop(logger logrus.FieldLogger) +} diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 7021326..64b0aa4 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -5,31 +5,35 @@ import ( "sync" "time" + "emperror.dev/errors" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/deduplicator" "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/serialization" "github.com/wework/grabbit/gbus/tx/mysql" ) type defaultBuilder struct { - PrefetchCount uint - connStr string - purgeOnStartup bool - sagaStoreConnStr string - txnl bool - txConnStr string - txnlProvider string - workerNum uint - serializer gbus.Serializer - dlx string - defaultPolicies []gbus.MessagePolicy - confirm bool - dbPingTimeout time.Duration - usingPingTimeout bool - logger logrus.FieldLogger - busCfg gbus.BusConfiguration + PrefetchCount uint + connStr string + purgeOnStartup bool + sagaStoreConnStr string + txnl bool + txConnStr string + txnlProvider string + workerNum uint + serializer gbus.Serializer + dlx string + defaultPolicies []gbus.MessagePolicy + confirm bool + dbPingTimeout time.Duration + usingPingTimeout bool + logger logrus.FieldLogger + busCfg gbus.BusConfiguration + deduplicationPolicy gbus.DeduplicationPolicy + deduplicationRetentionAge time.Duration } func (builder *defaultBuilder) Build(svcName string) gbus.Bus { @@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { DefaultPolicies: builder.defaultPolicies, DbPingTimeout: 3, Confirm: builder.confirm, + DeduplicationPolicy: builder.deduplicationPolicy, } var finalLogger logrus.FieldLogger @@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } + gb.Deduplicator = deduplicator.New(svcName, builder.deduplicationPolicy, gb.TxProvider, builder.deduplicationRetentionAge) //TODO move this into the NewSagaStore factory methods if builder.purgeOnStartup { @@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { errMsg := fmt.Errorf("grabbit: saga store faild to purge. error: %v", err) panic(errMsg) } + err = gb.Deduplicator.Purge(gb.Log()) + if err != nil { + errMsg := errors.NewWithDetails("duplicator failed to purge", "component", "grabbit", "feature", "deduplicator") + panic(errMsg) + } } glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager) glue.SetLogger(gb.Log()) @@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde return builder } +func (builder *defaultBuilder) WithDeduplicationPolicy(policy gbus.DeduplicationPolicy, age time.Duration) gbus.Builder { + builder.deduplicationPolicy = policy + builder.deduplicationRetentionAge = age + return builder +} + //New :) func New() Nu { return Nu{} @@ -218,9 +235,11 @@ type Nu struct { //Bus inits a new BusBuilder func (Nu) Bus(brokerConnStr string) gbus.Builder { return &defaultBuilder{ - busCfg: gbus.BusConfiguration{}, - PrefetchCount: 1, - connStr: brokerConnStr, - serializer: serialization.NewGobSerializer(), - defaultPolicies: make([]gbus.MessagePolicy, 0)} + busCfg: gbus.BusConfiguration{}, + PrefetchCount: 1, + connStr: brokerConnStr, + serializer: serialization.NewGobSerializer(), + defaultPolicies: make([]gbus.MessagePolicy, 0), + deduplicationPolicy: gbus.DeduplicationPolicyNone, + } } diff --git a/gbus/bus.go b/gbus/bus.go index 68f169c..a43621e 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -56,15 +56,17 @@ type DefaultBus struct { Glue SagaGlue TxProvider TxProvider - WorkerNum uint - Serializer Serializer - DLX string - DefaultPolicies []MessagePolicy - Confirm bool - healthChan chan error - backpressure bool - DbPingTimeout time.Duration - amqpConnected bool + WorkerNum uint + Serializer Serializer + DLX string + DeduplicationPolicy DeduplicationPolicy + Deduplicator Deduplicator + DefaultPolicies []MessagePolicy + Confirm bool + healthChan chan error + backpressure bool + DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -222,6 +224,8 @@ func (b *DefaultBus) Start() error { return startErr } + b.Deduplicator.Start(b.Log()) + //declare queue var q amqp.Queue if q, e = b.createServiceQueue(); e != nil { @@ -280,21 +284,24 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { tag := fmt.Sprintf("%s_worker_%d", b.SvcName, i) w := &worker{ - consumerTag: tag, - channel: amqpChan, - q: b.serviceQueue, - rpcq: b.rpcQueue, - svcName: b.SvcName, - txProvider: b.TxProvider, - rpcLock: b.RPCLock, - rpcHandlers: b.RPCHandlers, - deadletterHandler: b.deadletterHandler, - globalRawHandler: b.globalRawHandler, - handlersLock: &sync.Mutex{}, - registrations: b.Registrations, - serializer: b.Serializer, - b: b, - amqpErrors: b.amqpErrors} + consumerTag: tag, + channel: amqpChan, + q: b.serviceQueue, + rpcq: b.rpcQueue, + svcName: b.SvcName, + txProvider: b.TxProvider, + rpcLock: b.RPCLock, + rpcHandlers: b.RPCHandlers, + deadletterHandler: b.deadletterHandler, + globalRawHandler: b.globalRawHandler, + handlersLock: &sync.Mutex{}, + registrations: b.Registrations, + serializer: b.Serializer, + b: b, + amqpErrors: b.amqpErrors, + deduplicationPolicy: b.DeduplicationPolicy, + deduplicator: b.Deduplicator, + } err := w.Start() if err != nil { diff --git a/gbus/deduplicator/deduper.go b/gbus/deduplicator/deduper.go new file mode 100644 index 0000000..a4aca2c --- /dev/null +++ b/gbus/deduplicator/deduper.go @@ -0,0 +1,162 @@ +package deduplicator + +import ( + "database/sql" + "sync" + "time" + + "emperror.dev/errors" + "github.com/sirupsen/logrus" + + "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/tx" +) + +var _ gbus.Deduplicator = &dedup{} + +type dedup struct { + svcName string + policy gbus.DeduplicationPolicy + txProvider gbus.TxProvider + age time.Duration + ticker *time.Ticker + done chan bool + tableName string + started bool + startStopMutex sync.Mutex +} + +func (d *dedup) Purge(logger logrus.FieldLogger) (err error) { + truncateSQL := "TRUNCATE TABLE " + d.tableName + txp, err := d.txProvider.New() + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed purging duplicates table") + return err + } + defer func() { + if err != nil { + serr := txp.Rollback() + logger.WithError(serr).Error("failed rolling back transaction after purge") + err = errors.Append(err, serr) + } + err = txp.Commit() + }() + _, err = txp.Exec(truncateSQL) + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing truncate on table") + return err + } + logger.WithField("table_name", d.tableName).Info("successfully truncated table") + return nil +} + +func (d *dedup) Start(l logrus.FieldLogger) { + d.startStopMutex.Lock() + defer d.startStopMutex.Unlock() + logger := d.decoratedLog(l) + d.ticker = time.NewTicker(time.Minute) + d.done = make(chan bool) + deleteQuery := "DELETE FROM " + d.tableName + " WHERE `created_at` < ?" + go func() { + for { + select { + case <-d.done: + return + case <-d.ticker.C: + oldest := time.Now().Add(-1 * d.age) + tx, err := d.txProvider.New() + if err != nil { + logger.WithError(err).Error("failed to acquire a tx") + continue + } + result, err := tx.Exec(deleteQuery, oldest) + if err != nil && err != sql.ErrNoRows { + logger.WithError(err).Error("failed executing delete query") + continue + } + n, err := result.RowsAffected() + if err != nil { + logger.WithError(err).Error("failed to get count of affected rows") + } else { + logger.WithField("table_name", d.tableName).WithField("rows_deleted", n). + Info("successfully cleanup duplicates table") + } + } + } + }() + d.started = true +} + +func (d *dedup) decoratedLog(l logrus.FieldLogger) logrus.FieldLogger { + logger := l.WithField("grabbit", "dedup") + return logger +} + +func (d *dedup) Stop(logger logrus.FieldLogger) { + d.decoratedLog(logger).Info("shutting down deduplicator") + d.startStopMutex.Lock() + defer d.startStopMutex.Unlock() + if d.started { + d.ticker.Stop() + close(d.done) + d.started = false + } +} + +// +func (d *dedup) StoreMessageID(logger logrus.FieldLogger, tx *sql.Tx, id string) error { + insertSQL := "INSERT INTO " + d.tableName + " (id) values (?)" + _, err := tx.Exec(insertSQL, id) + if err != nil { + d.decoratedLog(logger).WithError(err).Error("failed to insert the id of the message into the dedup table") + return err + } + return nil +} + +// MessageIDExists checks if a message id is in the deduplication table and returns an error if it fails +func (d *dedup) MessageIDExists(l logrus.FieldLogger, id string) (bool, error) { + logger := d.decoratedLog(l) + if d.policy == gbus.DeduplicationPolicyNone { + logger.Debug("duplication policy is none") + return false, nil + } + tx, err := d.txProvider.New() + if err != nil { + logger.WithError(err).Error("failed getting tx from txProvider") + return true, err + } + defer func() { + err = tx.Rollback() + if err != nil { + logger.WithError(err).Error("could not commit tx for query MessageIDExists") + } + }() + selectSQL := "SELECT EXISTS (SELECT id FROM " + d.tableName + " WHERE id = ? limit 1)" + + var exists bool + err = tx.QueryRow(selectSQL, id).Scan(&exists) + if err != nil && err == sql.ErrNoRows { + logger.WithField("table_name", d.tableName).Debug("no rows in result set when looking for messages in duplicates table") + return false, nil + } + + if err != nil { + logger.WithError(err).WithField("table_name", d.tableName).Error("failed executing lookup query in duplicates table") + return true, err + } + + return exists, nil +} + +func New(svcName string, policy gbus.DeduplicationPolicy, txProvider gbus.TxProvider, age time.Duration) gbus.Deduplicator { + d := &dedup{ + svcName: svcName, + policy: policy, + txProvider: txProvider, + age: age, + tableName: tx.GrabbitTableNameTemplate(svcName, "duplicates"), + started: false, + } + return d +} diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go index 9458b47..8679a63 100644 --- a/gbus/metrics/message_metrics.go +++ b/gbus/metrics/message_metrics.go @@ -7,7 +7,9 @@ import ( ) var ( - rejectedMessages = newRejectedMessagesCounter() + rejectedMessages = newRejectedMessagesCounter() + duplicateMessageAck = newDuplicateMessageCounter("ack") + duplicateMessageReject = newDuplicateMessageCounter("reject") ) //ResetRejectedMessagesCounter resets the counter intended to be used in tests only @@ -35,7 +37,6 @@ func GetRejectedMessagesValue() (float64, error) { } func newRejectedMessagesCounter() prometheus.Counter { - return promauto.NewCounter(prometheus.CounterOpts{ Namespace: grabbitPrefix, Subsystem: "messages", @@ -43,3 +44,55 @@ func newRejectedMessagesCounter() prometheus.Counter { Help: "counting the rejected messages", }) } + +// ResetDuplicateMessagesCounters resets the counter intended to be used in tests only +func ResetDuplicateMessagesCounters() { + prometheus.Unregister(duplicateMessageAck) + duplicateMessageAck = newDuplicateMessageCounter("ack") + prometheus.Unregister(duplicateMessageReject) + duplicateMessageReject = newDuplicateMessageCounter("reject") +} + +// ReportDuplicateMessageAck reports a message is a duplicate and Acked +func ReportDuplicateMessageAck() { + duplicateMessageAck.Inc() +} + +// ReportDuplicateMessageReject reports a message is a duplicate and Rejected +func ReportDuplicateMessageReject() { + duplicateMessageReject.Inc() +} + +// GetDuplicateMessageRejectValue gets the value of the rejected message counter +func GetDuplicateMessageRejectValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := duplicateMessageReject.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +// GetDuplicateMessageAckValue gets the value of the rejected message counter +func GetDuplicateMessageAckValue() (float64, error) { + m := &io_prometheus_client.Metric{} + err := duplicateMessageAck.Write(m) + + if err != nil { + return 0, err + } + + return m.GetCounter().GetValue(), nil +} + +func newDuplicateMessageCounter(rabbitAnswer string) prometheus.Counter { + return prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: grabbitPrefix, + Subsystem: "messages", + Name: "duplicate_messages", + Help: "counting the duplicate messages", + ConstLabels: prometheus.Labels{"type": rabbitAnswer}, + }) +} diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go index 2ae78ae..2092a9c 100644 --- a/gbus/tx/mysql/migrations.go +++ b/gbus/tx/mysql/migrations.go @@ -175,6 +175,7 @@ func EnsureSchema(db *sql.DB, svcName string) { sagaStoreAddSagaCreatorDetails(svcName), sagaStoreAddRPCIDDetails(svcName), sagaStoreAddCreatedAtDetails(svcName), + createDeduplicationTable(svcName), )) if err != nil { panic(err) @@ -185,6 +186,25 @@ func EnsureSchema(db *sql.DB, svcName string) { } } +func createDeduplicationTable(svcName string) *migrator.Migration { + tblName := tx.GrabbitTableNameTemplate(svcName, "duplicates") + + createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + id VARCHAR(256) NOT NULL PRIMARY KEY, + created_at timestamp DEFAULT CURRENT_TIMESTAMP, + INDEX ` + tblName + `_created_at_idx (created_at))` + + return &migrator.Migration{ + Name: "create a table to manage duplicate messages", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(createTableQuery); err != nil { + return err + } + return nil + }, + } +} + func sanitizeSvcName(svcName string) string { sanitized := tx.SanitizeTableName(svcName) diff --git a/gbus/worker.go b/gbus/worker.go index 093a7bd..0907eeb 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "emperror.dev/emperror" + "github.com/wework/grabbit/gbus/metrics" "emperror.dev/errors" @@ -26,25 +28,27 @@ import ( type worker struct { *Safety - channel *amqp.Channel - messages <-chan amqp.Delivery - rpcMessages <-chan amqp.Delivery - q amqp.Queue - rpcq amqp.Queue - consumerTag string - svcName string - rpcLock *sync.Mutex - handlersLock *sync.Mutex - registrations []*Registration - rpcHandlers map[string]MessageHandler - deadletterHandler RawMessageHandler - globalRawHandler RawMessageHandler - b *DefaultBus - serializer Serializer - txProvider TxProvider - amqpErrors chan *amqp.Error - stop chan bool - span opentracing.Span + channel *amqp.Channel + messages <-chan amqp.Delivery + rpcMessages <-chan amqp.Delivery + q amqp.Queue + rpcq amqp.Queue + consumerTag string + svcName string + rpcLock *sync.Mutex + handlersLock *sync.Mutex + registrations []*Registration + rpcHandlers map[string]MessageHandler + deadletterHandler RawMessageHandler + globalRawHandler RawMessageHandler + b *DefaultBus + serializer Serializer + txProvider TxProvider + amqpErrors chan *amqp.Error + stop chan bool + span opentracing.Span + deduplicator Deduplicator + deduplicationPolicy DeduplicationPolicy } func (worker *worker) Start() error { @@ -74,15 +78,11 @@ func (worker *worker) Start() error { func (worker *worker) Stop() error { worker.log().Info("stopping worker") - e1 := worker.channel.Cancel(worker.consumerTag, false) - e2 := worker.channel.Cancel(worker.consumerTag+"_rpc", false) - if e1 != nil { - return e1 - } - if e2 != nil { - return e2 - } - return nil + err := emperror.NewMultiErrorBuilder() + err.Add(worker.channel.Cancel(worker.consumerTag, false)) + err.Add(worker.channel.Cancel(worker.consumerTag+"_rpc", false)) + worker.deduplicator.Stop(worker.log()) + return err.ErrOrNil() } func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<-chan amqp.Delivery, error) { @@ -305,7 +305,7 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { msgSpecificLogEntry.Info("GOT MSG") - //handle a message that originated from a deadletter exchange + // handle a message that originated from a deadletter exchange if worker.isDead(delivery) && worker.deadletterHandler != nil { worker.span.LogFields(slog.Error(errors.New("handling dead-letter delivery"))) msgSpecificLogEntry.Info("invoking deadletter handler") @@ -314,18 +314,16 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } if err := worker.runGlobalHandler(&delivery, msgSpecificLogEntry); err != nil { - //when the global handler fails terminate executation and reject the message + // when the global handler fails terminate executation and reject the message _ = worker.reject(false, delivery, msgSpecificLogEntry) return } - //TODO:Dedup message - handlers := worker.resolveHandlers(isRPCreply, delivery, msgSpecificLogEntry) if len(handlers) == 0 { msgSpecificLogEntry.Warn("Message received but no handlers found") worker.span.LogFields(slog.String("grabbit", "no handlers found")) - //remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue + // remove the message by acking it and not rejecting it so it will not be routed to a deadletter queue _ = worker.ack(delivery, msgSpecificLogEntry) return } @@ -346,11 +344,24 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { bm, err := worker.extractBusMessage(delivery, msgSpecificLogEntry) if err != nil { worker.span.LogFields(slog.Error(err), slog.String("grabbit", "message is poison")) - //reject poison message + // reject poison message _ = worker.reject(false, delivery, msgSpecificLogEntry) return } - + isDuplicate, err := worker.handleDuplicates(bm, delivery, msgSpecificLogEntry) + if err != nil { + worker.span.LogFields(slog.Error(err)) + msgSpecificLogEntry.WithError(err).Info("failed getting information about message duplication") + _ = worker.SafeWithRetries(func() error { + return worker.reject(true, delivery, msgSpecificLogEntry) + }, MaxRetryCount) + return + } + if isDuplicate { + msgSpecificLogEntry.Warn("message is a duplicate") + worker.span.LogFields(slog.String("grabbit", "message is a duplicate")) + return + } err = worker.invokeHandlers(ctx, handlers, bm, &delivery, msgSpecificLogEntry) if err == nil { _ = worker.ack(delivery, msgSpecificLogEntry) @@ -453,7 +464,7 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return handlerErr } - err := worker.withTx(handlerWrapper, msgSpecificLogEntry) + err := worker.withTx(worker.withDeduplicator(handlerWrapper, message, msgSpecificLogEntry)) if err != nil { return err } @@ -473,6 +484,65 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan )) } +func (worker *worker) handleDuplicates(message *BusMessage, delivery amqp.Delivery, msgSpecificLogEntry *logrus.Entry) (bool, error) { + if worker.deduplicationPolicy == DeduplicationPolicyNone { + return false, nil + } + duplicate := true + err := worker.SafeWithRetries(func() error { + var ierr error + duplicate, ierr = worker.deduplicator.MessageIDExists(msgSpecificLogEntry, message.IdempotencyKey) + return ierr + }, MaxRetryCount) + if err != nil { + worker.span.LogFields(slog.String("grabbit", "failed processing duplicate")) + worker.log().WithError(err).Error("failed checking for existing message") + return true, err + } + if duplicate { + msgSpecificLogEntry.Info("message is a duplicate") + err = worker.SafeWithRetries(func() error { + return worker.duplicatePolicyApply(false, delivery, msgSpecificLogEntry) + }, MaxRetryCount) + if err != nil { + msgSpecificLogEntry.WithError(err).Error("failed handling duplicate delivery") + worker.span.LogFields(slog.Error(errors.New("failed handling duplicate delivery"))) + } + return true, err + } + return false, nil +} + +func (worker *worker) duplicatePolicyApply(requeue bool, delivery amqp.Delivery, msgSpecificLogEntry logrus.FieldLogger) error { + switch worker.deduplicationPolicy { + case DeduplicationPolicyReject: + msgSpecificLogEntry.Info("rejecting message on duplicate") + worker.span.LogFields(slog.String("grabbit", "rejecting duplicate")) + metrics.ReportDuplicateMessageReject() + return worker.reject(false, delivery, msgSpecificLogEntry) + case DeduplicationPolicyAck: + msgSpecificLogEntry.Info("acknowledging duplicate") + worker.span.LogFields(slog.String("grabbit", "acknowledging duplicate")) + metrics.ReportDuplicateMessageAck() + return worker.ack(delivery, msgSpecificLogEntry) + default: + return errors.NewWithDetails("invalid deduplication policy", "policy", worker.b.DeduplicationPolicy) + } +} + +func (worker *worker) withDeduplicator(txWrapper func(tx *sql.Tx) error, message *BusMessage, logger logrus.FieldLogger) (func(tx *sql.Tx) error, logrus.FieldLogger) { + return func(tx *sql.Tx) error { + if worker.deduplicationPolicy == DeduplicationPolicyNone { + return txWrapper(tx) + } + err := worker.deduplicator.StoreMessageID(logger, tx, message.IdempotencyKey) + if err != nil { + return err + } + return txWrapper(tx) + }, logger +} + func (worker *worker) log() logrus.FieldLogger { return worker.b.Log().WithField("worker", worker.consumerTag) } diff --git a/go.mod b/go.mod index ff5a3f4..af83c66 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/wework/grabbit require ( + emperror.dev/emperror v0.21.3 emperror.dev/errors v0.4.3 emperror.dev/handler/logrus v0.1.0 github.com/Rican7/retry v0.1.0 @@ -13,11 +14,10 @@ require ( github.com/jcmturner/gofork v1.0.0 // indirect github.com/klauspost/compress v1.8.6 // indirect github.com/klauspost/cpuid v1.2.1 // indirect - github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/lopezator/migrator v0.2.0 - github.com/onsi/ginkgo v1.8.0 // indirect - github.com/onsi/gomega v1.5.0 // indirect + github.com/onsi/ginkgo v1.11.0 // indirect + github.com/onsi/gomega v1.8.1 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.3.0+incompatible // indirect @@ -34,7 +34,6 @@ require ( golang.org/x/net v0.0.0-20191009170851-d66e71096ffb // indirect golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f // indirect google.golang.org/appengine v1.6.5 // indirect - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect ) diff --git a/go.sum b/go.sum index f73ddfa..eb68575 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +emperror.dev/emperror v0.21.3 h1:/S3xa/ljmXKTsrxN8ttCE/eq7fmY/4H4xyqbiunObss= +emperror.dev/emperror v0.21.3/go.mod h1:aeDoz3ERR3yJblyjfKojXoFFsXSd6K8Wfd4Zb1eEbZg= emperror.dev/errors v0.4.1/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= emperror.dev/errors v0.4.3 h1:yfhVxX1vzHgCDXh0KL+gVKfKhXlJCabmc79jS6QQuus= emperror.dev/errors v0.4.3/go.mod h1:cA5SMsyzo+KXq997DKGK+lTV1DGx5TXLQUNtYe9p2p0= @@ -69,9 +71,8 @@ github.com/klauspost/compress v1.8.6 h1:970MQcQdxX7hfgc/aqmB4a3grW0ivUVV6i1TLkP8 github.com/klauspost/compress v1.8.6/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= -github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -92,10 +93,10 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= -github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= -github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= +github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34= +github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:OhtbNVqXz6DuVXGvwPYXnNwQy1n2rI+2mID9CQOok9U= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= @@ -177,13 +178,14 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= @@ -204,3 +206,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/tests/bus_test.go b/tests/bus_test.go index ded3a78..f3a624a 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -854,3 +854,139 @@ func proceedOrTimeout(timeout time.Duration, p chan bool, onProceed func(), t *t t.Fatal("timeout") } } + +func TestDeduplicationReject(t *testing.T) { + metrics.ResetDuplicateMessagesCounters() + cmd := Command1{ + Data: "Command1", + } + proceed := make(chan bool) + b := createBusWithConfig(testSvc1, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}, func(builder gbus.Builder) { + builder.WithDeduplicationPolicy(gbus.DeduplicationPolicyReject, 1*time.Hour) + }) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, ok := message.Payload.(*Command1) + if !ok { + t.Errorf("handler invoced with wrong message type\r\nexpeted:%v\r\nactual:%v", reflect.TypeOf(Command1{}), reflect.TypeOf(message.Payload)) + } + proceed <- true + + return nil + } + + err := b.HandleMessage(cmd, handler) + if err != nil { + t.Errorf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer assertBusShutdown(b, t) + + bm := gbus.NewBusMessage(cmd) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + proceedOrTimeout(2, proceed, nil, t) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + done := make(chan bool) + ticker := time.NewTicker(500 * time.Millisecond) + rejected := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + rejectedValue, err := metrics.GetDuplicateMessageRejectValue() + if err != nil { + return + } + if rejectedValue == 1 { + rejected <- true + } + } + } + + }() + proceedOrTimeout(2, rejected, nil, t) + done <- true +} + +func TestDeduplicationAcked(t *testing.T) { + metrics.ResetDuplicateMessagesCounters() + cmd := Command1{ + Data: "Command1", + } + proceed := make(chan bool) + b := createBusWithConfig(testSvc1, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15}, func(builder gbus.Builder) { + builder.WithDeduplicationPolicy(gbus.DeduplicationPolicyAck, 1*time.Hour) + }) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, ok := message.Payload.(*Command1) + if !ok { + t.Errorf("handler invoced with wrong message type\r\nexpeted:%v\r\nactual:%v", reflect.TypeOf(Command1{}), reflect.TypeOf(message.Payload)) + } + proceed <- true + + return nil + } + + err := b.HandleMessage(cmd, handler) + if err != nil { + t.Errorf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.Start() + if err != nil { + t.Errorf("could not start bus for test error: %s", err.Error()) + } + defer assertBusShutdown(b, t) + + bm := gbus.NewBusMessage(cmd) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + proceedOrTimeout(2, proceed, nil, t) + err = b.Send(context.Background(), testSvc1, bm) + if err != nil { + t.Errorf("could not send message error: %s", err.Error()) + return + } + done := make(chan bool) + ticker := time.NewTicker(500 * time.Millisecond) + acked := make(chan bool) + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + ackedValue, err := metrics.GetDuplicateMessageAckValue() + if err != nil { + return + } + if ackedValue == 1 { + acked <- true + } + } + } + + }() + proceedOrTimeout(2, acked, nil, t) + done <- true +} diff --git a/tests/metrics_test.go b/tests/metrics_test.go index 9d1b10e..d858ea5 100644 --- a/tests/metrics_test.go +++ b/tests/metrics_test.go @@ -130,7 +130,7 @@ func TestRunHandlerWithMetric_Latency(t *testing.T) { } } -func checkLatency(t *testing.T, sc *uint64, expected uint64, err error) { +func checkLatency(t *testing.T, sc *uint64, expected uint64, err error) { if err != nil { t.Errorf("Failed to get latency value: %e", err) }