Skip to content

V1.x master merge conflicts #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
panic(err)
}
gb.TxProvider = mysqltx

mysql.EnsureSchema(mysqltx.Database, gb.SvcName)

//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
Expand Down
16 changes: 4 additions & 12 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,6 @@ func (b *DefaultBus) bindServiceQueue() error {
return nil
}

func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) {
channel, e := conn.Channel()
if e != nil {
return nil, e
}
return channel, nil
}

//Start implements GBus.Start()
func (b *DefaultBus) Start() error {

Expand All @@ -188,10 +180,10 @@ func (b *DefaultBus) Start() error {
return e
}

if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
if b.ingressChannel, e = b.ingressConn.Channel(); e != nil {
return e
}
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
if b.egressChannel, e = b.egressConn.Channel(); e != nil {
return e
}

Expand All @@ -209,7 +201,7 @@ func (b *DefaultBus) Start() error {
TODO://the design is crap and needs to be refactored
*/
var amqpChan *amqp.Channel
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
if amqpChan, e = b.egressConn.Channel(); e != nil {
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
return e
}
Expand Down Expand Up @@ -272,7 +264,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
workers := make([]*worker, 0)
for i := uint(0); i < workerNum; i++ {
//create a channel per worker as we can't share channels across go routines
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
amqpChan, createChanErr := b.ingressConn.Channel()
if createChanErr != nil {
return nil, createChanErr
}
Expand Down
9 changes: 9 additions & 0 deletions gbus/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID})
}

//Reply implements the Invocation.Reply signature
func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *BusMessage) error {
if dfi.inboundMsg != nil {
replyMessage.CorrelationID = dfi.inboundMsg.ID
Expand All @@ -51,13 +52,15 @@ func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *Bu
return err
}

//Send implements the Invocation.Send signature
func (dfi *defaultInvocationContext) Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error {
if dfi.tx != nil {
return dfi.bus.sendWithTx(ctx, dfi.tx, toService, command, policies...)
}
return dfi.bus.Send(ctx, toService, command, policies...)
}

//Publish implements the Invocation.Publish signature
func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error {

if dfi.tx != nil {
Expand All @@ -66,26 +69,32 @@ func (dfi *defaultInvocationContext) Publish(ctx context.Context, exchange, topi
return dfi.bus.Publish(ctx, exchange, topic, event, policies...)
}

//RPC implements the Invocation.RPC signature
func (dfi *defaultInvocationContext) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error) {
return dfi.bus.RPC(ctx, service, request, reply, timeout)
}

//Bus implements the Invocation.Bus signature
func (dfi *defaultInvocationContext) Bus() Messaging {
return dfi
}

//Tx implements the Invocation.Tx signature
func (dfi *defaultInvocationContext) Tx() *sql.Tx {
return dfi.tx
}

//Ctx implements the Invocation.Ctx signature
func (dfi *defaultInvocationContext) Ctx() context.Context {
return dfi.ctx
}

//Routing implements the Invocation.Routing signature
func (dfi *defaultInvocationContext) Routing() (exchange, routingKey string) {
return dfi.exchange, dfi.routingKey
}

//DeliveryInfo implements the Invocation.DeliveryInfo signature
func (dfi *defaultInvocationContext) DeliveryInfo() DeliveryInfo {
return dfi.deliveryInfo
}
1 change: 1 addition & 0 deletions gbus/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
func (mg MessageHandler) Name() string {
funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name()
splits := strings.Split(funName, ".")
Expand Down
23 changes: 12 additions & 11 deletions gbus/metrics/handler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package metrics

import (
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_model/go"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/sirupsen/logrus"
"sync"
)

var (
Expand All @@ -20,7 +21,7 @@ const (
grabbitPrefix = "grabbit"
)

type HandlerMetrics struct {
type handlerMetrics struct {
result *prometheus.CounterVec
latency prometheus.Summary
}
Expand Down Expand Up @@ -62,17 +63,17 @@ func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger
return err
}

func GetHandlerMetrics(handlerName string) *HandlerMetrics {
func GetHandlerMetrics(handlerName string) *handlerMetrics {
entry, ok := handlerMetricsByHandlerName.Load(handlerName)
if ok {
return entry.(*HandlerMetrics)
return entry.(*handlerMetrics)
}

return nil
}

func newHandlerMetrics(handlerName string) *HandlerMetrics {
return &HandlerMetrics{
func newHandlerMetrics(handlerName string) *handlerMetrics {
return &handlerMetrics{
result: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: grabbitPrefix,
Expand All @@ -98,15 +99,15 @@ func trackTime(functionToTrack func() error, observer prometheus.Observer) error
return functionToTrack()
}

func (hm *HandlerMetrics) GetSuccessCount() (float64, error) {
func (hm *handlerMetrics) GetSuccessCount() (float64, error) {
return hm.getLabeledCounterValue(success)
}

func (hm *HandlerMetrics) GetFailureCount() (float64, error) {
func (hm *handlerMetrics) GetFailureCount() (float64, error) {
return hm.getLabeledCounterValue(failure)
}

func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
func (hm *handlerMetrics) GetLatencySampleCount() (*uint64, error) {
m := &io_prometheus_client.Metric{}
err := hm.latency.Write(m)
if err != nil {
Expand All @@ -116,7 +117,7 @@ func (hm *HandlerMetrics) GetLatencySampleCount() (*uint64, error) {
return m.GetSummary().SampleCount, nil
}

func (hm *HandlerMetrics) getLabeledCounterValue(label string) (float64, error) {
func (hm *handlerMetrics) getLabeledCounterValue(label string) (float64, error) {
m := &io_prometheus_client.Metric{}
err := hm.result.WithLabelValues(label).Write(m)

Expand Down
2 changes: 1 addition & 1 deletion gbus/metrics/message_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ func newRejectedMessagesCounter() prometheus.Counter {
Name: "rejected_messages",
Help: "counting the rejected messages",
})
}
}
2 changes: 2 additions & 0 deletions gbus/metrics/saga_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go"
)

//SagaTimeoutCounter is the prometheus counter counting timed out saga instances
var SagaTimeoutCounter = newSagaTimeoutCounter()

//GetSagaTimeoutCounterValue gets the counter value of timed out sagas reported to prometheus
func GetSagaTimeoutCounterValue() (float64, error) {
m := &io_prometheus_client.Metric{}
err := SagaTimeoutCounter.Write(m)
Expand Down
2 changes: 1 addition & 1 deletion gbus/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro
return nil
}

//Shutdown stops the outbox
func (out *AMQPOutbox) Shutdown() {
close(out.stop)

}

//Post implements Outbox.Send
Expand Down
2 changes: 1 addition & 1 deletion gbus/serialization/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (as *Avro) Encode(obj gbus.Message) (msg []byte, err error) {
tobj, ok := obj.(AvroMessageGenerated)
if !ok {
err := fmt.Errorf("could not convert obj to AvroMessageGenerated")
logrus.WithError(err).WithField("obj", obj).Error("could not convert object")
logrus.WithError(err).Error("could not convert object")
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion gbus/serialization/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (as *Proto) Decode(buffer []byte, schemaName string) (msg gbus.Message, err
msg, ok = tmsg.(gbus.Message)
if !ok {
err = fmt.Errorf("could not cast obj to gbus.Message")
as.logger.WithError(err).WithField("msg", tmsg).Errorf("could not cast %v to gbus.Message", tmsg)
as.logger.WithError(err).Errorf("could not cast %v to gbus.Message", tmsg)
return nil, err
}

Expand Down
103 changes: 103 additions & 0 deletions gbus/tx/mysql/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package mysql

import (
"database/sql"

"fmt"
"github.com/lopezator/migrator"
"github.com/wework/grabbit/gbus/tx"
)

//SagaStoreTableMigration creates the service saga store table
func SagaStoreTableMigration(svcName string) *migrator.Migration {
tblName := tx.GetSagatableName(svcName)

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id INT PRIMARY KEY AUTO_INCREMENT,
saga_id VARCHAR(255) UNIQUE NOT NULL,
saga_type VARCHAR(255) NOT NULL,
saga_data LONGBLOB NOT NULL,
version integer NOT NULL DEFAULT 0,
last_update timestamp DEFAULT NOW(),
INDEX ` + tblName + `_sagatype_idx (saga_type))`

return &migrator.Migration{
Name: "create saga store table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(createTableQuery); err != nil {
return err
}
return nil
},
}
}

//OutboxMigrations creates service outbox table
func OutboxMigrations(svcName string) *migrator.Migration {

query := `CREATE TABLE IF NOT EXISTS ` + getOutboxName(svcName) + ` (
rec_id int NOT NULL AUTO_INCREMENT,
message_id varchar(50) NOT NULL UNIQUE,
message_type varchar(50) NOT NULL,
exchange varchar(50) NOT NULL,
routing_key varchar(50) NOT NULL,
publishing longblob NOT NULL,
status int(11) NOT NULL,
relay_id varchar(50) NULL,
delivery_tag bigint(20) NOT NULL,
delivery_attempts int NOT NULL DEFAULT 0,
insert_date timestamp DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(rec_id),
INDEX status_delivery (rec_id, status, delivery_attempts))`

return &migrator.Migration{
Name: "create outbox table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(query); err != nil {
return err
}
return nil
},
}
}

//TimoutTableMigration creates the service timeout table, where timeouts are persisted
func TimoutTableMigration(svcName string) *migrator.Migration {
tblName := GetTimeoutsTableName(svcName)

createTableQuery := `CREATE TABLE IF NOT EXISTS ` + tblName + ` (
rec_id INT PRIMARY KEY AUTO_INCREMENT,
saga_id VARCHAR(255) UNIQUE NOT NULL,
timeout DATETIME NOT NULL,
INDEX (timeout),
INDEX (saga_id)
)`

return &migrator.Migration{
Name: "create timeout table",
Func: func(tx *sql.Tx) error {
if _, err := tx.Exec(createTableQuery); err != nil {
return err
}
return nil
},
}
}

//EnsureSchema implements Grabbit's migrations strategy
func EnsureSchema(db *sql.DB, svcName string) {
migrationsTable := fmt.Sprintf("grabbitMigrations_%s", svcName)

migrate, err := migrator.New(migrator.TableName(migrationsTable), migrator.Migrations(
OutboxMigrations(svcName),
SagaStoreTableMigration(svcName),
TimoutTableMigration(svcName),
))
if err != nil {
panic(err)
}
err = migrate.Migrate(db)
if err != nil {
panic(err)
}
}
Loading