Skip to content

Commit d227a86

Browse files
author
Guy Baron
authored
V1.1.1 rollup into master (#151)
1 parent 11d7367 commit d227a86

21 files changed

+676
-267
lines changed

README.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
2121
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
2222
6) Deadlettering
2323
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
24-
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25-
26-
Planned:
27-
28-
1) Deduplication of inbound messages
24+
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25+
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
2926

3027
## Stable release
3128
The v1.x branch contains the latest stable releases of grabbit; track that branch to get point and minor release updates.

docs/TRACING.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Tracing
2+
3+
grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatible OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)).
4+
5+
NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See the Jaeger Go client for an [example](https://github.com/jaegertracing/jaeger-client-go).
6+
7+
Once the global tracer is set up you will need to make sure that in your message handlers you carry over the passed in context to successive messages sent by the handler.
8+
9+
```go
10+
11+
func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
12+
reply := gbus.NewBusMessage(MyReply{})
13+
cmd := gbus.NewBusMessage(MyCommand{})
14+
ctx := invocation.Ctx()
15+
16+
if err := invocation.Send(ctx, "another-service", cmd); err != nil{
17+
return err
18+
}
19+
if err := invocation.Reply(ctx, reply); err != nil{
20+
return err
21+
}
22+
return nil
23+
}
24+
25+
```

gbus/abstractions.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type BusConfiguration struct {
3030
type Bus interface {
3131
HandlerRegister
3232
Deadlettering
33+
RawMessageHandling
3334
BusSwitch
3435
Messaging
3536
SagaRegister
@@ -129,10 +130,37 @@ type Saga interface {
129130

130131
//Deadlettering provides the ability to handle messages that were rejected as poison and arrive at the deadletter queue
131132
type Deadlettering interface {
132-
HandleDeadletter(handler DeadLetterMessageHandler)
133+
/*
134+
HandleDeadletter is deprecated; use RawMessageHandling.SetGlobalRawMessageHandler instead.
135+
This function will be removed in future grabbit releases
136+
*/
137+
HandleDeadletter(handler RawMessageHandler)
133138
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
134139
}
135140

141+
/*
142+
RawMessageHandling provides the ability to consume and send raw amqp messages with the transactional guarantees
143+
that the bus provides
144+
*/
145+
type RawMessageHandling interface {
146+
/*
147+
SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
148+
to the service queue.
149+
The handler will get called with a scoped transaction that is a different transaction than the ones that
150+
regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
151+
cannot be deserialized by the bus to one of the registered schemas
152+
153+
In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
154+
and afterward will call any registered message handlers.
155+
If the global raw handler returns an error the message gets rejected and any additional
156+
handlers will not be called.
157+
You should not use the global raw message handler to drive business logic as it breaks the local transactivity
158+
guarantees grabbit provides and should only be used in specialized cases.
159+
If you do decide to use this feature, try not to shoot yourself in the foot.
160+
*/
161+
SetGlobalRawMessageHandler(handler RawMessageHandler)
162+
}
163+
136164
//RequestSagaTimeout is the interface a saga needs to implement to get timeout services
137165
type RequestSagaTimeout interface {
138166
TimeoutDuration() time.Duration

gbus/bus.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type DefaultBus struct {
4343
amqpOutbox *AMQPOutbox
4444

4545
RPCHandlers map[string]MessageHandler
46-
deadletterHandler DeadLetterMessageHandler
46+
deadletterHandler RawMessageHandler
47+
globalRawHandler RawMessageHandler
4748
HandlersLock *sync.Mutex
4849
RPCLock *sync.Mutex
4950
SenderLock *sync.Mutex
@@ -73,8 +74,8 @@ var (
7374
//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
7475
//for a random retry time. Default is 10 but it is configurable.
7576
BaseRetryDuration = 10 * time.Millisecond
76-
//RpcHeaderName used to define the header in grabbit for RPC
77-
RpcHeaderName = "x-grabbit-msg-rpc-id"
77+
//RPCHeaderName used to define the header in grabbit for RPC
78+
RPCHeaderName = "x-grabbit-msg-rpc-id"
7879
)
7980

8081
func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
@@ -286,6 +287,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
286287
rpcLock: b.RPCLock,
287288
rpcHandlers: b.RPCHandlers,
288289
deadletterHandler: b.deadletterHandler,
290+
globalRawHandler: b.globalRawHandler,
289291
handlersLock: &sync.Mutex{},
290292
registrations: b.Registrations,
291293
serializer: b.Serializer,
@@ -547,11 +549,17 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler
547549
return b.registerHandlerImpl(exchange, topic, event, handler)
548550
}
549551

550-
//HandleDeadletter implements GBus.HandleDeadletter
551-
func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) {
552+
//HandleDeadletter implements Deadlettering.HandleDeadletter
553+
func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler) {
552554
b.registerDeadLetterHandler(handler)
553555
}
554556

557+
//SetGlobalRawMessageHandler implements RawMessageHandling.SetGlobalRawMessageHandler
558+
func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler) {
559+
metrics.AddHandlerMetrics(handler.Name())
560+
b.globalRawHandler = handler
561+
}
562+
555563
//ReturnDeadToQueue returns a message to its original destination
556564
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
557565
return b.returnDeadToQueue(ctx, nil, publishing)
@@ -623,7 +631,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.
623631
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
624632
b.SenderLock.Lock()
625633
defer b.SenderLock.Unlock()
626-
span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl")
634+
span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage")
635+
627636
defer func() {
628637
if err := recover(); err != nil {
629638
errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack())
@@ -691,7 +700,7 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag
691700
return nil
692701
}
693702

694-
func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) {
703+
func (b *DefaultBus) registerDeadLetterHandler(handler RawMessageHandler) {
695704
metrics.AddHandlerMetrics(handler.Name())
696705
b.deadletterHandler = handler
697706
}
@@ -705,7 +714,7 @@ type rpcPolicy struct {
705714
}
706715

707716
func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
708-
publishing.Headers[RpcHeaderName] = p.rpcID
717+
publishing.Headers[RPCHeaderName] = p.rpcID
709718
}
710719

711720
//Log returns the default logrus.FieldLogger for the bus via the Glogged helper

gbus/message_handler.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,26 @@ package gbus
22

33
import (
44
"database/sql"
5-
"github.com/streadway/amqp"
65
"reflect"
76
"runtime"
87
"strings"
8+
9+
"github.com/streadway/amqp"
910
)
1011

1112
//MessageHandler signature for all command handlers
1213
type MessageHandler func(invocation Invocation, message *BusMessage) error
1314

14-
//DeadLetterMessageHandler signature for dead letter handler
15-
type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error
15+
//RawMessageHandler signature for handlers that handle raw amqp deliveries
16+
type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error
1617

1718
//Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type
1819
func (mg MessageHandler) Name() string {
1920
return nameFromFunc(mg)
2021
}
2122

2223
//Name is a helper function returning the runtime name of the function bound to an instance of the RawMessageHandler type
23-
func (dlmg DeadLetterMessageHandler) Name() string {
24+
func (dlmg RawMessageHandler) Name() string {
2425
return nameFromFunc(dlmg)
2526
}
2627

gbus/messages.go

+27-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package gbus
22

33
import (
4+
"errors"
5+
"fmt"
6+
47
"github.com/opentracing/opentracing-go/log"
58
"github.com/rs/xid"
69
"github.com/streadway/amqp"
@@ -27,11 +30,28 @@ func NewBusMessage(payload Message) *BusMessage {
2730
return bm
2831
}
2932

30-
//NewFromAMQPHeaders creates a BusMessage from headers of an amqp message
31-
func NewFromAMQPHeaders(headers amqp.Table) *BusMessage {
33+
//NewFromDelivery creates a BusMessage from an amqp delivery
34+
func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error) {
3235
bm := &BusMessage{}
33-
bm.SetFromAMQPHeaders(headers)
34-
return bm
36+
bm.SetFromAMQPHeaders(delivery)
37+
38+
bm.ID = delivery.MessageId
39+
bm.CorrelationID = delivery.CorrelationId
40+
if delivery.Exchange != "" {
41+
bm.Semantics = EVT
42+
} else {
43+
bm.Semantics = CMD
44+
}
45+
if bm.PayloadFQN == "" || bm.Semantics == "" {
46+
errMsg := fmt.Sprintf("missing critical headers. message_name:%s semantics: %s", bm.PayloadFQN, bm.Semantics)
47+
return nil, errors.New(errMsg)
48+
}
49+
return bm, nil
50+
}
51+
52+
//GetMessageName extracts the value of the custom x-msg-name header from an amqp delivery
53+
func GetMessageName(delivery amqp.Delivery) string {
54+
return castToString(delivery.Headers["x-msg-name"])
3555
}
3656

3757
//GetAMQPHeaders convert to AMQP headers Table everything but a payload
@@ -46,12 +66,12 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
4666
}
4767

4868
//SetFromAMQPHeaders populates everything but the payload from the AMQP headers table of the given delivery
49-
func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table) {
50-
69+
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
70+
headers := delivery.Headers
5171
bm.SagaID = castToString(headers["x-msg-saga-id"])
5272
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
5373
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
54-
bm.PayloadFQN = castToString(headers["x-msg-name"])
74+
bm.PayloadFQN = GetMessageName(delivery)
5575

5676
}
5777

gbus/metrics/message_metrics.go

+8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ var (
1010
rejectedMessages = newRejectedMessagesCounter()
1111
)
1212

13+
//ResetRejectedMessagesCounter resets the counter; it is intended to be used in tests only
14+
func ResetRejectedMessagesCounter() {
15+
16+
prometheus.Unregister(rejectedMessages)
17+
rejectedMessages = newRejectedMessagesCounter()
18+
}
19+
1320
//ReportRejectedMessage reports a message being rejected to the metrics counter
1421
func ReportRejectedMessage() {
1522
rejectedMessages.Inc()
@@ -28,6 +35,7 @@ func GetRejectedMessagesValue() (float64, error) {
2835
}
2936

3037
func newRejectedMessagesCounter() prometheus.Counter {
38+
3139
return promauto.NewCounter(prometheus.CounterOpts{
3240
Namespace: grabbitPrefix,
3341
Subsystem: "messages",

gbus/saga/glue.go

+26-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package saga
22

33
import (
4+
"context"
45
"database/sql"
56
"errors"
67
"fmt"
78
"reflect"
89
"strings"
910
"sync"
1011

12+
"github.com/opentracing/opentracing-go"
13+
slog "github.com/opentracing/opentracing-go/log"
1114
"github.com/sirupsen/logrus"
1215
"github.com/wework/grabbit/gbus"
1316
"github.com/wework/grabbit/gbus/metrics"
@@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def {
98101
return defs
99102
}
100103

101-
func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error {
104+
//SagaHandler is the generic handler invoking saga instances
105+
func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error {
102106

103107
imsm.lock.Lock()
104108
defer imsm.lock.Unlock()
@@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
117121
*/
118122
startNew := def.shouldStartNewSaga(message)
119123
if startNew {
124+
120125
newInstance := def.newInstance()
121126
imsm.Log().
122127
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
123128
Info("created new saga")
124-
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
129+
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
125130
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
126131
return invkErr
127132
}
@@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
154159
return e
155160
}
156161
def.configureSaga(instance)
157-
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
162+
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
158163
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
159164
return invkErr
160165
}
@@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
176181

177182
for _, instance := range instances {
178183
def.configureSaga(instance)
179-
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
184+
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
180185
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
181186
return invkErr
182187
}
@@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
191196
return nil
192197
}
193198

194-
func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
199+
func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
200+
201+
span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String())
202+
defer span.Finish()
195203
sginv := &sagaInvocation{
196204
decoratedBus: invocation.Bus(),
197205
decoratedInvocation: invocation,
198206
inboundMsg: message,
199207
sagaID: instance.ID,
200-
ctx: invocation.Ctx(),
208+
ctx: sctx,
201209
invokingService: imsm.svcName,
202210
}
203211
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
@@ -207,7 +215,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
207215
}))
208216

209217
exchange, routingKey := invocation.Routing()
210-
return instance.invoke(exchange, routingKey, sginv, message)
218+
err := instance.invoke(exchange, routingKey, sginv, message)
219+
if err != nil {
220+
span.LogFields(slog.Error(err))
221+
}
222+
return err
211223
}
212224

213225
func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {
@@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error {
232244
return nil
233245
}
234246
imsm.alreadyRegistred[message.SchemaName()] = true
235-
return imsm.bus.HandleMessage(message, imsm.handler)
247+
return imsm.bus.HandleMessage(message, imsm.SagaHandler)
236248
}
237249

238250
func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error {
@@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
241253
return nil
242254
}
243255
imsm.alreadyRegistred[event.SchemaName()] = true
244-
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
256+
return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler)
245257
}
246258

247259
//TimeoutSaga fetches a saga instance and calls its timeout interface
@@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {
257269
if err != nil {
258270
return err
259271
}
272+
273+
span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout")
274+
span.SetTag("saga_type", saga.String())
275+
defer span.Finish()
260276
timeoutErr := saga.timeout(tx, imsm.bus)
277+
261278
if timeoutErr != nil {
262279
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
263280
return timeoutErr

0 commit comments

Comments
 (0)