
Commit 784d8f4

Author: Guy Baron
added metrics for transactional outbox (#193)
* added metrics for transactional outbox

  The following metrics were added:
  - outbox_total_records: reports the total amount of records currently in the outbox
  - outbox_pending_delivery: reports the total amount of records pending delivery currently in the outbox
  - outbox_pending_removal: reports the total amount of records that were sent and pending removal currently in the outbox

* reading status and count fields in the correct order from rows
1 parent b109f06 commit 784d8f4

File tree: 4 files changed, +106 -23 lines

docs/METRICS.md (+4)

```diff
@@ -10,3 +10,7 @@ grabbit exposes and reports the following metrics to Prometheus
 | grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
 | grabbit | messages | rejected_messages | increments each time a message gets rejected |
 | grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
+| grabbit | outbox | outbox_total_records | reports the total amount of records currently in the outbox |
+| grabbit | outbox | outbox_pending_delivery | reports the total amount of records pending delivery currently in the outbox |
+| grabbit | outbox | outbox_pending_removal | reports the total amount of records that were sent and pending removal currently in the outbox |
```
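These gauges are created with promauto (see gbus/metrics/outbox_metrics.go below), so they are registered with the default Prometheus registry. A minimal sketch of exposing them for scraping from a service that embeds grabbit and does not already serve a metrics endpoint; the port and path are arbitrary examples and not part of this commit:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promauto-created collectors land in the default registry, which
	// promhttp.Handler() serves, so the new outbox gauges appear here
	// alongside grabbit's existing handler/saga/message metrics.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```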

gbus/configuration.go (+8 -7)

```diff
@@ -11,8 +11,16 @@ type BusConfiguration struct {
 
 //OutboxConfiguration configures the transactional outbox
 type OutboxConfiguration struct {
+	/*
+		Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
+		Increase this value if you are experiencing deadlocks.
+		Default is 10
+	*/
+	Ackers uint
 	//PageSize is the amount of pending messsage records the outbox selects from the database every iteration, the default is 500
 	PageSize uint
+	//MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds
+	MetricsInterval time.Duration
 	//SendInterval is the duration the outbox waits before each iteration, default is 1 second
 	SendInterval time.Duration
 	/*
@@ -21,11 +29,4 @@ type OutboxConfiguration struct {
 		Default is 60 seconds
 	*/
 	ScavengeInterval time.Duration
-	/*
-		Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
-		Increase this value if you are experiencing deadlocks.
-		Default is 10
-	*/
-
-	Ackers uint
 }
```
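The new MetricsInterval field follows the same convention as the other intervals: NewOutbox (shown further down) only overrides a default when the configured value is non-zero, so fields left at their zero value keep the documented defaults. A small sketch of tuning the outbox, not taken from the repository; how the OutboxConfiguration value reaches the bus builder depends on your grabbit version and is assumed here:

```go
package main

import (
	"fmt"
	"time"

	"github.com/wework/grabbit/gbus"
)

func main() {
	// Any field left at its zero value falls back to the defaults documented
	// above, since NewOutbox applies only non-zero overrides.
	cfg := gbus.OutboxConfiguration{
		Ackers:           20,               // goroutines draining ack/nack signals (default 10)
		PageSize:         1000,             // pending records selected per iteration (default 500)
		SendInterval:     time.Second,      // wait between send iterations (default 1s)
		ScavengeInterval: 30 * time.Second, // wait between scavenge runs (default 60s)
		MetricsInterval:  5 * time.Second,  // wait between metrics reports (default 15s)
	}
	fmt.Printf("%+v\n", cfg)
}
```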

gbus/metrics/outbox_metrics.go (+27)

```diff
@@ -0,0 +1,27 @@
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var OutboxSize = promauto.NewGauge(prometheus.GaugeOpts{
+	Namespace: grabbitPrefix,
+	Name:      "outbox_total_records",
+	Subsystem: "outbox",
+	Help:      "reports the total amount of records currently in the outbox",
+})
+
+var PendingMessages = promauto.NewGauge(prometheus.GaugeOpts{
+	Namespace: grabbitPrefix,
+	Name:      "outbox_pending_delivery",
+	Subsystem: "outbox",
+	Help:      "reports the total amount of records pending delivery currently in the outbox",
+})
+
+var SentMessages = promauto.NewGauge(prometheus.GaugeOpts{
+	Namespace: grabbitPrefix,
+	Name:      "outbox_pending_removal",
+	Subsystem: "outbox",
+	Help:      "reports the total amount of records that were sent and pending removal currently in the outbox",
+})
```
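Because the gauges are exported package-level collectors, their values can be read back directly, which is handy in tests. A hedged sketch using client_golang's testutil helpers; the test itself is illustrative and not part of this commit:

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/wework/grabbit/gbus/metrics"
)

func TestOutboxGaugesAreReadable(t *testing.T) {
	// testutil.ToFloat64 extracts the current value of a single-metric collector.
	metrics.PendingMessages.Set(3)
	if got := testutil.ToFloat64(metrics.PendingMessages); got != 3 {
		t.Fatalf("expected pending gauge to read 3, got %v", got)
	}
}
```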

gbus/tx/mysql/txoutbox.go (+67 -16)

```diff
@@ -14,20 +14,22 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/streadway/amqp"
 	"github.com/wework/grabbit/gbus"
+	"github.com/wework/grabbit/gbus/metrics"
 	"github.com/wework/grabbit/gbus/tx"
 )
 
-var (
-	pending int
-	//waitingConfirm = 1
+const (
+	Pending int = iota + 1
+	Sent
+)
 
-	//TODO:get these values from configuration
+var (
 	maxPageSize         = 500
 	maxDeliveryAttempts = 50
 	sendInterval        = time.Second
-
-	scavengeInterval = time.Second * 60
-	ackers           = 10
+	scavengeInterval    = time.Second * 60
+	metricsInterval     = time.Second * 15
+	ackers              = 10
 )
 
 //TxOutbox is a mysql based transactional outbox
@@ -114,7 +116,7 @@ func (outbox *TxOutbox) Save(tx *sql.Tx, exchange, routingKey string, amqpMessag
 		return err
 	}
 	unknownDeliverTag := -1
-	_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), pending)
+	_, insertErr := tx.Exec(insertSQL, amqpMessage.MessageId, amqpMessage.Headers["x-msg-name"], unknownDeliverTag, exchange, routingKey, buf.Bytes(), Pending)
 
 	return insertErr
 }
@@ -149,6 +151,9 @@ func NewOutbox(svcName string, txProv gbus.TxProvider, purgeOnStartup bool, cfg
 	if cfg.ScavengeInterval.String() != "0s" {
 		scavengeInterval = cfg.ScavengeInterval
 	}
+	if cfg.MetricsInterval.String() != "0s" {
+		metricsInterval = cfg.MetricsInterval
+	}
 	if cfg.Ackers > 0 {
 		ackers = int(cfg.Ackers)
 	}
@@ -176,29 +181,75 @@ func (outbox *TxOutbox) ackRec() {
 
 func (outbox *TxOutbox) processOutbox() {
 
-	send := time.NewTicker(sendInterval).C
-	// cleanUp := time.NewTicker(cleanupInterval).C
-	scavenge := time.NewTicker(scavengeInterval).C
+	send := time.NewTicker(sendInterval)
+	scavenge := time.NewTicker(scavengeInterval)
+	metrics := time.NewTicker(metricsInterval)
 
 	for {
 		select {
 		case <-outbox.exit:
+			send.Stop()
+			scavenge.Stop()
+			metrics.Stop()
 			return
 		//TODO:get time duration from configuration
-		case <-send:
+		case <-send.C:
 
 			err := outbox.sendMessages(outbox.getMessageRecords)
 			if err != nil {
 				outbox.log().WithError(err).Error("failed to send messages from outbox")
 			}
 
-		case <-scavenge:
+		case <-scavenge.C:
 			err := outbox.sendMessages(outbox.scavengeOrphanedRecords)
 			if err != nil {
 				outbox.log().WithError(err).Error("failed to scavenge records")
 			}
+		case <-metrics.C:
+			if err := outbox.reportMetrics(); err != nil {
+				outbox.log().WithError(err).Error("failed to report outbox meetrics")
+			}
 		}
+
+	}
+}
+
+func (outbox *TxOutbox) reportMetrics() error {
+
+	tx, txErr := outbox.txProv.New()
+	if txErr != nil {
+		return txErr
 	}
+
+	rows, qErr := tx.Query(`SELECT status, count(*) FROM ` + getOutboxName(outbox.svcName) + ` GROUP BY status`)
+	if qErr != nil {
+		_ = tx.Rollback()
+		return qErr
+	}
+
+	var totalOutboxSize int
+	for rows.Next() {
+		var count, status int
+		rows.Scan(&status, &count)
+		totalOutboxSize += count
+		switch status {
+		case Pending:
+			metrics.PendingMessages.Set(float64(count))
+		case Sent:
+			metrics.SentMessages.Set(float64(count))
+		}
+	}
+	metrics.OutboxSize.Set(float64(totalOutboxSize))
+
+	if closeErr := rows.Close(); closeErr != nil {
+		outbox.log().WithError(closeErr).Warn("failed closing rows after iteration for metric data")
+	}
+
+	if commitErr := tx.Commit(); commitErr != nil {
+		outbox.log().WithError(commitErr).Warn("failed committing transaction after iteration for metric data")
+		return commitErr
+	}
+	return nil
 }
 
 func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
@@ -237,12 +288,12 @@ func (outbox *TxOutbox) updateAckedRecord(deliveryTag uint64) error {
 }
 
 func (outbox *TxOutbox) getMessageRecords(tx *sql.Tx) (*sql.Rows, error) {
-	selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = 0 AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
+	selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " USE INDEX (status_delivery) WHERE status = " + strconv.Itoa(Pending) + " AND delivery_attempts < " + strconv.Itoa(maxDeliveryAttempts) + " ORDER BY rec_id ASC LIMIT " + strconv.Itoa(maxPageSize) + " FOR UPDATE SKIP LOCKED"
 	return tx.Query(selectSQL)
 }
 
 func (outbox *TxOutbox) scavengeOrphanedRecords(tx *sql.Tx) (*sql.Rows, error) {
-	selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = 1 ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
+	selectSQL := "SELECT rec_id, exchange, routing_key, publishing FROM " + getOutboxName(outbox.svcName) + " WHERE status = " + strconv.Itoa(Sent) + " ORDER BY rec_id ASC LIMIT ? FOR UPDATE SKIP LOCKED"
 	return tx.Query(selectSQL, strconv.Itoa(maxPageSize))
 }
 
@@ -310,7 +361,7 @@ func (outbox *TxOutbox) sendMessages(recordSelector func(tx *sql.Tx) (*sql.Rows,
 		outbox.log().WithField("messages_sent", len(successfulDeliveries)).Info("outbox relayed messages")
 	}
 	for deliveryTag, id := range successfulDeliveries {
-		_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status=1, delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
+		_, updateErr := tx.Exec("UPDATE "+getOutboxName(outbox.svcName)+" SET status="+strconv.Itoa(Sent)+", delivery_tag=?, relay_id=? WHERE rec_id=?", deliveryTag, outbox.ID, id)
 		if updateErr != nil {
 			outbox.log().WithError(updateErr).
 				WithFields(log.Fields{"record_id": id, "delivery_tag": deliveryTag, "relay_id": outbox.ID}).
```
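reportMetrics boils down to a single GROUP BY query whose per-status counts feed the pending/sent gauges and sum into the total. The standalone sketch below mirrors that aggregation outside the TxOutbox type so it can be exercised with go-sqlmock; the table name and the helper function are illustrative, not grabbit APIs:

```go
package main

import (
	"database/sql"
	"fmt"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
)

// aggregateOutbox mirrors the reporting query above: one row per status,
// summed into a total, with Pending (1) and Sent (2) reported separately.
func aggregateOutbox(db *sql.DB, table string) (total, pending, sent int, err error) {
	rows, err := db.Query("SELECT status, count(*) FROM " + table + " GROUP BY status")
	if err != nil {
		return 0, 0, 0, err
	}
	defer rows.Close()
	for rows.Next() {
		var status, count int
		if err := rows.Scan(&status, &count); err != nil {
			return 0, 0, 0, err
		}
		total += count
		switch status {
		case 1: // Pending
			pending = count
		case 2: // Sent
			sent = count
		}
	}
	return total, pending, sent, rows.Err()
}

func main() {
	db, mock, _ := sqlmock.New()
	defer db.Close()

	// Stub the GROUP BY result: 3 pending records and 5 sent-but-not-removed records.
	mock.ExpectQuery("SELECT status, count").WillReturnRows(
		sqlmock.NewRows([]string{"status", "count"}).AddRow(1, 3).AddRow(2, 5))

	fmt.Println(aggregateOutbox(db, "grabbit_svc_outbox")) // 8 3 5 <nil>
}
```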
