@@ -2,255 +2,50 @@ package mysql_storage
2
2
3
3
import (
4
4
"context"
5
- "errors"
6
- "fmt"
7
- "github.com/golang-infrastructure/go-iterator"
8
- "github.com/storage-lock/go-storage"
9
- storage_lock "github.com/storage-lock/go-storage-lock"
10
- "time"
11
-
12
5
_ "github.com/go-sql-driver/mysql"
6
+ sql_based_storage "github.com/storage-lock/go-sql-based-storage"
7
+ "github.com/storage-lock/go-storage"
13
8
)
14
9
15
10
type MySQLStorage struct {
16
- options * MySQLStorageOptions
17
- tableFullName string
11
+ * sql_based_storage. SqlBasedStorage
12
+ options * MySQLStorageOptions
18
13
}
19
14
20
15
var _ storage.Storage = & MySQLStorage {}
21
16
22
17
func NewMySQLStorage (ctx context.Context , options * MySQLStorageOptions ) (* MySQLStorage , error ) {
23
- s := & MySQLStorage {
24
- options : options ,
25
- }
26
18
27
- err := s . Init ( ctx )
28
- if err != nil {
19
+ // 参数检查
20
+ if err := options . Check (); err != nil {
29
21
return nil , err
30
22
}
31
23
32
- return s , nil
33
- }
34
-
35
- const StorageName = "mysql-storage"
36
-
37
- func (x * MySQLStorage ) GetName () string {
38
- return StorageName
39
- }
40
-
41
- func (x * MySQLStorage ) Init (ctx context.Context ) (returnError error ) {
42
- db , err := x .options .ConnectionManager .Take (ctx )
43
- if err != nil {
44
- return err
45
- }
46
- defer func () {
47
- err := x .options .ConnectionManager .Return (ctx , db )
48
- if returnError == nil {
49
- returnError = err
50
- }
51
- }()
52
-
53
- // TODO 要不要自动创建数据库呢?这是一个值得讨论的问题。
54
- // 用户有可能是想把数据库连接放到当前的数据库下,也可能是想放到别的数据库下
55
- // 如果想放到别的数据库下,用户应该为其创建专门的数据库
56
- // 如果是复用连接的话,则有可能会有需求是切换数据库,也许这里只应该标记一下,作为能够用之后的优化项
57
-
58
- // 创建存储锁信息需要的表
59
- // TODO 这个参数后面涉及到多处拼接sql,可能会有sql注入,是否需要做一些安全措施?
60
- tableFullName := x .options .TableName
61
- if tableFullName == "" {
62
- tableFullName = fmt .Sprintf ("`%s`.`%s`" , storage .DefaultStorageDatabaseName , storage .DefaultStorageTableName )
63
- }
64
- createTableSql := `CREATE TABLE IF NOT EXISTS %s (
65
- lock_id VARCHAR(255) NOT NULL PRIMARY KEY,
66
- owner_id VARCHAR(255) NOT NULL,
67
- version BIGINT NOT NULL,
68
- lock_information_json_string VARCHAR(255) NOT NULL
69
- )`
70
- _ , err = db .Exec (fmt .Sprintf (createTableSql , tableFullName ))
71
- if err != nil {
72
- return err
73
- }
74
-
75
- x .tableFullName = tableFullName
76
-
77
- return nil
78
- }
79
-
80
- func (x * MySQLStorage ) UpdateWithVersion (ctx context.Context , lockId string , exceptedVersion , newVersion storage.Version , lockInformation * storage.LockInformation ) (returnError error ) {
81
-
82
- db , err := x .options .ConnectionManager .Take (ctx )
83
- if err != nil {
84
- return err
85
- }
86
- defer func () {
87
- err := x .options .ConnectionManager .Return (ctx , db )
88
- if returnError == nil {
89
- returnError = err
90
- }
91
- }()
92
-
93
- insertSql := fmt .Sprintf (`UPDATE %s SET version = ?, lock_information_json_string = ? WHERE lock_id = ? AND owner_id = ? AND version = ?` , x .tableFullName )
94
- execContext , err := db .ExecContext (ctx , insertSql , newVersion , lockInformation .ToJsonString (), lockId , lockInformation .OwnerId , exceptedVersion )
95
- if err != nil {
96
- return err
97
- }
98
- affected , err := execContext .RowsAffected ()
99
- if err != nil {
100
- return err
101
- }
102
- if affected == 0 {
103
- return storage_lock .ErrVersionMiss
104
- }
105
- return nil
106
- }
107
-
108
- func (x * MySQLStorage ) CreateWithVersion (ctx context.Context , lockId string , version storage.Version , lockInformation * storage.LockInformation ) (returnError error ) {
109
-
110
- db , err := x .options .ConnectionManager .Take (ctx )
111
- if err != nil {
112
- return err
113
- }
114
- defer func () {
115
- err := x .options .ConnectionManager .Return (ctx , db )
116
- if returnError == nil {
117
- returnError = err
118
- }
119
- }()
120
-
121
- insertSql := fmt .Sprintf (`INSERT INTO %s (lock_id, owner_id, version, lock_information_json_string) VALUES (?, ?, ?, ?)` , x .tableFullName )
122
- execContext , err := db .ExecContext (ctx , insertSql , lockId , lockInformation .OwnerId , version , lockInformation .ToJsonString ())
123
- if err != nil {
124
- return err
125
- }
126
- affected , err := execContext .RowsAffected ()
127
- if err != nil {
128
- return err
129
- }
130
- if affected == 0 {
131
- return storage_lock .ErrVersionMiss
132
- }
133
- return nil
134
- }
135
-
136
- func (x * MySQLStorage ) DeleteWithVersion (ctx context.Context , lockId string , exceptedVersion storage.Version , lockInformation * storage.LockInformation ) (returnError error ) {
137
-
138
- db , err := x .options .ConnectionManager .Take (ctx )
139
- if err != nil {
140
- return err
141
- }
142
- defer func () {
143
- err := x .options .ConnectionManager .Return (ctx , db )
144
- if returnError == nil {
145
- returnError = err
146
- }
147
- }()
148
-
149
- deleteSql := fmt .Sprintf (`DELETE FROM %s WHERE lock_id = ? AND owner_id = ? AND version = ?` , x .tableFullName )
150
- execContext , err := db .ExecContext (ctx , deleteSql , lockId , lockInformation .OwnerId , exceptedVersion )
151
- if err != nil {
152
- return err
153
- }
154
- affected , err := execContext .RowsAffected ()
155
- if err != nil {
156
- return err
157
- }
158
- if affected == 0 {
159
- return storage_lock .ErrVersionMiss
160
- }
161
- return nil
162
- }
163
-
164
- func (x * MySQLStorage ) Get (ctx context.Context , lockId string ) (lockInformationJsonString string , returnError error ) {
165
-
166
- db , err := x .options .ConnectionManager .Take (ctx )
167
- if err != nil {
168
- return "" , err
169
- }
170
- defer func () {
171
- err := x .options .ConnectionManager .Return (ctx , db )
172
- if returnError == nil {
173
- returnError = err
174
- }
175
- }()
176
-
177
- getLockSql := fmt .Sprintf ("SELECT lock_information_json_string FROM %s WHERE lock_id = ?" , x .tableFullName )
178
- rs , err := db .Query (getLockSql , lockId )
179
- if err != nil {
180
- return "" , err
181
- }
182
- defer func () {
183
- _ = rs .Close ()
184
- }()
185
- if ! rs .Next () {
186
- return "" , storage_lock .ErrLockNotFound
187
- }
188
- err = rs .Scan (& lockInformationJsonString )
24
+ // sql storage的基础Storage
25
+ baseStorageOption := sql_based_storage .NewSqlBasedStorageOptions ().
26
+ SetConnectionManager (options .ConnectionManager ).
27
+ SetSqlProvider (sql_based_storage .NewSql97Provider ()).
28
+ SetTableFullName (options .TableName )
29
+ baseStorage , err := sql_based_storage .NewSqlBasedStorage (baseStorageOption )
189
30
if err != nil {
190
- return "" , err
31
+ return nil , err
191
32
}
192
- return lockInformationJsonString , nil
193
- }
194
-
195
- func (x * MySQLStorage ) GetTime (ctx context.Context ) (now time.Time , returnError error ) {
196
33
197
- db , err := x . options . ConnectionManager . Take ( ctx )
198
- if err != nil {
199
- return time. Time {}, err
34
+ s := & MySQLStorage {
35
+ SqlBasedStorage : baseStorage ,
36
+ options : options ,
200
37
}
201
- defer func () {
202
- err := x .options .ConnectionManager .Return (ctx , db )
203
- if returnError == nil {
204
- returnError = err
205
- }
206
- }()
207
38
208
- var zero time.Time
209
- // TODO 多实例的情况下可能会有问题,允许其能够比较方便的切换到NTP TimeProvider
210
- rs , err := db .Query ("SELECT UNIX_TIMESTAMP(NOW())" )
39
+ err = s .Init (ctx )
211
40
if err != nil {
212
- return zero , err
213
- }
214
- defer func () {
215
- err := rs .Close ()
216
- if returnError == nil {
217
- returnError = err
218
- }
219
- }()
220
- if ! rs .Next () {
221
- return zero , errors .New ("rs server time failed" )
222
- }
223
- var databaseTimestamp uint64
224
- err = rs .Scan (& databaseTimestamp )
225
- if err != nil {
226
- return zero , err
41
+ return nil , err
227
42
}
228
43
229
- // TODO 时区
230
- return time .Unix (int64 (databaseTimestamp ), 0 ), nil
231
- }
232
-
233
- func (x * MySQLStorage ) Close (ctx context.Context ) error {
234
- // 没有Storage级别的资源好回收的
235
- return nil
44
+ return s , nil
236
45
}
237
46
238
- func (x * MySQLStorage ) List (ctx context.Context ) (iterator iterator.Iterator [* storage.LockInformation ], returnError error ) {
239
-
240
- db , err := x .options .ConnectionManager .Take (ctx )
241
- if err != nil {
242
- return nil , err
243
- }
244
- defer func () {
245
- err := x .options .ConnectionManager .Return (ctx , db )
246
- if returnError == nil {
247
- returnError = err
248
- }
249
- }()
47
+ const StorageName = "mysql-storage"
250
48
251
- rows , err := db .Query ("SELECT * FROM %s" , x .tableFullName )
252
- if err != nil {
253
- return nil , err
254
- }
255
- return storage .NewSqlRowsIterator (rows ), nil
49
+ func (x * MySQLStorage ) GetName () string {
50
+ return StorageName
256
51
}
0 commit comments