Skip to content

Commit 3b5adf4

Browse files
author
KJ Tsanaktsidis
committed
Implement RedisClient::Cluster#with
This PR actually implements the #with method for checking out a connection directly to a particular Redis node. This allows you to perform transactional commands directly against the underlying `RedisClient` instance, so `#watch`, `#multi`, etc all work. This does NOT yet implement any kind of validation for the commands issued. If you issue commands involving keys not owned by the node, you will receive `Redis::CommandError`'s. A future PR will add client-side validation to hopefully make it possible to catch problems related to transaction misuse earlier in development. There's a pending test for this. This PR also does not yet implement `RedisClient::Cluster#multi` in terms of `#with`. That will also come in a future PR.
1 parent 9bcc2ac commit 3b5adf4

File tree

3 files changed

+290
-0
lines changed

3 files changed

+290
-0
lines changed

README.md

+78
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,84 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3')
168168
#=> [nil, nil, nil]
169169
```
170170

171+
## Transactions
172+
This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`,
173+
and conditional execution with `WATCH`. Redis does not support cross-slot transactions, so all keys used within a
174+
transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single slot using
175+
`#with`, specifying the slot by providing a key which hashes to that slot in the `slot_key` argument:
176+
177+
```ruby
178+
cli.with(slot_key: '{key}1') do |conn|
179+
conn.call('SET', '{key}1', 'This is OK')
180+
#=> "OK"
181+
conn.call('SET', '{key}2', 'This is also OK; it has the same hashtag, and so the same slot')
182+
#=> "OK"
183+
end
184+
end
185+
```
186+
187+
When using hash tags to enforce same-slot key hashing, it's often neat to simply pass the hashtag _only_ to `#with`:
188+
```ruby
189+
cli.with(slot_key: '{myslot}') do |conn|
190+
# You can use any key starting with {myslot} in this block
191+
end
192+
```
193+
194+
Once you have pinned a client to a particular slot, you can use the same transaction APIs as the
195+
[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows.
196+
```ruby
197+
# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two.
198+
cli.call('SET', 'key', 0)
199+
cli.with(slot_key: 'key') do |conn|
200+
conn.multi do |txn|
201+
txn.call('INCR', 'key')
202+
txn.call('INCR', 'key')
203+
end
204+
#=> ['OK', 'OK']
205+
end
206+
# Conditional execution with WATCH can be used to e.g. atomically swap two keys
207+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
208+
cli.with(slot_key: '{key}') do |conn|
209+
conn.call('WATCH', '{key}1', '{key}2')
210+
conn.multi do |txn|
211+
old_key1 = conn.call('GET', '{key}1')
212+
old_key2 = conn.call('GET', '{key}2')
213+
txn.call('SET', '{key}1', old_key2)
214+
txn.call('SET', '{key}2', old_key1)
215+
end
216+
# This transaction will swap the values of {key}1 and {key}2 only if no concurrent connection modified
217+
# either of the values
218+
end
219+
# You can also pass watch: to #multi as a shortcut
220+
cli.call('MSET', '{key}1', 'v1', '{key}2', 'v2')
221+
cli.with(slot_key: '{key}') do |conn|
222+
conn.multi(watch: ['{key}1', '{key}2']) do |txn|
223+
old_key1, old_key2 = conn.call('MGET', '{key}1', '{key}2')
224+
txn.call('MSET', '{key}1', old_key2, '{key}2', old_key1)
225+
end
226+
end
227+
```
228+
229+
Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because
230+
you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot
231+
it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero
232+
`retry_count` to `#with`.
233+
```ruby
234+
cli.with(slot_key: '{key}', retry_count: 1) do |conn|
235+
conn.call('GET', '{key}1')
236+
#=> "value1"
237+
# Now, some changes in cluster topology mean that {key} is moved to a different node!
238+
conn.call('GET', '{key}2')
239+
#=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError)
240+
# Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered
241+
# correct node.
242+
end
243+
```
244+
245+
Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
246+
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
247+
call will pin the connection to a slot when using clustering, or be a no-op when not.
248+
171249
## ACL
172250
The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly.
173251
So please permit it like the followings.

lib/redis_client/cluster.rb

+11
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,17 @@ def multi(watch: nil, &block)
9393
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
9494
end
9595

96+
def with(slot_key:, write: true, retry_count: 0, &block)
97+
raise ArgumentError, 'slot_key must be provided' if slot_key.nil? || slot_key.empty?
98+
99+
node_key = @router.find_node_key_by_key(slot_key, primary: write)
100+
node = @router.find_node(node_key)
101+
# Calling #with checks out the underlying connection if this is a pooled connection
102+
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
103+
# transaction if so.
104+
@router.try_delegate(node, :with, retry_count: retry_count, &block)
105+
end
106+
96107
def pubsub
97108
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
98109
end

test/redis_client/test_cluster.rb

+201
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,207 @@ def test_only_reshards_own_errors
503503
assert_equal correct_primary_key, router.find_node_key_by_key('testkey', primary: true)
504504
end
505505

506+
def test_pinning_single_key
507+
got = @client.with(slot_key: 'key1') do |conn|
508+
conn.call('SET', 'key1', 'hello')
509+
conn.call('GET', 'key1')
510+
end
511+
assert_equal('hello', got)
512+
end
513+
514+
def test_pinning_no_key
515+
assert_raises(ArgumentError) do
516+
@client.with(slot_key: nil) {}
517+
end
518+
end
519+
520+
def test_pinning_two_keys
521+
got = @client.with(slot_key: '{slot}') do |conn|
522+
conn.call('SET', '{slot}key1', 'v1')
523+
conn.call('SET', '{slot}key2', 'v2')
524+
conn.call('MGET', '{slot}key1', '{slot}key2')
525+
end
526+
assert_equal(%w[v1 v2], got)
527+
end
528+
529+
def test_pinning_cross_slot
530+
skip 'This is not implemented yet!'
531+
532+
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
533+
@client.with(slot_key: '{slot1}') do |conn|
534+
conn.call('GET', '{slot2}')
535+
end
536+
end
537+
end
538+
539+
def test_pinning_pipeline
540+
got = @client.with(slot_key: '{slot}') do |conn|
541+
conn.call_v(['SET', '{slot}counter', 0])
542+
conn.pipelined do |pipe|
543+
pipe.call_v(['INCR', '{slot}counter'])
544+
pipe.call_v(['INCR', '{slot}counter'])
545+
pipe.call_v(['INCR', '{slot}counter'])
546+
end
547+
conn.call_v(['GET', '{slot}counter']).to_i
548+
end
549+
550+
assert_equal(3, got)
551+
end
552+
553+
def test_pinning_pipeline_with_error
554+
assert_raises(RedisClient::CommandError) do
555+
@client.with(slot_key: '{slot}') do |conn|
556+
conn.pipelined do |pipeline|
557+
pipeline.call('SET', '{slot}key', 'first')
558+
pipeline.call('SET', '{slot}key', 'second', 'too many args')
559+
pipeline.call('SET', '{slot}key', 'third')
560+
end
561+
end
562+
end
563+
564+
wait_for_replication
565+
assert_equal('third', @client.call('GET', '{slot}key'))
566+
end
567+
568+
def test_pinning_transaction
569+
got = @client.with(slot_key: '{slot}') do |conn|
570+
conn.multi do |txn|
571+
txn.call('SET', '{slot}key1', 'value1')
572+
txn.call('SET', '{slot}key2', 'value2')
573+
end
574+
end
575+
576+
assert_equal(%w[OK OK], got)
577+
end
578+
579+
def test_pinning_transaction_watch_arg
580+
@client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2')
581+
@captured_commands.clear
582+
583+
got = @client.with(slot_key: '{slot}') do |conn|
584+
conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn|
585+
old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2')
586+
txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1)
587+
end
588+
end
589+
590+
assert_equal([
591+
%w[WATCH {slot}key1 {slot}key2],
592+
%w[MGET {slot}key1 {slot}key2],
593+
%w[MULTI],
594+
%w[MSET {slot}key1 val2 {slot}key2 val1],
595+
%w[EXEC]
596+
], @captured_commands.to_a.map(&:command))
597+
598+
wait_for_replication
599+
assert_equal(['OK'], got)
600+
assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2'))
601+
end
602+
603+
def test_pinning_transaction_watch_arg_unwatches_on_raise
604+
ex = Class.new(StandardError)
605+
@captured_commands.clear
606+
607+
assert_raises(ex) do
608+
@client.with(slot_key: '{slot}') do |conn|
609+
conn.multi(watch: ['{slot}key1']) do |_txn|
610+
conn.call('GET', '{slot}key1')
611+
raise ex, 'boom'
612+
end
613+
end
614+
end
615+
616+
assert_equal([
617+
%w[WATCH {slot}key1],
618+
%w[GET {slot}key1],
619+
%w[UNWATCH]
620+
], @captured_commands.to_a.map(&:command))
621+
end
622+
623+
def test_pinning_transaction_can_watch_manually
624+
@client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2')
625+
@captured_commands.clear
626+
627+
got = @client.with(slot_key: '{slot}') do |conn|
628+
conn.call('WATCH', '{slot}key1', '{slot}key2')
629+
old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2')
630+
conn.multi do |txn|
631+
txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1)
632+
end
633+
end
634+
635+
assert_equal([
636+
%w[WATCH {slot}key1 {slot}key2],
637+
%w[MGET {slot}key1 {slot}key2],
638+
%w[MULTI],
639+
%w[MSET {slot}key1 val2 {slot}key2 val1],
640+
%w[EXEC]
641+
], @captured_commands.to_a.map(&:command))
642+
643+
wait_for_replication
644+
assert_equal(['OK'], got)
645+
assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2'))
646+
end
647+
648+
def test_pinning_transaction_can_unwatch_manually
649+
got = @client.with(slot_key: '{slot}') do |conn|
650+
conn.call('WATCH', '{slot}key1')
651+
conn.call('UNWATCH')
652+
end
653+
654+
assert_equal('OK', got)
655+
end
656+
657+
def test_pinning_timeouts_update_topology
658+
# Create a new test client with a lower timeout for this test so it's fast.
659+
captured_commands = CommandCaptureMiddleware::CommandBuffer.new
660+
client = new_test_client(capture_buffer: captured_commands, timeout: 0.5)
661+
662+
client.call('DEL', '{slot}list')
663+
captured_commands.clear
664+
665+
assert_raises(::RedisClient::ReadTimeoutError) do
666+
client.with(slot_key: '{slot}') do |conn|
667+
conn.call_v(['BLPOP', '{slot}list', 0])
668+
end
669+
end
670+
assert_includes(captured_commands.to_a.map(&:command), %w[CLUSTER NODES])
671+
end
672+
673+
def test_pinning_sscan
674+
@client.call('DEL', '{slot}set')
675+
expected_set = Set.new
676+
scanned_set = Set.new
677+
1000.times do |i|
678+
expected_set << i
679+
@client.call('SADD', '{slot}set', i)
680+
end
681+
@client.with(slot_key: '{slot}') do |conn|
682+
conn.sscan('{slot}set') do |i|
683+
scanned_set << i.to_i
684+
end
685+
end
686+
687+
assert_equal(expected_set, scanned_set)
688+
end
689+
690+
def test_pinning_zscan
691+
@client.call('DEL', '{slot}set')
692+
expected_set = Set.new
693+
scanned_set = Set.new
694+
1000.times do |i|
695+
expected_set << "member#{i}"
696+
@client.call('ZADD', '{slot}set', i, "member#{i}")
697+
end
698+
@client.with(slot_key: '{slot}') do |conn|
699+
conn.zscan('{slot}set') do |i|
700+
scanned_set << i
701+
end
702+
end
703+
704+
assert_equal(expected_set, scanned_set)
705+
end
706+
506707
private
507708

508709
def wait_for_replication

0 commit comments

Comments
 (0)