Skip to content

Commit 0b709c7

Browse files
authored
Merge pull request #2359 from go-redis/fix/pipeline-mutext
fix: remove mutex from pipeline
2 parents eeb49d3 + bc9216e commit 0b709c7

File tree

3 files changed

+8
-85
lines changed

3 files changed

+8
-85
lines changed

CHANGELOG.md

+5-22
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,17 @@
1-
# [9.0.0-rc.2](https://github.com/go-redis/redis/compare/v9.0.0-rc.1...v9.0.0-rc.2) (2022-11-26)
2-
3-
4-
### Bug Fixes
5-
6-
* capture error correctly in withConn ([d1bfaba](https://github.com/go-redis/redis/commit/d1bfaba549fe380d269c26cea0a0183ed1520a85))
7-
* fixes ring.SetAddrs and rebalance race ([#2283](https://github.com/go-redis/redis/issues/2283)) ([d83436b](https://github.com/go-redis/redis/commit/d83436b321cd9ed52ba33c3edbe8f63bb0444c59))
8-
* read in route_randomly query param correctly ([f236053](https://github.com/go-redis/redis/commit/f236053735d10aec5e6e31fc3ced1b2e53292554))
9-
* reduce `SetAddrs` shards lock contention ([6c05a9f](https://github.com/go-redis/redis/commit/6c05a9f6b17f8e32593d3f7d594f82ba3dbcafb1)), closes [/github.com/go-redis/redis/pull/2190#discussion_r953040289](https://github.com//github.com/go-redis/redis/pull/2190/issues/discussion_r953040289) [#2077](https://github.com/go-redis/redis/issues/2077)
10-
* wrap cmds in Conn.TxPipeline ([5053db2](https://github.com/go-redis/redis/commit/5053db2f9c8b3ca25f497a75f70012c7ad6cd775))
11-
12-
13-
### Features
14-
15-
* add HasErrorPrefix ([d3d8002](https://github.com/go-redis/redis/commit/d3d8002e894a1eab5bab2c9fff13439527e330d8))
16-
* add support for SINTERCARD command ([bc51c61](https://github.com/go-redis/redis/commit/bc51c61a458d1bc4fb4424c7c3e912325ef980cc))
17-
18-
19-
201
## v9 UNRELEASED
212

223
### Added
234

24-
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol.
25-
Contributed by @monkey92t who has done a lot of work recently.
5+
- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol. It was
6+
contributed by @monkey92t who has done the majority of work in this release.
267
- Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts
278
and deadlines. See
289
[Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details.
2910
- Added `ParseClusterURL` to parse URLs into `ClusterOptions`, for example,
3011
`redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`.
3112
- Added metrics instrumentation using `redisotel.IstrumentMetrics`. See
3213
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html)
14+
- Added `redis.HasErrorPrefix` to help working with errors.
3315

3416
### Changed
3517

@@ -48,8 +30,9 @@
4830
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and
4931
it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to
5032
reset commands for some reason.
33+
- Changed Pipelines to not be thread-safe any more.
5134

5235
### Fixed
5336

5437
- Improved and fixed pipeline retries.
55-
- As usual, added more commands and fixed some bugs.
38+
- As usually, added support for more commands and fixed some bugs.

pipeline.go

+3-16
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package redis
22

33
import (
44
"context"
5-
"sync"
65
)
76

87
type pipelineExecer func(context.Context, []Cmder) error
@@ -32,15 +31,13 @@ type Pipeliner interface {
3231
var _ Pipeliner = (*Pipeline)(nil)
3332

3433
// Pipeline implements pipelining as described in
35-
// http://redis.io/topics/pipelining. It's safe for concurrent use
36-
// by multiple goroutines.
34+
// http://redis.io/topics/pipelining.
35+
// Please note: it is not safe for concurrent use by multiple goroutines.
3736
type Pipeline struct {
3837
cmdable
3938
statefulCmdable
4039

4140
exec pipelineExecer
42-
43-
mu sync.Mutex
4441
cmds []Cmder
4542
}
4643

@@ -51,10 +48,7 @@ func (c *Pipeline) init() {
5148

5249
// Len returns the number of queued commands.
5350
func (c *Pipeline) Len() int {
54-
c.mu.Lock()
55-
ln := len(c.cmds)
56-
c.mu.Unlock()
57-
return ln
51+
return len(c.cmds)
5852
}
5953

6054
// Do queues the custom command for later execution.
@@ -66,17 +60,13 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
6660

6761
// Process queues the cmd for later execution.
6862
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
69-
c.mu.Lock()
7063
c.cmds = append(c.cmds, cmd)
71-
c.mu.Unlock()
7264
return nil
7365
}
7466

7567
// Discard resets the pipeline and discards queued commands.
7668
func (c *Pipeline) Discard() {
77-
c.mu.Lock()
7869
c.cmds = c.cmds[:0]
79-
c.mu.Unlock()
8070
}
8171

8272
// Exec executes all previously queued commands using one
@@ -85,9 +75,6 @@ func (c *Pipeline) Discard() {
8575
// Exec always returns list of commands and error of the first failed
8676
// command if any.
8777
func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
88-
c.mu.Lock()
89-
defer c.mu.Unlock()
90-
9178
if len(c.cmds) == 0 {
9279
return nil, nil
9380
}

race_test.go

-47
Original file line numberDiff line numberDiff line change
@@ -214,53 +214,6 @@ var _ = Describe("races", func() {
214214
Expect(val).To(Equal(int64(C * N)))
215215
})
216216

217-
It("should Pipeline", func() {
218-
perform(C, func(id int) {
219-
pipe := client.Pipeline()
220-
for i := 0; i < N; i++ {
221-
pipe.Echo(ctx, fmt.Sprint(i))
222-
}
223-
224-
cmds, err := pipe.Exec(ctx)
225-
Expect(err).NotTo(HaveOccurred())
226-
Expect(cmds).To(HaveLen(N))
227-
228-
for i := 0; i < N; i++ {
229-
Expect(cmds[i].(*redis.StringCmd).Val()).To(Equal(fmt.Sprint(i)))
230-
}
231-
})
232-
})
233-
234-
It("should Pipeline", func() {
235-
pipe := client.Pipeline()
236-
perform(N, func(id int) {
237-
pipe.Incr(ctx, "key")
238-
})
239-
240-
cmds, err := pipe.Exec(ctx)
241-
Expect(err).NotTo(HaveOccurred())
242-
Expect(cmds).To(HaveLen(N))
243-
244-
n, err := client.Get(ctx, "key").Int64()
245-
Expect(err).NotTo(HaveOccurred())
246-
Expect(n).To(Equal(int64(N)))
247-
})
248-
249-
It("should TxPipeline", func() {
250-
pipe := client.TxPipeline()
251-
perform(N, func(id int) {
252-
pipe.Incr(ctx, "key")
253-
})
254-
255-
cmds, err := pipe.Exec(ctx)
256-
Expect(err).NotTo(HaveOccurred())
257-
Expect(cmds).To(HaveLen(N))
258-
259-
n, err := client.Get(ctx, "key").Int64()
260-
Expect(err).NotTo(HaveOccurred())
261-
Expect(n).To(Equal(int64(N)))
262-
})
263-
264217
PIt("should BLPop", func() {
265218
var received uint32
266219

0 commit comments

Comments
 (0)