diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 9b4c0d1..60583b5 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -77,6 +77,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } gb.TxProvider = mysqltx + + mysql.EnsureSchema(mysqltx.Database, gb.SvcName) + //TODO move purge logic into the NewSagaStore factory method sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) if builder.purgeOnStartup { diff --git a/gbus/bus.go b/gbus/bus.go index a0e7c4c..4443691 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -168,14 +168,6 @@ func (b *DefaultBus) bindServiceQueue() error { return nil } -func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) { - channel, e := conn.Channel() - if e != nil { - return nil, e - } - return channel, nil -} - //Start implements GBus.Start() func (b *DefaultBus) Start() error { @@ -188,10 +180,10 @@ func (b *DefaultBus) Start() error { return e } - if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil { + if b.ingressChannel, e = b.ingressConn.Channel(); e != nil { return e } - if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil { + if b.egressChannel, e = b.egressConn.Channel(); e != nil { return e } @@ -209,7 +201,7 @@ func (b *DefaultBus) Start() error { TODO://the design is crap and needs to be refactored */ var amqpChan *amqp.Channel - if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil { + if amqpChan, e = b.egressConn.Channel(); e != nil { b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") return e } @@ -272,7 +264,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { workers := make([]*worker, 0) for i := uint(0); i < workerNum; i++ { //create a channel per worker as we can't share channels across go routines - amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn) + amqpChan, createChanErr := b.ingressConn.Channel() if createChanErr != nil { return nil, createChanErr } diff --git a/gbus/invocation.go b/gbus/invocation.go index fc80095..b67a695 100644 --- a/gbus/invocation.go +++ b/gbus/invocation.go @@ -32,6 +32,7 @@ func (dfi *defaultInvocationContext) Log() logrus.FieldLogger { return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID}) } +//Reply implements the Invocation.Reply signature func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error { if dfi.inboundMsg != nil { replyMessage.CorrelationID = dfi.inboundMsg.ID @@ -51,6 +52,7 @@ func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *Bu return err } +//Send implements the Invocation.Send signature func (dfi *defaultInvocationContext) Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error { if dfi.tx != nil { return dfi.bus.sendWithTx(ctx, dfi.tx, toService, command, policies...) @@ -58,6 +60,7 @@ func (dfi *defaultInvocationContext) Send(ctx context.Context, toService string, return dfi.bus.Send(ctx, toService, command, policies...) } +//Publish implements the Invocation.Publish signature func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error { if dfi.tx != nil { @@ -66,26 +69,32 @@ func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topi return dfi.bus.Publish(ctx, exchange, topic, event, policies...) } +//RPC implements the Invocation.RPC signature func (dfi *defaultInvocationContext) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error) { return dfi.bus.RPC(ctx, service, request, reply, timeout) } +//Bus implements the Invocation.Bus signature func (dfi *defaultInvocationContext) Bus() Messaging { return dfi } +//Tx implements the Invocation.Tx signature func (dfi *defaultInvocationContext) Tx() *sql.Tx { return dfi.tx } +//Ctx implements the Invocation.Ctx signature func (dfi *defaultInvocationContext) Ctx() context.Context { return dfi.ctx } +//Routing implements the Invocation.Routing signature func (dfi *defaultInvocationContext) Routing() (exchange, routingKey string) { return dfi.exchange, dfi.routingKey } +//DeliveryInfo implements the Invocation.DeliveryInfo signature func (dfi *defaultInvocationContext) DeliveryInfo() DeliveryInfo { return dfi.deliveryInfo } diff --git a/gbus/message_handler.go b/gbus/message_handler.go index 776927b..67a7d6c 100644 --- a/gbus/message_handler.go +++ b/gbus/message_handler.go @@ -9,6 +9,7 @@ import ( //MessageHandler signature for all command handlers type MessageHandler func(invocation Invocation, message *BusMessage) error +//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type func (mg MessageHandler) Name() string { funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name() splits := strings.Split(funName, ".") diff --git a/gbus/metrics/handler_metrics.go b/gbus/metrics/handler_metrics.go index 4311796..040737e 100644 --- a/gbus/metrics/handler_metrics.go +++ b/gbus/metrics/handler_metrics.go @@ -2,10 +2,11 @@ package metrics import ( "fmt" + "sync" + "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_model/go" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/sirupsen/logrus" - "sync" ) var ( @@ -20,7 +21,7 @@ const ( grabbitPrefix = "grabbit" ) -type HandlerMetrics struct { +type handlerMetrics struct { result *prometheus.CounterVec latency prometheus.Summary } @@ -62,17 +63,17 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger return err } -func GetHandlerMetrics(handlerName string) *HandlerMetrics { +func GetHandlerMetrics(handlerName string) *handlerMetrics { entry, ok := handlerMetricsByHandlerName.Load(handlerName) if ok { - return entry.(*HandlerMetrics) + return entry.(*handlerMetrics) } return nil } -func newHandlerMetrics(handlerName string) *HandlerMetrics { - return &HandlerMetrics{ +func newHandlerMetrics(handlerName string) *handlerMetrics { + return &handlerMetrics{ result: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: grabbitPrefix, @@ -98,15 +99,15 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error return functionToTrack() } -func (hm *HandlerMetrics) GetSuccessCount() (float64, error) { +func (hm *handlerMetrics) GetSuccessCount() (float64, error) { return hm.getLabeledCounterValue(success) } -func (hm *HandlerMetrics) GetFailureCount() (float64, error) { +func (hm *handlerMetrics) GetFailureCount() (float64, error) { return hm.getLabeledCounterValue(failure) } -func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) { +func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) { m := &io_prometheus_client.Metric{} err := hm.latency.Write(m) if err != nil { @@ -116,7 +117,7 @@ func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) { return m.GetSummary().SampleCount, nil } -func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) { +func (hm *handlerMetrics) getLabeledCounterValue(label string) (float64, error) { m := &io_prometheus_client.Metric{} err := hm.result.WithLabelValues(label).Write(m) diff --git a/gbus/metrics/message_metrics.go b/gbus/metrics/message_metrics.go index 8bf0dfb..753ebca 100644 --- a/gbus/metrics/message_metrics.go +++ b/gbus/metrics/message_metrics.go @@ -32,4 +32,4 @@ func newRejectedMessagesCounter() prometheus.Counter { Name: "rejected_messages", Help: "counting the rejected messages", }) -} \ No newline at end of file +} diff --git a/gbus/metrics/saga_metrics.go b/gbus/metrics/saga_metrics.go index 188f4a0..8730565 100644 --- a/gbus/metrics/saga_metrics.go +++ b/gbus/metrics/saga_metrics.go @@ -6,8 +6,10 @@ import ( io_prometheus_client "github.com/prometheus/client_model/go" ) +//SagaTimeoutCounter is the prometheus counter counting timed out saga instances var SagaTimeoutCounter = newSagaTimeoutCounter() +//GetSagaTimeoutCounterValue gets the counter value of timed out sagas reported to prometheus func GetSagaTimeoutCounterValue() (float64, error) { m := &io_prometheus_client.Metric{} err := SagaTimeoutCounter.Write(m) diff --git a/gbus/outbox.go b/gbus/outbox.go index 3358250..dac293a 100644 --- a/gbus/outbox.go +++ b/gbus/outbox.go @@ -50,9 +50,9 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro return nil } +//Shutdown stops the outbox func (out *AMQPOutbox) Shutdown() { close(out.stop) - } //Post implements Outbox.Send diff --git a/gbus/serialization/avro.go b/gbus/serialization/avro.go index a718a9d..1f3cd88 100644 --- a/gbus/serialization/avro.go +++ b/gbus/serialization/avro.go @@ -87,7 +87,7 @@ func (as *Avro) Encode(obj gbus.Message) (msg []byte, err error) { tobj, ok := obj.(AvroMessageGenerated) if !ok { err := fmt.Errorf("could not convert obj to AvroMessageGenerated") - logrus.WithError(err).WithField("obj", obj).Error("could not convert object") + logrus.WithError(err).Error("could not convert object") return nil, err } diff --git a/gbus/serialization/proto.go b/gbus/serialization/proto.go index bff127a..43a50c4 100644 --- a/gbus/serialization/proto.go +++ b/gbus/serialization/proto.go @@ -75,7 +75,7 @@ func (as *Proto) Decode(buffer []byte, schemaName string) (msg gbus.Message, err msg, ok = tmsg.(gbus.Message) if !ok { err = fmt.Errorf("could not cast obj to gbus.Message") - as.logger.WithError(err).WithField("msg", tmsg).Errorf("could not cast %v to gbus.Message", tmsg) + as.logger.WithError(err).Errorf("could not cast %v to gbus.Message", tmsg) return nil, err } diff --git a/gbus/tx/mysql/migrations.go b/gbus/tx/mysql/migrations.go new file mode 100644 index 0000000..c629d9a --- /dev/null +++ b/gbus/tx/mysql/migrations.go @@ -0,0 +1,103 @@ +package mysql + +import ( + "database/sql" + + "fmt" + "github.com/lopezator/migrator" + "github.com/wework/grabbit/gbus/tx" +) + +//SagaStoreTableMigration creates the service saga store table +func SagaStoreTableMigration(svcName string) *migrator.Migration { + tblName := tx.GetSagatableName(svcName) + + createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + rec_id INT PRIMARY KEY AUTO_INCREMENT, + saga_id VARCHAR(255) UNIQUE NOT NULL, + saga_type VARCHAR(255) NOT NULL, + saga_data LONGBLOB NOT NULL, + version integer NOT NULL DEFAULT 0, + last_update timestamp DEFAULT NOW(), + INDEX ` + tblName + `_sagatype_idx (saga_type))` + + return &migrator.Migration{ + Name: "create saga store table", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(createTableQuery); err != nil { + return err + } + return nil + }, + } +} + +//OutboxMigrations creates service outbox table +func OutboxMigrations(svcName string) *migrator.Migration { + + query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` ( + rec_id int NOT NULL AUTO_INCREMENT, + message_id varchar(50) NOT NULL UNIQUE, + message_type varchar(50) NOT NULL, + exchange varchar(50) NOT NULL, + routing_key varchar(50) NOT NULL, + publishing longblob NOT NULL, + status int(11) NOT NULL, + relay_id varchar(50) NULL, + delivery_tag bigint(20) NOT NULL, + delivery_attempts int NOT NULL DEFAULT 0, + insert_date timestamp DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(rec_id), + INDEX status_delivery (rec_id, status, delivery_attempts))` + + return &migrator.Migration{ + Name: "create outbox table", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(query); err != nil { + return err + } + return nil + }, + } +} + +//TimoutTableMigration creates the service timeout table, where timeouts are persisted +func TimoutTableMigration(svcName string) *migrator.Migration { + tblName := GetTimeoutsTableName(svcName) + + createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( + rec_id INT PRIMARY KEY AUTO_INCREMENT, + saga_id VARCHAR(255) UNIQUE NOT NULL, + timeout DATETIME NOT NULL, + INDEX (timeout), + INDEX (saga_id) + )` + + return &migrator.Migration{ + Name: "create timeout table", + Func: func(tx *sql.Tx) error { + if _, err := tx.Exec(createTableQuery); err != nil { + return err + } + return nil + }, + } +} + +//EnsureSchema implements Grabbit's migrations strategy +func EnsureSchema(db *sql.DB, svcName string) { + migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName) + + migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations( + OutboxMigrations(svcName), + SagaStoreTableMigration(svcName), + TimoutTableMigration(svcName), + )) + if err != nil { + panic(err) + } + err = migrate.Migrate(db) + if err != nil { + panic(err) + } +} diff --git a/gbus/tx/mysql/sagastore.go b/gbus/tx/mysql/sagastore.go index 9217d48..3d1e4da 100644 --- a/gbus/tx/mysql/sagastore.go +++ b/gbus/tx/mysql/sagastore.go @@ -1,10 +1,6 @@ package mysql import ( - "database/sql" - "fmt" - - log "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/saga" "github.com/wework/grabbit/gbus/tx" @@ -15,85 +11,6 @@ type SagaStore struct { *tx.SagaStore } -func (store *SagaStore) log() *log.Entry { - return log.WithField("saga_store", "mysql") -} - -func (store *SagaStore) ensureSchema() { - store.log().Info("ensuring saga schema exists") - if tablesExists := store.sagaTablesExist(); !tablesExists { - - store.log().Info("could not find saga schema, attempting to creat schema") - - store.createSagaTables() - } -} - -func (store *SagaStore) sagaTablesExist() bool { - - tblName := store.GetSagatableName() - tx := store.NewTx() - defer func() { - err := tx.Commit() - if err != nil { - store.log().WithError(err).Error("could not commit sagaTablesExist") - } - }() - - selectSQL := `SELECT 1 FROM ` + tblName + ` LIMIT 1;` - - store.log().Info(selectSQL) - - row := tx.QueryRow(selectSQL) - var exists int - err := row.Scan(&exists) - if err != nil && err != sql.ErrNoRows { - return false - } - - return true -} - -func (store *SagaStore) createSagaTables() { - tblName := store.GetSagatableName() - tx := store.NewTx() - - createTable := `CREATE TABLE ` + tblName + ` ( - rec_id INT PRIMARY KEY AUTO_INCREMENT, - saga_id VARCHAR(255) UNIQUE NOT NULL, - saga_type VARCHAR(255) NOT NULL, - saga_data LONGBLOB NOT NULL, - version integer NOT NULL DEFAULT 0, - last_update timestamp DEFAULT NOW() - )` - createSagaTypeIndex := `CREATE INDEX ` + tblName + `_sagatype_idx ON ` + tblName + ` (saga_type)` - - sqls := [2]string{ - createTable, - createSagaTypeIndex} - - for i, sql := range sqls { - _, error := tx.Exec(sql) - - store.log().WithFields(log.Fields{"step": i, "sql": sql}).Info("creating saga tables") - - if error != nil { - txErr := fmt.Errorf("failed to create saga tables.\n%v", error) - if rollbackErr := tx.Rollback(); rollbackErr != nil { - log.WithError(rollbackErr).Error("could not rollback transaction") - } - panic(txErr) - } - } - - error := tx.Commit() - if error != nil { - txErr := fmt.Errorf("failed to create saga tables.\n%v", error) - panic(txErr) - } - -} - //NewSagaStore creates a bew SagaStore func NewSagaStore(svcName string, txProvider gbus.TxProvider) saga.Store { @@ -103,7 +20,6 @@ func NewSagaStore(svcName string, txProvider gbus.TxProvider) saga.Store { ParamsMarkers: getParamsMarker()} store := &SagaStore{ base} - store.ensureSchema() return store } diff --git a/gbus/tx/mysql/timeout.go b/gbus/tx/mysql/timeout.go index ae8ca33..c72a73b 100644 --- a/gbus/tx/mysql/timeout.go +++ b/gbus/tx/mysql/timeout.go @@ -24,31 +24,6 @@ type TimeoutManager struct { exit chan bool } -func (tm *TimeoutManager) ensureSchema() error { - tblName := tm.timeoutsTableName - tx, e := tm.Txp.New() - if e != nil { - tm.Log().WithError(e).Error("failed to create schema for mysql timeout manager") - return e - } - - createTableSQL := `CREATE TABLE IF NOT EXISTS ` + tblName + ` ( - rec_id INT PRIMARY KEY AUTO_INCREMENT, - saga_id VARCHAR(255) UNIQUE NOT NULL, - timeout DATETIME NOT NULL, - INDEX (timeout), - INDEX (saga_id) - )` - - if _, e := tx.Exec(createTableSQL); e != nil { - if rbkErr := tx.Rollback(); rbkErr != nil { - tm.Log().Warn("timeout manager failed to rollback transaction") - } - return e - } - return tx.Commit() -} - func (tm *TimeoutManager) purge() error { purgeSQL := `DELETE FROM ` + tm.timeoutsTableName @@ -197,7 +172,7 @@ func (tm *TimeoutManager) SetTimeoutFunction(timeoutFunc func(tx *sql.Tx, sagaID } //GetTimeoutsTableName returns the table name in which to store timeouts -func getTimeoutsTableName(svcName string) string { +func GetTimeoutsTableName(svcName string) string { var re = regexp.MustCompile(`-|;|\\|`) sanitized := re.ReplaceAllString(svcName, "") @@ -208,7 +183,7 @@ func getTimeoutsTableName(svcName string) string { //NewTimeoutManager creates a new instance of a mysql based TimeoutManager func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.FieldLogger, svcName string, purge bool) *TimeoutManager { - timeoutsTableName := getTimeoutsTableName(svcName) + timeoutsTableName := GetTimeoutsTableName(svcName) tm := &TimeoutManager{ Log: logger, Bus: bus, @@ -216,10 +191,6 @@ func NewTimeoutManager(bus gbus.Bus, txp gbus.TxProvider, logger func() logrus.F SvcName: svcName, timeoutsTableName: timeoutsTableName, exit: make(chan bool)} - - if err := tm.ensureSchema(); err != nil { - panic(err) - } if purge { if err := tm.purge(); err != nil { panic(err) diff --git a/gbus/tx/mysql/txoutbox.go b/gbus/tx/mysql/txoutbox.go index 2081a36..21c8d79 100644 --- a/gbus/tx/mysql/txoutbox.go +++ b/gbus/tx/mysql/txoutbox.go @@ -5,15 +5,14 @@ import ( "database/sql" "encoding/gob" "fmt" - "strconv" - "strings" - "sync" - "time" - "github.com/rs/xid" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/wework/grabbit/gbus" + "strconv" + "strings" + "sync" + "time" ) var ( @@ -55,13 +54,6 @@ func (outbox *TxOutbox) Start(amqpOut *gbus.AMQPOutbox) error { if e != nil { panic(fmt.Sprintf("passed in transaction provider failed with the following error\n%s", e)) } - if ensureErr := outbox.ensureSchema(tx, outbox.svcName); ensureErr != nil { - err := tx.Rollback() - if err != nil { - outbox.log().WithError(err).Error("could not rollback the transaction for creation of schemas") - } - return ensureErr - } if outbox.purgeOnStartup { if purgeErr := outbox.purge(tx); purgeErr != nil { outbox.log().WithError(purgeErr).Error("failed to purge transactional outbox") @@ -321,75 +313,6 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows, return nil } -func (outbox *TxOutbox) ensureSchema(tx *sql.Tx, svcName string) error { - - schemaExists := outbox.outBoxTablesExists(tx, svcName) - - if schemaExists { - /* - The following performs an alter schema to accommodate for breaking change introduced in commit 6a9f5df - so that earlier consumers of grabbit will not break once the upgrade to the 1.0.0 release. - Once a proper DB migration stratagy will be in place and implemented (post 1.0.0) the following code - will be deleted. - */ - - return outbox.migrate0_9To1_0(tx, svcName) - } - - createTablesSQL := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` ( - rec_id int NOT NULL AUTO_INCREMENT, - message_id varchar(50) NOT NULL UNIQUE, - message_type varchar(50) NOT NULL, - exchange varchar(50) NOT NULL, - routing_key varchar(50) NOT NULL, - publishing longblob NOT NULL, - status int(11) NOT NULL, - relay_id varchar(50) NULL, - delivery_tag bigint(20) NOT NULL, - delivery_attempts int NOT NULL DEFAULT 0, - insert_date timestamp DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(rec_id), - INDEX status_delivery (rec_id, status, delivery_attempts))` - - _, createErr := tx.Exec(createTablesSQL) - - return createErr - -} - -func (outbox *TxOutbox) outBoxTablesExists(tx *sql.Tx, svcName string) bool { - - tblName := getOutboxName(svcName) - - selectSQL := `SELECT 1 FROM ` + tblName + ` LIMIT 1;` - - outbox.log().Info(selectSQL) - - row := tx.QueryRow(selectSQL) - var exists int - err := row.Scan(&exists) - if err != nil && err != sql.ErrNoRows { - return false - } - - return true -} - -func (outbox *TxOutbox) migrate0_9To1_0(tx *sql.Tx, svcName string) error { - tblName := getOutboxName(svcName) - alter := `ALTER TABLE ` + tblName + ` CHANGE COLUMN delivery_attemtps delivery_attempts int NOT NULL DEFAULT 0;` - _, execErr := tx.Exec(alter) - if execErr != nil { - outbox.log().WithField("sql_err", execErr).Info("migration:renaming column") - } - addIndex := `ALTER TABLE ` + tblName + ` ADD INDEX status_delivery (rec_id, status, delivery_attempts);` - _, indexErr := tx.Exec(addIndex) - if indexErr != nil { - outbox.log().WithField("sql_err", execErr).Info("migration:adding index column") - } - return nil -} - func getOutboxName(svcName string) string { return strings.ToLower("grabbit_" + sanitizeTableName(svcName) + "_outbox") diff --git a/gbus/tx/mysql/txprovider.go b/gbus/tx/mysql/txprovider.go index d075a91..8b29f4b 100644 --- a/gbus/tx/mysql/txprovider.go +++ b/gbus/tx/mysql/txprovider.go @@ -5,12 +5,11 @@ import ( "time" _ "github.com/go-sql-driver/mysql" //blank import - "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/tx" ) //NewTxProvider returns a new PgProvider -func NewTxProvider(connStr string) (gbus.TxProvider, error) { +func NewTxProvider(connStr string) (*tx.Provider, error) { db, err := sql.Open("mysql", connStr) db.SetConnMaxLifetime(time.Second) if err != nil { diff --git a/gbus/tx/sagastore.go b/gbus/tx/sagastore.go index 617b925..3c82916 100644 --- a/gbus/tx/sagastore.go +++ b/gbus/tx/sagastore.go @@ -62,7 +62,7 @@ func (store *SagaStore) scanInstances(rows *sql.Rows) ([]*saga.Instance, error) //GetSagasByType implements interface method store.GetSagasByType func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (instances []*saga.Instance, err error) { - tblName := store.GetSagatableName() + tblName := GetSagatableName(store.SvcName) selectSQL := "SELECT saga_id, saga_type, saga_data, version FROM " + tblName + " WHERE saga_type=" + store.ParamsMarkers[0] rows, err := tx.Query(selectSQL, sagaType.String()) @@ -86,7 +86,7 @@ func (store *SagaStore) GetSagasByType(tx *sql.Tx, sagaType reflect.Type) (insta //UpdateSaga implements interface method store.UpdateSaga func (store *SagaStore) UpdateSaga(tx *sql.Tx, instance *saga.Instance) (err error) { - tblName := store.GetSagatableName() + tblName := GetSagatableName(store.SvcName) currentVersion := instance.ConcurrencyCtrl nextVersion := instance.ConcurrencyCtrl + 1 instance.ConcurrencyCtrl = nextVersion @@ -115,7 +115,7 @@ func (store *SagaStore) RegisterSagaType(saga gbus.Saga) { //DeleteSaga implements interface method store.DeleteSaga func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error { - tblName := store.GetSagatableName() + tblName := GetSagatableName(store.SvcName) deleteSQL := `DELETE FROM ` + tblName + ` WHERE saga_id= ?` result, err := tx.Exec(deleteSQL, instance.ID) if err != nil { @@ -132,7 +132,7 @@ func (store *SagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error { //GetSagaByID implements interface method store.GetSagaByID func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) { - tblName := store.GetSagatableName() + tblName := GetSagatableName(store.SvcName) selectSQL := `SELECT saga_id, saga_type, saga_data, version FROM ` + tblName + ` WHERE saga_id=` + store.ParamsMarkers[0] + `` rows, err := tx.Query(selectSQL, sagaID) @@ -148,7 +148,7 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, } if err != nil { store.log().WithError(err). - WithFields(log.Fields{"saga_id": sagaID, "table_name": store.GetSagatableName()}). + WithFields(log.Fields{"saga_id": sagaID, "table_name": GetSagatableName(store.SvcName)}). Error("Failed to fetch saga") return nil, err @@ -166,7 +166,7 @@ func (store *SagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, //SaveNewSaga implements interface method store.SaveNewSaga func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) (err error) { store.RegisterSagaType(newInstance.UnderlyingInstance) - tblName := store.GetSagatableName() + tblName := GetSagatableName(store.SvcName) insertSQL := `INSERT INTO ` + tblName + ` (saga_id, saga_type, saga_data, version) VALUES (?, ?, ?, ?)` var buf []byte @@ -185,8 +185,8 @@ func (store *SagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstan //Purge cleans up the saga store, to be used in tests and in extreme situations in production func (store *SagaStore) Purge() error { tx := store.NewTx() - store.log().WithField("saga_table", store.GetSagatableName()).Info("Purging saga table") - deleteSQL := fmt.Sprintf("DELETE FROM %s", store.GetSagatableName()) + store.log().WithField("saga_table", GetSagatableName(store.SvcName)).Info("Purging saga table") + deleteSQL := fmt.Sprintf("DELETE FROM %s", GetSagatableName(store.SvcName)) results, err := tx.Exec(deleteSQL) if err != nil { store.log().WithError(err).Error("failed to purge saga table") @@ -224,10 +224,10 @@ func (store *SagaStore) NewTx() *sql.Tx { } //GetSagatableName returns the table name in which to store the Sagas -func (store *SagaStore) GetSagatableName() string { +func GetSagatableName(svcName string) string { var re = regexp.MustCompile(`-|;|\\|`) - sanitized := re.ReplaceAllString(store.SvcName, "") + sanitized := re.ReplaceAllString(svcName, "") return strings.ToLower("grabbit_" + sanitized + "_sagas") } diff --git a/gbus/worker.go b/gbus/worker.go index 1a16a02..aa35fb9 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -66,7 +66,7 @@ func (worker *worker) Start() error { worker.rpcMessages = rpcmsgs go worker.consumeMessages() - + go worker.consumeRPC() return nil } @@ -93,43 +93,22 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (< func (worker *worker) consumeMessages() { - //TODO:Handle panics due to tx errors so the consumption of messages will continue - for { - - var isRPCreply bool - var delivery amqp.Delivery - var shouldProceed bool - - select { - - case <-worker.stop: - worker.log().Info("stopped consuming messages") - return - case msgDelivery, ok := <-worker.messages: - if ok { - shouldProceed = true - } - delivery = msgDelivery - isRPCreply = false - case rpcDelivery, ok := <-worker.rpcMessages: - if ok { - shouldProceed = true - } - delivery = rpcDelivery - isRPCreply = true + for msg := range worker.messages { + if msg.Body == nil || len(msg.Body) == 0 { + continue } + worker.processMessage(msg, false) + } +} - /* - as the bus shuts down and amqp connection is killed the messages channel (b.msgs) gets closed - and delivery is a zero value so in order not to panic down the road we return if bus is shutdown - */ - if shouldProceed { +func (worker *worker) consumeRPC() { - worker.processMessage(delivery, isRPCreply) + for msg := range worker.rpcMessages { + if msg.Body == nil || len(msg.Body) == 0 { + continue } - + worker.processMessage(msg, true) } - } func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, error) { @@ -143,7 +122,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er } if bm.PayloadFQN == "" || bm.Semantics == "" { //TODO: Log poison pill message - worker.log().WithFields(logrus.Fields{"fqn": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") + worker.log().WithFields(logrus.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Warn("message received but no headers found...rejecting message") return nil, errors.New("missing critical headers") } @@ -151,7 +130,7 @@ func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, er var decErr error bm.Payload, decErr = worker.serializer.Decode(delivery.Body, bm.PayloadFQN) if decErr != nil { - worker.log().WithError(decErr).WithField("message", delivery).Error("failed to decode message. rejected as poison") + worker.log().WithError(decErr).WithField("message_name", bm.PayloadFQN).Error("failed to decode message. rejected as poison") return nil, decErr } return bm, nil @@ -337,13 +316,13 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan // each retry should run a new and separate transaction which should end with a commit or rollback action := func(attempt uint) (actionErr error) { - + tx, txCreateErr := worker.txProvider.New() - if txCreateErr != nil { - worker.log().WithError(txCreateErr).Error("failed creating new tx") - worker.span.LogFields(slog.Error(txCreateErr)) - return txCreateErr - } + if txCreateErr != nil { + worker.log().WithError(txCreateErr).Error("failed creating new tx") + worker.span.LogFields(slog.Error(txCreateErr)) + return txCreateErr + } worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) @@ -353,9 +332,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") - } + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") + } worker.span.LogFields(slog.Error(actionErr)) } worker.span.Finish() @@ -392,17 +371,17 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) rbkErr := tx.Rollback() - if rbkErr != nil { - worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") - } + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + } hspan.Finish() return handlerErr } cmtErr := tx.Commit() - if cmtErr != nil { - worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") - return cmtErr - } + if cmtErr != nil { + worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") + return cmtErr + } return nil } diff --git a/go.mod b/go.mod index ddd8cf2..ca69856 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/kr/pretty v0.1.0 // indirect github.com/kr/pty v1.1.8 // indirect github.com/linkedin/goavro v2.1.0+incompatible + github.com/lopezator/migrator v0.2.0 github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 diff --git a/go.sum b/go.sum index a8e86ca..fd5057a 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= +github.com/lopezator/migrator v0.2.0 h1:5t2GE77ojbyl9fZ4lHxkfFjwNZvTCzWFMDSorQq5O/c= +github.com/lopezator/migrator v0.2.0/go.mod h1:bpVAVPkWSvTw8ya2Pk7E/KiNAyDWNImgivQY79o8/8I= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -171,6 +174,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190712213246-8b927904ee0d/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.0 h1:Tfd7cKwKbFRsI8RMAD3oqqw7JPFRrvFlOsfbgVkjOOw= google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= diff --git a/tests/metrics_test.go b/tests/metrics_test.go index dcfcc75..43980bb 100644 --- a/tests/metrics_test.go +++ b/tests/metrics_test.go @@ -8,7 +8,7 @@ import ( ) var ( - logger logrus.FieldLogger + logger logrus.FieldLogger runningTries = 5 )