Skip to content

Commit ea8a64f

Browse files
author
zouyi
committed
combine good parts from other forks
1 parent 35d75ae commit ea8a64f

15 files changed

+153
-31
lines changed

README.md

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
# delay-queue
2-
[![Go Report Card](https://goreportcard.com/badge/github.com/ouqiang/delay-queue)](https://goreportcard.com/report/github.com/ouqiang/delay-queue)
3-
[![Downloads](https://img.shields.io/github/downloads/ouqiang/delay-queue/total.svg)](https://github.com/ouqiang/delay-queue/releases)
4-
[![license](https://img.shields.io/github/license/mashape/apistatus.svg?maxAge=2592000)](https://github.com/ouqiang/delay-queue/blob/master/LICENSE)
5-
[![Release](https://img.shields.io/github/release/ouqiang/delay-queue.svg?label=Release)](https://github.com/ouqiang/delay-queue/releases)
2+
3+
基于[ouqiang/delay-queue](https://github.com/ouqiang/delay-queue) [suhuaguo/delay-queue](https://github.com/suhuaguo/delay-queue) [moshuipan/delay-queue](https://github.com/moshuipan/delay-queue/) 代码并加入编译优化脚本
4+
65

76
基于Redis实现的延迟队列, 参考[有赞延迟队列设计](http://tech.youzan.com/queuing_delay)实现
87

build.sh

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
mkdir -p dist
2+
if [[ `uname -s` == "Darwin" ]]; then
3+
echo "system is macos"
4+
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-linux-64
5+
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-win-64
6+
go build -ldflags '-w' -o dist/delay-queue-macos-64
7+
else
8+
echo "system is macos"
9+
CGO_ENABLED=0 GOOS=macos GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-linux-64
10+
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags '-w' -o dist/delay-queue-win-64
11+
go build -ldflags '-w' -o dist/delay-queue-macos-64
12+
fi
13+
upx -9 dist/delay-queue-linux-64
14+
upx -9 dist/delay-queue-win-64
15+
upx -9 dist/delay-queue-macos-64

cmd/cmd.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"net/http"
88
"os"
99

10-
"github.com/ouqiang/delay-queue/config"
11-
"github.com/ouqiang/delay-queue/delayqueue"
12-
"github.com/ouqiang/delay-queue/routers"
10+
"github.com/php-cpm/delay-queue/config"
11+
"github.com/php-cpm/delay-queue/delayqueue"
12+
"github.com/php-cpm/delay-queue/routers"
1313
)
1414

1515
// Cmd 应用入口Command
@@ -54,6 +54,7 @@ func (cmd *Cmd) parseCommandArgs() {
5454

5555
// 运行Web Server
5656
func (cmd *Cmd) runWeb() {
57+
// 有详细的说明:https://studygolang.com/resources/4657 讲的非常详细
5758
http.HandleFunc("/push", routers.Push)
5859
http.HandleFunc("/pop", routers.Pop)
5960
http.HandleFunc("/finish", routers.Delete)

config/config.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ const (
4040
DefaultRedisReadTimeout = 180000
4141
// DefaultRedisWriteTimeout Redis写入超时时间, 单位毫秒
4242
DefaultRedisWriteTimeout = 3000
43+
// DefaultKeyName key名称
44+
DefaultKeyName = "dq_key_"
4345
)
4446

4547
// Config 应用配置
4648
type Config struct {
4749
BindAddress string // http server 监听地址
48-
BucketSize int // bucket数量
50+
BucketSize int // bucket数量,timer 的数量
4951
BucketName string // bucket在redis中的键名,
5052
QueueName string // ready queue在redis中的键名
5153
QueueBlockTimeout int // 调用blpop阻塞超时时间, 单位秒, 修改此项, redis.read_timeout必须做相应调整
@@ -66,29 +68,36 @@ type RedisConfig struct {
6668

6769
// Init 初始化配置
6870
func Init(path string) {
71+
// &Config{} :结构体初始化
72+
// http://blog.csdn.net/xxx9001/article/details/52574501 结构体初始化
6973
Setting = &Config{}
7074
if path == "" {
75+
// 加载默认的配置文件
7176
Setting.initDefaultConfig()
7277
return
7378
}
7479

80+
// 转换配置文件
7581
Setting.parse(path)
7682
}
7783

7884
// 解析配置文件
7985
func (config *Config) parse(path string) {
86+
// 加载配置文件
8087
file, err := ini.Load(path)
8188
if err != nil {
8289
log.Fatalf("无法解析配置文件#%s", err.Error())
8390
}
8491

92+
// bucket 参数
8593
section := file.Section("")
8694
config.BindAddress = section.Key("bind_address").MustString(DefaultBindAddress)
8795
config.BucketSize = section.Key("bucket_size").MustInt(DefaultBucketSize)
8896
config.BucketName = section.Key("bucket_name").MustString(DefaultBucketName)
8997
config.QueueName = section.Key("queue_name").MustString(DefaultQueueName)
9098
config.QueueBlockTimeout = section.Key("queue_block_timeout").MustInt(DefaultQueueBlockTimeout)
9199

100+
// redis 相关的参数
92101
config.Redis.Host = section.Key("redis.host").MustString(DefaultRedisHost)
93102
config.Redis.Db = section.Key("redis.db").MustInt(DefaultRedisDb)
94103
config.Redis.Password = section.Key("redis.password").MustString(DefaultRedisPassword)
@@ -104,7 +113,7 @@ func (config *Config) initDefaultConfig() {
104113
config.BindAddress = DefaultBindAddress
105114
config.BucketSize = DefaultBucketSize
106115
config.BucketName = DefaultBucketName
107-
config.QueueName = DefaultQueueName
116+
config.QueueName = DefaultQueueName // ready queue
108117
config.QueueBlockTimeout = DefaultQueueBlockTimeout
109118

110119
config.Redis.Host = DefaultRedisHost

delayqueue/bucket.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66

77
// BucketItem bucket中的元素
88
type BucketItem struct {
9-
timestamp int64
10-
jobId string
9+
timestamp int64 // 时间戳
10+
jobId string // jobId
1111
}
1212

1313
// 添加JobId到bucket中
@@ -19,6 +19,7 @@ func pushToBucket(key string, timestamp int64, jobId string) error {
1919

2020
// 从bucket中获取延迟时间最小的JobId
2121
func getFromBucket(key string) (*BucketItem, error) {
22+
// 返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序。
2223
value, err := execRedisCommand("ZRANGE", key, 0, 0, "WITHSCORES")
2324
if err != nil {
2425
return nil, err

delayqueue/delay_queue.go

+42-6
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"log"
77
"time"
88

9-
"github.com/ouqiang/delay-queue/config"
9+
"github.com/php-cpm/delay-queue/config"
1010
)
1111

1212
var (
@@ -18,8 +18,14 @@ var (
1818

1919
// Init 初始化延时队列
2020
func Init() {
21+
// 初始化 redis 连接池
2122
RedisPool = initRedisPool()
23+
24+
// 初始化一些列 Timer
2225
initTimers()
26+
27+
// golang 函数:http://blog.csdn.net/mungo/article/details/52481285
28+
// 进行初始化,产生一个 go routine
2329
bucketNameChan = generateBucketName()
2430
}
2531

@@ -34,6 +40,8 @@ func Push(job Job) error {
3440
log.Printf("添加job到job pool失败#job-%+v#%s", job, err.Error())
3541
return err
3642
}
43+
44+
// 轮询的方式存放。ZADD 命令 。存放在有序集合中。将会从这里获取 要运行的 job
3745
err = pushToBucket(<-bucketNameChan, job.Delay, job.Id)
3846
if err != nil {
3947
log.Printf("添加job到bucket失败#job-%+v#%s", job, err.Error())
@@ -45,7 +53,9 @@ func Push(job Job) error {
4553

4654
// Pop 轮询获取Job
4755
func Pop(topics []string) (*Job, error) {
48-
jobId, err := blockPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout)
56+
// ready queue 里面只有 topic 作为 key
57+
// jobId, err := blockPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout)
58+
jobId, err := blPopFromReadyQueue(topics, config.Setting.QueueBlockTimeout)
4959
if err != nil {
5060
return nil, err
5161
}
@@ -66,14 +76,20 @@ func Pop(topics []string) (*Job, error) {
6676
return nil, nil
6777
}
6878

79+
// 重新放到 Bucket 中,等待重新消费。实现至少一次的逻辑。如果客户端删除了 job ,那么。调度到此 jobId 的时候,发现 job 不存在,直接在 bucket 中删除
6980
timestamp := time.Now().Unix() + job.TTR
70-
err = pushToBucket(<-bucketNameChan, timestamp, job.Id)
81+
// 表示从 <-bucketNameChan 。这个 channel 接收一个值
82+
err = pushToBucket(fmt.Sprintf(config.Setting.BucketName, 1), timestamp, job.Id) //待确认的消息放入bucket1
7183

7284
return job, err
7385
}
7486

7587
// Remove 删除Job
7688
func Remove(jobId string) error {
89+
err := removeFromBucket(fmt.Sprintf(config.Setting.BucketName, 1), jobId)
90+
if err != nil {
91+
return err
92+
}
7793
return removeJob(jobId)
7894
}
7995

@@ -93,10 +109,22 @@ func Get(jobId string) (*Job, error) {
93109

94110
// 轮询获取bucket名称, 使job分布到不同bucket中, 提高扫描速度
95111
func generateBucketName() <-chan string {
112+
// 阻塞 channel
96113
c := make(chan string)
114+
// 1、为什么这么写呢?为什么不直接写个 for 死循环呢?
115+
// 如果直接写 for 循环,那在初始化的时候,会阻塞其他 init 函数。如果另起一个 go routine 的话,就不会阻塞其他的
116+
117+
// 2、每次都 产生一个 go routine 。是怎么销毁的呀?
118+
// 因为把这个函数 赋给某个 变量了。在 init 中初始化了。只有一个 go routine 。
119+
120+
// 3、我感觉到 里面的 i 变量好像没有作用的呀,因为都没有和其他 go routine 交换。
121+
// 因为在 初始化 init 一下。每次都从 bucketNameChan 这个 channel 读取信息。
97122
go func() {
98123
i := 1
124+
125+
// 死循环
99126
for {
127+
// chan <- 发送消息
100128
c <- fmt.Sprintf(config.Setting.BucketName, i)
101129
if i >= config.Setting.BucketSize {
102130
i = 1
@@ -109,21 +137,27 @@ func generateBucketName() <-chan string {
109137
return c
110138
}
111139

112-
// 初始化定时器
140+
// 初始化定时器 https://yq.aliyun.com/articles/69303
113141
func initTimers() {
114142
timers = make([]*time.Ticker, config.Setting.BucketSize)
115143
var bucketName string
116144
for i := 0; i < config.Setting.BucketSize; i++ {
145+
146+
// 每 1s 执行一次
117147
timers[i] = time.NewTicker(1 * time.Second)
148+
149+
// 如果这里部署多实例的话,就会产生竞争
118150
bucketName = fmt.Sprintf(config.Setting.BucketName, i+1)
151+
152+
// 并发执行
119153
go waitTicker(timers[i], bucketName)
120154
}
121155
}
122156

123157
func waitTicker(timer *time.Ticker, bucketName string) {
124158
for {
125159
select {
126-
case t := <-timer.C:
160+
case t := <-timer.C: // 我们启动一个新的goroutine,来以阻塞的方式从Timer的C这个channel中,等待接收一个值,这个值是到期的时间。
127161
tickHandler(t, bucketName)
128162
}
129163
}
@@ -132,6 +166,7 @@ func waitTicker(timer *time.Ticker, bucketName string) {
132166
// 扫描bucket, 取出延迟时间小于当前时间的Job
133167
func tickHandler(t time.Time, bucketName string) {
134168
for {
169+
// 拿到第一个元素。bucket 存放 jobid 和时间戳
135170
bucketItem, err := getFromBucket(bucketName)
136171
if err != nil {
137172
log.Printf("扫描bucket错误#bucket-%s#%s", bucketName, err.Error())
@@ -165,11 +200,12 @@ func tickHandler(t time.Time, bucketName string) {
165200
if job.Delay > t.Unix() {
166201
// 从bucket中删除旧的jobId
167202
removeFromBucket(bucketName, bucketItem.jobId)
168-
// 重新计算delay时间并放入bucket中
203+
// 重新计算delay时间并放入其他的 bucket 中
169204
pushToBucket(<-bucketNameChan, job.Delay, bucketItem.jobId)
170205
continue
171206
}
172207

208+
// 放到 Ready 队列中,普通的 redis list 即可。RPUSH 方式
173209
err = pushToReadyQueue(job.Topic, bucketItem.jobId)
174210
if err != nil {
175211
log.Printf("JobId放入ready queue失败#bucket-%s#job-%+v#%s",

delayqueue/job.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
package delayqueue
22

33
import (
4+
"github.com/php-cpm/delay-queue/config"
45
"github.com/vmihailenco/msgpack"
56
)
67

78
// Job 使用msgpack序列化后保存到Redis,减少内存占用
89
type Job struct {
9-
Topic string `json:"topic" msgpack:"1"`
10-
Id string `json:"id" msgpack:"2"` // job唯一标识ID
10+
Topic string `json:"topic" msgpack:"1"` // topic ,唯一
11+
Id string `json:"id" msgpack:"2"` // job唯一标识ID 。客户端需要保证唯一性。有关联关系的
1112
Delay int64 `json:"delay" msgpack:"3"` // 延迟时间, unix时间戳
12-
TTR int64 `json:"ttr" msgpack:"4"`
13-
Body string `json:"body" msgpack:"5"`
13+
TTR int64 `json:"ttr" msgpack:"4"` // 超时时间,TTR的设计目的是为了保证消息传输的可靠性。
14+
Body string `json:"body" msgpack:"5"`// body
1415
}
1516

1617
// 获取Job
1718
func getJob(key string) (*Job, error) {
18-
value, err := execRedisCommand("GET", key)
19+
value, err := execRedisCommand("GET", config.DefaultKeyName+key)
1920
if err != nil {
2021
return nil, err
2122
}
@@ -39,14 +40,14 @@ func putJob(key string, job Job) error {
3940
if err != nil {
4041
return err
4142
}
42-
_, err = execRedisCommand("SET", key, value)
43+
_, err = execRedisCommand("SET", config.DefaultKeyName+key, value)
4344

4445
return err
4546
}
4647

4748
// 删除Job
4849
func removeJob(key string) error {
49-
_, err := execRedisCommand("DEL", key)
50+
_, err := execRedisCommand("DEL", config.DefaultKeyName+key)
5051

5152
return err
5253
}

0 commit comments

Comments
 (0)