Skip to content

Commit 50aecfb

Browse files
author
Guy Baron
authored
v1.1.6 rollup (#218)
* inceased outbox VARCHAR column length to 2048 (#155) * added reply to initiator functionality to sagas (#157) * added generic handler metrics with message type as the label (#144) * added generic handler metrics with message type as the label * add handler name label to the metrics * adding new metrics to the read me * Fix handle empty body (#156) * set the correct Type and Content-Type headers on out going messages (#160) * set the correct Type and Content-Type headers on out going messages * refactoring * fixing ReplyToInitiator not working when initiator sends a message via the RPC interface (#163) * fixing ReplyToInitiator not working when initiator sends a message via the RPC interface * Improved wording of saga documentation article (#164) * better wording for documentation * added golangcli lint configuration and fixed linting failures (#165) * fixed logging issues (#167) * allow getting the saga id of the current invoked saga (#168) * setting the target saga id on the saga correlation id field (#171) * support emperror (#174) * setting the target saga id on the saga correlation id field * added emperror support * Fix logging and added logging documentation (#176) * fixed logging issues and added documentation logging via the invocation interface was broken and did not add contextual data related to the invocation due to a bug in the way the Glogged structure is currently implemented. Also added documentation on how logging should be done within a handler including adding context to returned errors so that data gets logged * added missing documentation file * added documentation on serialization support (#177) * fixed emperror url format * added serialization documentation * added documentation for messaging patterns, retries and transactional processing (#181) * fixed emperror url format * added serialization documentation * added documentation for message semantics, retries and transactions * Fix docmentation (#182) * fixed emperror url format * added serialization documentation * added documentation for message semantics, retries and transactions * fixing tx documentation page * Added sample application (#184) * Update README.md * added ability to configure outbox (#186) * fixing issues with invoking the GlobalrawMessageHandler (#189) Fixing the following issues: #187 #188 * Saga bug fixes (#198) * fixing the way that the target service was resolved closes #195 #195 * Fixing the value of the StartedBy field when creating a new saga instance closes #194 #194 * rolling back checking if replying to an event to maintain backward compatibility * logging a Warn instead of rejecting the message when saga not found in store closes #196 #196 * fixing minor tech debt issues (#199) * v1.1.5 rollup to master (#185) * fix(bug:200) logs are now being reported correctly Fixes issue #200 Also updated go.mod for newer versions of dependancies * added metrics for transactional outbox (#193) * added metrics for transactional outbox The follwoing metrics were added outbox_total_records: reports the total amount of records currently in the outbox outbox_pending_delivery: reports the total amount of records pending delivery currently in the outbox outbox_pending_removal: reports the total amount of records that were sent and pending removal currently in the outbox * reading status and count fields in the correct order from rows * service name now gets added to log entries when a custom logger is set or in saga store (#204) * service name now gets added to log entries when a custom logger is set #200 * fixing issue that the saga store was not adding the service to its log entires #206 * Fix logging (#207) * service name now gets added to log entries when a custom logger is set #200 * fixing issue that the saga store was not adding the service to its log entires #206 * fixing minor issue with logging saga store initialization * Add support for setting the idempotency key on a BusMessage (#208) * Added x-idempotency-key header and the ability for client code to set it #106 * Set the value of BusMessage.ID as the default value of BusMessage.IdempotencyKey * fixing issue with txoutbox failing to deliver a message after 50 (#205) failed attempts #203 * Allow correctly replaying Events (#190) * refactored returnDeadToQueue to allow correctly returning Events as well as Messages(commands) * fixed lint issues * code review fixes * more review comments * minor change to force coveralls to rebuild * returnDeadToQueue - changed routing-key to always be that routing-key of the first death, not the latest death, also refactored naming and added comment for clarity * moved getRoutingParamsFromDelivery to worker.go * fixed type in migration name attribute (#213) * adding all logging context data to worker and saga log entries (#215) * adding all logging context data to worker and saga log entries * added logging with context when command or reply received for saga def but no saga correlation id found * removing minor discrepancies and updating documentation
1 parent eb32401 commit 50aecfb

25 files changed

+643
-285
lines changed

docs/LOGGING.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ annotated with the following contextual data (added as logrus fields to the log
99
allowing for a better debugging experience.
1010

1111
- _service: the service name
12+
- correlation_id: the correlation id set for the message
13+
- exchange: the exchange the message was published to
1214
- handler_name: the name of the handler being invoked
15+
- idempotency_key: the idempotency key set for the message
1316
- message_id: the id of the processed message
1417
- message_name: the type of the message that is being processed
1518
- routing_key: the routing_key of the message
1619
- saga_id: the id of the saga instance being invoked
17-
- saga_def: the type of the saga that is being invoked
20+
- saga_def: the type of the saga that is being invoked
21+
- worker: the worker identifier that is processing the message
1822

1923
```go
2024

docs/METRICS.md

+4
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,7 @@ grabbit exposes and reports the following metrics to Prometheus
1010
| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
1111
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
1212
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
13+
| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
14+
| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
15+
| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |
16+

docs/SAGA.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (s *BookVacationSaga) HandleBookVacationCommand(invocation gbus.Invocation,
109109
reply := gbus.NewBusMessage(BookVacationReply{
110110
BookingId: s.BookingId})
111111
//reply to the command so the caller can continue with his execution flow
112-
return invocation.Reply(noopTraceContext(), reply)
112+
return invocation.Reply(context.Background(), reply)
113113
}
114114
```
115115

examples/vacation_app/cmd/client.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/wework/grabbit/gbus"
1515
)
1616

17-
1817
var runClientCmd = &cobra.Command{
1918
Use: "client",
2019
Short: "Run the client app",
@@ -57,8 +56,7 @@ var runClientCmd = &cobra.Command{
5756
func HandleBookingComplete(invocation gbus.Invocation, message *gbus.BusMessage) error {
5857
bookingComplete := message.Payload.(*messages.BookingComplete)
5958
if bookingComplete.Success {
60-
fmt.Printf("booking completed succesfully\n")
61-
59+
fmt.Printf("booking completed successfully\n")
6260
} else {
6361
fmt.Printf("failed to book vacation\n")
6462
}

examples/vacation_app/cmd/flights.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@ import (
44
"bufio"
55
"fmt"
66
"os"
7-
"vacation_app/trace"
7+
88
"vacation_app/messages"
9-
9+
"vacation_app/trace"
1010
log "github.com/sirupsen/logrus"
1111
"github.com/spf13/cobra"
1212
"github.com/wework/grabbit/gbus"
1313
)
1414

1515

16-
1716
var runFlightsgServiceCmd = &cobra.Command{
1817
Use: "flights",
1918
Short: "Run the flights service",
@@ -30,7 +29,8 @@ var runFlightsgServiceCmd = &cobra.Command{
3029
gb := createBus(svcName)
3130

3231
gb.HandleMessage(messages.BookFlightsCmd{}, HandleBookFlightCommand)
33-
gb.HandleMessage(messages.CancelFlightsCmd{}, HandleCancelFlightCommand)
32+
33+
gb.HandleMessage(messages.CancelFlightsCmd{}, HandleCancelFlightCommand)
3434

3535
gb.Start()
3636
defer gb.Shutdown()

gbus/abstractions.go

-6
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ const (
2020
EVT Semantics = "evt"
2121
)
2222

23-
//BusConfiguration provides configuration passed to the bus builder
24-
type BusConfiguration struct {
25-
MaxRetryCount uint
26-
BaseRetryDuration int
27-
}
28-
2923
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
3024
type Bus interface {
3125
HandlerRegister

gbus/builder/builder.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ type defaultBuilder struct {
2929
dbPingTimeout time.Duration
3030
usingPingTimeout bool
3131
logger logrus.FieldLogger
32+
busCfg gbus.BusConfiguration
3233
}
3334

3435
func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
3536

3637
gb := &gbus.DefaultBus{
3738
AmqpConnStr: builder.connStr,
3839
PrefetchCount: builder.PrefetchCount,
40+
Glogged: &gbus.Glogged{},
3941

4042
SvcName: svcName,
4143
PurgeOnStartup: builder.purgeOnStartup,
@@ -53,11 +55,13 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
5355
Confirm: builder.confirm,
5456
}
5557

58+
var finalLogger logrus.FieldLogger
5659
if builder.logger != nil {
57-
gb.SetLogger(builder.logger)
60+
finalLogger = builder.logger.WithField("_service", gb.SvcName)
5861
} else {
59-
gb.SetLogger(logrus.New())
62+
finalLogger = logrus.WithField("_service", gb.SvcName)
6063
}
64+
gb.SetLogger(finalLogger)
6165

6266
if builder.workerNum < 1 {
6367
gb.WorkerNum = 1
@@ -72,6 +76,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
7276
switch builder.txnlProvider {
7377

7478
case "mysql":
79+
providerLogger := gb.Log().WithField("provider", "mysql")
7580
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
7681
if err != nil {
7782
panic(err)
@@ -82,14 +87,15 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
8287

8388
//TODO move purge logic into the NewSagaStore factory method
8489
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
90+
sagaStore.SetLogger(providerLogger)
8591
if builder.purgeOnStartup {
8692
err := sagaStore.Purge()
8793
if err != nil {
8894
panic(err)
8995
}
9096
}
91-
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
92-
gb.Outbox.SetLogger(gb.Log())
97+
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup, builder.busCfg.OutboxCfg)
98+
gb.Outbox.SetLogger(providerLogger)
9399
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)
94100

95101
default:
@@ -109,6 +115,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
109115
}
110116
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
111117
glue.SetLogger(gb.Log())
118+
sagaStore.SetLogger(glue.Log())
112119
gb.Glue = glue
113120
return gb
114121
}
@@ -182,6 +189,7 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
182189

183190
func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder {
184191

192+
builder.busCfg = config
185193
gbus.MaxRetryCount = config.MaxRetryCount
186194

187195
if config.BaseRetryDuration > 0 {
@@ -207,6 +215,7 @@ type Nu struct {
207215
//Bus inits a new BusBuilder
208216
func (Nu) Bus(brokerConnStr string) gbus.Builder {
209217
return &defaultBuilder{
218+
busCfg: gbus.BusConfiguration{},
210219
PrefetchCount: 1,
211220
connStr: brokerConnStr,
212221
serializer: serialization.NewGobSerializer(),

gbus/bus.go

+56-17
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ var (
7575
//for a random retry time. Default is 10 but it is configurable.
7676
BaseRetryDuration = 10 * time.Millisecond
7777
//RPCHeaderName used to define the header in grabbit for RPC
78-
RPCHeaderName = "x-grabbit-msg-rpc-id"
78+
RPCHeaderName = "x-grabbit-msg-rpc-id"
79+
ResurrectedHeaderName = "x-resurrected-from-death"
80+
FirstDeathRoutingKeyHeaderName = "x-first-death-routing-key"
7981
)
8082

8183
func (b *DefaultBus) createRPCQueue() (amqp.Queue, error) {
@@ -488,23 +490,70 @@ func (b *DefaultBus) sendWithTx(ctx context.Context, ambientTx *sql.Tx, toServic
488490

489491
func (b *DefaultBus) returnDeadToQueue(ctx context.Context, ambientTx *sql.Tx, publishing *amqp.Publishing) error {
490492
if !b.started {
491-
return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages")
493+
return errors.New("bus not started or already shutdown, make sure you call bus.Start() before sending messages")
494+
}
495+
496+
targetQueue, ok := publishing.Headers["x-first-death-queue"].(string)
497+
if !ok {
498+
return fmt.Errorf("bad x-first-death-queue field - %v", publishing.Headers["x-first-death-queue"])
499+
}
500+
exchange, ok := publishing.Headers["x-first-death-exchange"].(string)
501+
if !ok {
502+
return fmt.Errorf("bad x-first-death-exchange field - %v", publishing.Headers["x-first-death-exchange"])
503+
}
504+
routingKey, err := extractFirstDeathRoutingKey(publishing.Headers)
505+
if err != nil {
506+
return err
492507
}
493-
//publishing.Headers.
494-
exchange := fmt.Sprintf("%v", publishing.Headers["x-first-death-exchange"])
495-
routingKey := fmt.Sprintf("%v", publishing.Headers["x-first-death-queue"])
508+
509+
publishing.Headers[FirstDeathRoutingKeyHeaderName] = routingKey // Set the original death routing key to be used later for replaying
510+
publishing.Headers[ResurrectedHeaderName] = true // mark message as resurrected
511+
// publishing.Headers["x-first-death-exchange"] is not deleted and kept as is
496512

497513
delete(publishing.Headers, "x-death")
498514
delete(publishing.Headers, "x-first-death-queue")
499515
delete(publishing.Headers, "x-first-death-reason")
500-
delete(publishing.Headers, "x-first-death-exchange")
516+
517+
b.Log().
518+
WithField("message_id", publishing.MessageId).
519+
WithField("target_queue", targetQueue).
520+
WithField("first_death_routing_key", routingKey).
521+
WithField("first_death_exchange", exchange).
522+
Info("returning dead message to queue...")
501523

502524
send := func(tx *sql.Tx) error {
503-
return b.publish(tx, exchange, routingKey, publishing)
525+
// Publishing a "resurrected" message is done directly to the target queue using the default exchange
526+
return b.publish(tx, "", targetQueue, publishing)
504527
}
505528
return b.withTx(send, ambientTx)
506529
}
507530

531+
// Extracts the routing key of the first death of the message. "x-death" header contains a list of "deaths" that happened to this message, with
532+
// the most recent death always being first in the list, so fhe first death is the last one. More information: https://www.rabbitmq.com/dlx.html
533+
func extractFirstDeathRoutingKey(headers amqp.Table) (result string, err error) {
534+
xDeathList, ok := headers["x-death"].([]interface{})
535+
if !ok {
536+
return "", fmt.Errorf("failed extracting routing-key from headers, bad 'x-death' field - %v", headers["x-death"])
537+
}
538+
539+
xDeath, ok := xDeathList[0].(amqp.Table)
540+
if !ok {
541+
return "", fmt.Errorf("failed extracting routing-key from headers, bad 'x-death' field - %v", headers["x-death"])
542+
}
543+
544+
routingKeys, ok := xDeath["routing-keys"].([]interface{})
545+
if !ok {
546+
return "", fmt.Errorf("failed extracting routing-key from headers, bad 'routing-keys' field - %v", xDeath["routing-keys"])
547+
}
548+
549+
routingKey, ok := routingKeys[len(routingKeys)-1].(string)
550+
if !ok {
551+
return "", fmt.Errorf("failed extracting routing-key from headers, bad 'routing-keys' field - %v", xDeath["routing-keys"])
552+
}
553+
554+
return routingKey, nil
555+
}
556+
508557
//Publish implements GBus.Publish(topic, message)
509558
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error {
510559
return b.publishWithTx(ctx, nil, exchange, topic, message, policies...)
@@ -717,13 +766,3 @@ type rpcPolicy struct {
717766
func (p rpcPolicy) Apply(publishing *amqp.Publishing) {
718767
publishing.Headers[RPCHeaderName] = p.rpcID
719768
}
720-
721-
//Log returns the default logrus.FieldLogger for the bus via the Glogged helper
722-
func (b *DefaultBus) Log() logrus.FieldLogger {
723-
if b.Glogged == nil {
724-
b.Glogged = &Glogged{
725-
log: logrus.WithField("_service", b.SvcName),
726-
}
727-
}
728-
return b.Glogged.Log()
729-
}

gbus/configuration.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package gbus
2+
3+
import "time"
4+
5+
//BusConfiguration provides configuration passed to the bus builder
6+
type BusConfiguration struct {
7+
MaxRetryCount uint
8+
BaseRetryDuration int //TODO:Change type to uint
9+
OutboxCfg OutboxConfiguration
10+
}
11+
12+
//OutboxConfiguration configures the transactional outbox
13+
type OutboxConfiguration struct {
14+
/*
15+
Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
16+
Increase this value if you are experiencing deadlocks.
17+
Default is 10
18+
*/
19+
Ackers uint
20+
//PageSize is the amount of pending messsage records the outbox selects from the database every iteration, the default is 500
21+
PageSize uint
22+
//MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds
23+
MetricsInterval time.Duration
24+
//SendInterval is the duration the outbox waits before each iteration, default is 1 second
25+
SendInterval time.Duration
26+
/*
27+
ScavengeInterval is the duration the outbox waits before attempting to re-send messages that
28+
were already sent to the broker but were not yet confirmed.
29+
Default is 60 seconds
30+
*/
31+
ScavengeInterval time.Duration
32+
}

0 commit comments

Comments
 (0)