Skip to content

Commit 249ae3a

Browse files
committed
add code refactor and naming changes
1 parent ad4ee66 commit 249ae3a

File tree

8 files changed

+128
-144
lines changed

8 files changed

+128
-144
lines changed

batch.go

+38-49
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@ type (
2222
poolCapacity int
2323
}
2424
batch struct {
25-
count int
26-
size int
27-
r *PublishResult
28-
msgs []*utp.PublishMessage
29-
30-
timeRefs []timeID
25+
count int
26+
size int
27+
r *PublishResult
28+
pubMessages []*utp.PublishMessage
3129
}
3230
// batchGroup map[timeID]*batch
3331
batchManager struct {
@@ -44,8 +42,8 @@ type (
4442

4543
func (m *batchManager) newBatch(timeID timeID) *batch {
4644
b := &batch{
47-
r: &PublishResult{result: result{complete: make(chan struct{})}},
48-
msgs: make([]*utp.PublishMessage, 0),
45+
r: &PublishResult{result: result{complete: make(chan struct{})}},
46+
pubMessages: make([]*utp.PublishMessage, 0),
4947
}
5048
m.batchGroup[timeID] = b
5149

@@ -98,17 +96,17 @@ func (m *batchManager) close() {
9896
}
9997

10098
// add adds a publish message to a batch in the batch group.
101-
func (m *batchManager) add(delay int32, p *utp.PublishMessage) *PublishResult {
99+
func (m *batchManager) add(delay int32, pubMsg *utp.PublishMessage) *PublishResult {
102100
m.mu.Lock()
103101
defer m.mu.Unlock()
104102
timeID := m.TimeID(delay)
105103
b, ok := m.batchGroup[timeID]
106104
if !ok {
107105
b = m.newBatch(timeID)
108106
}
109-
b.msgs = append(b.msgs, p)
107+
b.pubMessages = append(b.pubMessages, pubMsg)
110108
b.count++
111-
b.size += len(p.Payload)
109+
b.size += len(pubMsg.Payload)
112110
if b.count > m.opts.batchCountThreshold || b.size > m.opts.batchByteThreshold {
113111
m.push(b)
114112
delete(m.batchGroup, timeID)
@@ -118,7 +116,7 @@ func (m *batchManager) add(delay int32, p *utp.PublishMessage) *PublishResult {
118116

119117
// push enqueues a batch to publish.
120118
func (m *batchManager) push(b *batch) {
121-
if len(b.msgs) != 0 {
119+
if len(b.pubMessages) != 0 {
122120
m.publishQueue <- b
123121
}
124122
}
@@ -165,22 +163,18 @@ func (m *batchManager) publishLoop(interval time.Duration) {
165163
// dispatch handles publishing messages for the batches in queue.
166164
func (m *batchManager) dispatch(timeout time.Duration) {
167165
LOOP:
168-
for {
169-
select {
170-
case b, ok := <-m.publishQueue:
171-
if !ok {
172-
close(m.send)
173-
m.stopWg.Done()
174-
return
175-
}
166+
b, ok := <-m.publishQueue
167+
if !ok {
168+
close(m.send)
169+
m.stopWg.Done()
170+
return
171+
}
176172

177-
select {
178-
case m.send <- b:
179-
default:
180-
// pool is full, let GC handle the batches
181-
goto WAIT
182-
}
183-
}
173+
select {
174+
case m.send <- b:
175+
default:
176+
// pool is full, let GC handle the batches
177+
goto WAIT
184178
}
185179

186180
WAIT:
@@ -196,31 +190,26 @@ func (m *batchManager) publish(c *client, publishWaitTimeout time.Duration) {
196190
case <-m.stop:
197191
// run queued messges from the publish queue and
198192
// process it until queue is empty.
199-
for {
200-
select {
201-
case b, ok := <-m.send:
202-
if !ok {
203-
m.stopWg.Done()
204-
return
205-
}
206-
pub := &utp.Publish{DeliveryMode: 2, Messages: b.msgs}
207-
mID := c.nextID(b.r)
208-
pub.MessageID = c.outboundID(mID)
209-
210-
// persist outbound
211-
c.storeOutbound(pub)
212-
213-
select {
214-
case c.send <- &MessageAndResult{m: pub, r: b.r}:
215-
case <-time.After(publishWaitTimeout):
216-
b.r.setError(errors.New("publish timeout error occurred"))
217-
}
218-
default:
219-
}
193+
b, ok := <-m.send
194+
if !ok {
195+
m.stopWg.Done()
196+
return
197+
}
198+
pub := &utp.Publish{DeliveryMode: 2, Messages: b.pubMessages}
199+
mID := c.nextID(b.r)
200+
pub.MessageID = c.outboundID(mID)
201+
202+
// persist outbound
203+
c.storeOutbound(pub)
204+
205+
select {
206+
case c.send <- &MessageAndResult{m: pub, r: b.r}:
207+
case <-time.After(publishWaitTimeout):
208+
b.r.setError(errors.New("publish timeout error occurred"))
220209
}
221210
case b := <-m.send:
222211
if b != nil {
223-
pub := &utp.Publish{DeliveryMode: 2, Messages: b.msgs}
212+
pub := &utp.Publish{DeliveryMode: 2, Messages: b.pubMessages}
224213
mID := c.nextID(b.r)
225214
pub.MessageID = c.outboundID(mID)
226215

client.go

+28-43
Original file line numberDiff line numberDiff line change
@@ -370,21 +370,21 @@ func (c *client) Relay(topic string, relOpts ...RelOptions) Result {
370370
opt.set(opts)
371371
}
372372

373-
rel := &utp.Relay{}
374-
rel.RelayRequests = append(rel.RelayRequests, &utp.RelayRequest{Topic: topic, Last: opts.last})
373+
relMsg := &utp.Relay{}
374+
relMsg.RelayRequests = append(relMsg.RelayRequests, &utp.RelayRequest{Topic: topic, Last: opts.last})
375375

376-
if rel.MessageID == 0 {
376+
if relMsg.MessageID == 0 {
377377
mID := c.nextID(r)
378-
rel.MessageID = c.outboundID(mID)
378+
relMsg.MessageID = c.outboundID(mID)
379379
}
380380
relayWaitTimeout := c.opts.writeTimeout
381381
if relayWaitTimeout == 0 {
382382
relayWaitTimeout = time.Second * 30
383383
}
384384
// persist outbound
385-
c.storeOutbound(rel)
385+
c.storeOutbound(relMsg)
386386
select {
387-
case c.send <- &MessageAndResult{m: rel, r: r}:
387+
case c.send <- &MessageAndResult{m: relMsg, r: r}:
388388
case <-time.After(relayWaitTimeout):
389389
r.setError(errors.New("relay request timeout error occurred"))
390390
return r
@@ -406,21 +406,21 @@ func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
406406
opt.set(opts)
407407
}
408408

409-
sub := &utp.Subscribe{}
410-
sub.Subscriptions = append(sub.Subscriptions, &utp.Subscription{DeliveryMode: opts.deliveryMode, Delay: opts.delay, Topic: topic})
409+
subMsg := &utp.Subscribe{}
410+
subMsg.Subscriptions = append(subMsg.Subscriptions, &utp.Subscription{DeliveryMode: opts.deliveryMode, Delay: opts.delay, Topic: topic})
411411

412-
if sub.MessageID == 0 {
412+
if subMsg.MessageID == 0 {
413413
mID := c.nextID(r)
414-
sub.MessageID = c.outboundID(mID)
414+
subMsg.MessageID = c.outboundID(mID)
415415
}
416416
subscribeWaitTimeout := c.opts.writeTimeout
417417
if subscribeWaitTimeout == 0 {
418418
subscribeWaitTimeout = time.Second * 30
419419
}
420420
// persist outbound
421-
c.storeOutbound(sub)
421+
c.storeOutbound(subMsg)
422422
select {
423-
case c.send <- &MessageAndResult{m: sub, r: r}:
423+
case c.send <- &MessageAndResult{m: subMsg, r: r}:
424424
case <-time.After(subscribeWaitTimeout):
425425
r.setError(errors.New("subscribe timeout error occurred"))
426426
return r
@@ -434,25 +434,25 @@ func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
434434
// received.
435435
func (c *client) Unsubscribe(topics ...string) Result {
436436
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
437-
unsub := &utp.Unsubscribe{}
437+
unsubMsg := &utp.Unsubscribe{}
438438
var subs []*utp.Subscription
439439
for _, topic := range topics {
440440
sub := &utp.Subscription{Topic: topic}
441441
subs = append(subs, sub)
442442
}
443-
unsub.Subscriptions = subs
444-
if unsub.MessageID == 0 {
443+
unsubMsg.Subscriptions = subs
444+
if unsubMsg.MessageID == 0 {
445445
mID := c.nextID(r)
446-
unsub.MessageID = c.outboundID(mID)
446+
unsubMsg.MessageID = c.outboundID(mID)
447447
}
448448
unsubscribeWaitTimeout := c.opts.writeTimeout
449449
if unsubscribeWaitTimeout == 0 {
450450
unsubscribeWaitTimeout = time.Second * 30
451451
}
452452
// persist outbound
453-
c.storeOutbound(unsub)
453+
c.storeOutbound(unsubMsg)
454454
select {
455-
case c.send <- &MessageAndResult{m: unsub, r: r}:
455+
case c.send <- &MessageAndResult{m: unsubMsg, r: r}:
456456
case <-time.After(unsubscribeWaitTimeout):
457457
r.setError(errors.New("unsubscribe timeout error occurred"))
458458
return r
@@ -472,34 +472,19 @@ func (c *client) resume(prefix uint32, subscription bool) {
472472
if (k & (1 << 4)) == 0 {
473473
switch msg.Type() {
474474
case utp.RELAY:
475-
p := msg.(*utp.Relay)
475+
relMsg := msg.(*utp.Relay)
476476
r := &RelayResult{result: result{complete: make(chan struct{})}}
477477
r.messageID = msg.Info().MessageID
478478
c.messageIds.resumeID(MID(r.messageID))
479-
// var topics []RelayRequest
480-
// for _, req := range p.RelayRequests {
481-
// var t RelayRequest
482-
// t.Topic = req.Topic
483-
// t.Last = req.Last
484-
// topics = append(topics, t)
485-
// }
486-
r.reqs = p.RelayRequests
479+
r.reqs = relMsg.RelayRequests
487480
c.send <- &MessageAndResult{m: msg, r: r}
488481
case utp.SUBSCRIBE:
489482
if subscription {
490-
p := msg.(*utp.Subscribe)
483+
subMsg := msg.(*utp.Subscribe)
491484
r := &SubscribeResult{result: result{complete: make(chan struct{})}}
492485
r.messageID = msg.Info().MessageID
493486
c.messageIds.resumeID(MID(r.messageID))
494-
// var topics []Subscription
495-
// for _, sub := range p.Subscriptions {
496-
// var t Subscription
497-
// t.Topic = sub.Topic
498-
// t.DeliveryMode = sub.DeliveryMode
499-
// topics = append(topics, t)
500-
// }
501-
// r.subs = append(r.subs, topics...)
502-
r.subs = p.Subscriptions
487+
r.subs = subMsg.Subscriptions
503488
c.send <- &MessageAndResult{m: msg, r: r}
504489
}
505490
case utp.UNSUBSCRIBE:
@@ -515,20 +500,20 @@ func (c *client) resume(prefix uint32, subscription bool) {
515500
c.messageIds.resumeID(MID(r.messageID))
516501
c.send <- &MessageAndResult{m: msg, r: r}
517502
case utp.FLOWCONTROL:
518-
ctrl := msg.(*utp.ControlMessage)
519-
switch ctrl.FlowControl {
503+
ctrlMsg := msg.(*utp.ControlMessage)
504+
switch ctrlMsg.FlowControl {
520505
case utp.RECEIPT:
521-
c.send <- &MessageAndResult{m: ctrl}
506+
c.send <- &MessageAndResult{m: ctrlMsg}
522507
}
523508
default:
524509
store.Log.Delete(k)
525510
}
526511
} else {
527512
switch msg.Type() {
528513
case utp.FLOWCONTROL:
529-
ctrl := msg.(*utp.ControlMessage)
530-
c.messageIds.resumeID(MID(ctrl.MessageID))
531-
switch ctrl.FlowControl {
514+
ctrlMsg := msg.(*utp.ControlMessage)
515+
c.messageIds.resumeID(MID(ctrlMsg.MessageID))
516+
switch ctrlMsg.FlowControl {
532517
case utp.NOTIFY:
533518
c.recv <- msg
534519
}

cmd/sample/main.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ func main() {
7373
}
7474
close(recv)
7575
}),
76-
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, msg unitdb.Message) {
77-
recv <- [2][]byte{[]byte(msg.Topic()), msg.Payload()}
76+
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, pubMsg unitdb.PubMessage) {
77+
for _, msg := range pubMsg.Messages() {
78+
recv <- [2][]byte{[]byte(msg.Topic), msg.Payload}
79+
}
7880
}),
7981
)
8082
if err != nil {
@@ -133,8 +135,10 @@ func main() {
133135
}
134136
close(recv)
135137
}),
136-
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, msg unitdb.Message) {
137-
recv <- [2][]byte{[]byte(msg.Topic()), msg.Payload()}
138+
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, pubMsg unitdb.PubMessage) {
139+
for _, msg := range pubMsg.Messages() {
140+
recv <- [2][]byte{[]byte(msg.Topic), msg.Payload}
141+
}
138142
}),
139143
unitdb.WithBatchDuration(10*time.Second),
140144
)
@@ -217,8 +221,10 @@ func main() {
217221
}
218222
close(recv)
219223
}),
220-
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, msg unitdb.Message) {
221-
recv <- [2][]byte{[]byte(msg.Topic()), msg.Payload()}
224+
unitdb.WithDefaultMessageHandler(func(client unitdb.Client, pubMsg unitdb.PubMessage) {
225+
for _, msg := range pubMsg.Messages() {
226+
recv <- [2][]byte{[]byte(msg.Topic), msg.Payload}
227+
}
222228
}),
223229
unitdb.WithBatchDuration(10*time.Second),
224230
)

cmd/simple/main.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
unitdb "github.com/unit-io/unitdb-go"
1111
)
1212

13-
var f unitdb.MessageHandler = func(client unitdb.Client, msg unitdb.Message) {
14-
fmt.Printf("TOPIC: %s\n", msg.Topic())
15-
fmt.Printf("MSG: %s\n", msg.Payload())
13+
var f unitdb.MessageHandler = func(client unitdb.Client, pubMsg unitdb.PubMessage) {
14+
for _, msg := range pubMsg.Messages() {
15+
fmt.Printf("TOPIC: %s\n", msg.Topic)
16+
fmt.Printf("MSG: %s\n", msg.Payload)
17+
}
1618
}
1719

1820
func main() {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ require (
88
google.golang.org/grpc v1.39.0
99
)
1010

11-
replace github.com/unit-io/unitdb => /src/github.com/unit-io/unitdb
11+
// replace github.com/unit-io/unitdb => /src/github.com/unit-io/unitdb

0 commit comments

Comments
 (0)