diff --git a/.rubocop.yml b/.rubocop.yml index 6d1f5936..a81474ce 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,6 +19,8 @@ Metrics/ClassLength: Metrics/ModuleLength: Max: 500 + Exclude: + - 'test/**/*' Metrics/MethodLength: Max: 50 diff --git a/README.md b/README.md index 6bda4aaf..f9d7e34d 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,91 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3') #=> [nil, nil, nil] ``` +## Transactions +This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`, +and conditional execution with `WATCH`. Redis does not support cross-node transactions, so all keys used within a +transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single connection using +`#with`. You can pass a single key, in order to perform multiple operations atomically on the same key, like so: + +```ruby +cli.with(key: 'my_cool_key') do |conn| + conn.multi do |m| + m.call('INCR', 'my_cool_key') + m.call('INCR', 'my_cool_key') + end + # my_cool_key will be incremented by 2, with no intermediate state visible to other clients +end +``` + +More commonly, however, you will want to perform transactions across multiple keys. To do this, you need to ensure that all keys used in the transaction hash to the same slot; Redis has a mechanism called [hashtags](https://redis.io/docs/reference/cluster-spec/#hash-tags) to achieve this. If a key contains a hashtag (e.g. in the key `{foo}bar`, the hashtag is `foo`), then it is guaranteed to hash to the same slot (and thus always live on the same node) as other keys which contain the same hashtag. + +So, whilst it's not possible in Redis cluster to perform a transaction on the keys `foo` and `bar`, it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`. 
To perform such transactions with this gem, pass `hashtag:` to `#with` instead of `key`: + +```ruby +cli.with(hashtag: 'user123') do |conn| + # You can use any key which contains "{user123}" in this block + conn.multi do |m| + m.call('INCR', '{user123}coins_spent') + m.call('DECR', '{user123}coins_available') + end +end +``` + +Once you have pinned a client to a particular slot, you can use the same transaction APIs as the +[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows. +```ruby +# No concurrent client will ever see the value 1 in 'key'; it will see either zero or two. +cli.call('SET', 'key', 0) +cli.with(key: 'key') do |conn| + conn.multi do |txn| + txn.call('INCR', 'key') + txn.call('INCR', 'key') + end + #=> [1, 2] +end +# Conditional execution with WATCH can be used to e.g. atomically swap two keys +cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2') +cli.with(hashtag: 'myslot') do |conn| + conn.call('WATCH', '{myslot}1', '{myslot}2') + conn.multi do |txn| + old_key1 = conn.call('GET', '{myslot}1') + old_key2 = conn.call('GET', '{myslot}2') + txn.call('SET', '{myslot}1', old_key2) + txn.call('SET', '{myslot}2', old_key1) + end + # This transaction will swap the values of {myslot}1 and {myslot}2 only if no concurrent connection modified + # either of the values +end +# You can also pass watch: to #multi as a shortcut +cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2') +cli.with(hashtag: 'myslot') do |conn| + conn.multi(watch: ['{myslot}1', '{myslot}2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{myslot}1', '{myslot}2') + txn.call('MSET', '{myslot}1', old_key2, '{myslot}2', old_key1) + end +end +``` + +Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because +you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot +it is operating on moves to a different node. 
If you want this, you can opt in to retries by passing nonzero +`retry_count` to `#with`. +```ruby +cli.with(hashtag: 'myslot', retry_count: 1) do |conn| + conn.call('GET', '{myslot}1') + #=> "value1" + # Now, some changes in cluster topology mean that {myslot} is moved to a different node! + conn.call('GET', '{myslot}2') + #=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError) + # Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered + # correct node. +end +``` + +Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its +arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with` +call will pin the connection to a slot when using clustering, or be a no-op when not. + ## ACL The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly. So please permit it like the followings. diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 0ea6eccd..02281105 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -93,6 +93,17 @@ def multi(watch: nil, &block) ::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block) end + def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block) + key = process_with_arguments(key, hashtag) + + node_key = @router.find_node_key_by_key(key, primary: write) + node = @router.find_node(node_key) + # Calling #with checks out the underlying connection if this is a pooled connection + # Calling it through #try_delegate ensures we handle any redirections and retry the entire + # transaction if so. 
+ @router.try_delegate(node, :with, retry_count: retry_count, &block) + end + def pubsub ::RedisClient::Cluster::PubSub.new(@router, @command_builder) end @@ -105,6 +116,19 @@ def close private + def process_with_arguments(key, hashtag) # rubocop:disable Metrics/CyclomaticComplexity + raise ArgumentError, 'Only one of key or hashtag may be provided' if key && hashtag + + if hashtag + # The documentation says not to wrap your hashtag in {}, but people will probably + # do it anyway and it's easy for us to fix here. + key = hashtag&.match?(/^{.*}$/) ? hashtag : "{#{hashtag}}" + end + raise ArgumentError, 'One of key or hashtag must be provided' if key.nil? || key.empty? + + key + end + def method_missing(name, *args, **kwargs, &block) if @router.command_exists?(name) args.unshift(name) diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index bcad46f6..433a9af6 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -503,6 +503,222 @@ def test_only_reshards_own_errors assert_equal correct_primary_key, router.find_node_key_by_key('testkey', primary: true) end + def test_pinning_single_key + got = @client.with(key: 'key1') do |conn| + conn.call('SET', 'key1', 'hello') + conn.call('GET', 'key1') + end + assert_equal('hello', got) + end + + def test_pinning_no_key + assert_raises(ArgumentError) do + @client.with {} + end + end + + def test_pinning_empty_key + assert_raises(ArgumentError) do + @client.with(key: '') {} + end + end + + def test_pinning_two_keys + got = @client.with(hashtag: 'slot') do |conn| + conn.call('SET', '{slot}key1', 'v1') + conn.call('SET', '{slot}key2', 'v2') + conn.call('MGET', '{slot}key1', '{slot}key2') + end + assert_equal(%w[v1 v2], got) + end + + def test_pinning_cross_slot + skip 'This is not implemented yet!' 
+ + assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do + @client.with(hashtag: 'slot1') do |conn| + conn.call('GET', '{slot2}') + end + end + end + + def test_pinning_hashtag_with_braces + got = @client.with(hashtag: '{slot}') do |conn| + conn.call('SET', '{slot}key1', 'v1') + conn.call('SET', '{slot}key2', 'v2') + conn.call('MGET', '{slot}key1', '{slot}key2') + end + assert_equal(%w[v1 v2], got) + end + + def test_pinning_pipeline + got = @client.with(hashtag: 'slot') do |conn| + conn.call_v(['SET', '{slot}counter', 0]) + conn.pipelined do |pipe| + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + pipe.call_v(['INCR', '{slot}counter']) + end + conn.call_v(['GET', '{slot}counter']).to_i + end + + assert_equal(3, got) + end + + def test_pinning_pipeline_with_error + assert_raises(RedisClient::CommandError) do + @client.with(hashtag: 'slot') do |conn| + conn.pipelined do |pipeline| + pipeline.call('SET', '{slot}key', 'first') + pipeline.call('SET', '{slot}key', 'second', 'too many args') + pipeline.call('SET', '{slot}key', 'third') + end + end + end + + wait_for_replication + assert_equal('third', @client.call('GET', '{slot}key')) + end + + def test_pinning_transaction + got = @client.with(hashtag: 'slot') do |conn| + conn.multi do |txn| + txn.call('SET', '{slot}key1', 'value1') + txn.call('SET', '{slot}key2', 'value2') + end + end + + assert_equal(%w[OK OK], got) + end + + def test_pinning_transaction_watch_arg + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(hashtag: 'slot') do |conn| + conn.multi(watch: ['{slot}key1', '{slot}key2']) do |txn| + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], 
@captured_commands.to_a.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_watch_arg_unwatches_on_raise + ex = Class.new(StandardError) + @captured_commands.clear + + assert_raises(ex) do + @client.with(hashtag: 'slot') do |conn| + conn.multi(watch: ['{slot}key1']) do |_txn| + conn.call('GET', '{slot}key1') + raise ex, 'boom' + end + end + end + + assert_equal([ + %w[WATCH {slot}key1], + %w[GET {slot}key1], + %w[UNWATCH] + ], @captured_commands.to_a.map(&:command)) + end + + def test_pinning_transaction_can_watch_manually + @client.call('MSET', '{slot}key1', 'val1', '{slot}key2', 'val2') + @captured_commands.clear + + got = @client.with(hashtag: 'slot') do |conn| + conn.call('WATCH', '{slot}key1', '{slot}key2') + old_key1, old_key2 = conn.call('MGET', '{slot}key1', '{slot}key2') + conn.multi do |txn| + txn.call('MSET', '{slot}key1', old_key2, '{slot}key2', old_key1) + end + end + + assert_equal([ + %w[WATCH {slot}key1 {slot}key2], + %w[MGET {slot}key1 {slot}key2], + %w[MULTI], + %w[MSET {slot}key1 val2 {slot}key2 val1], + %w[EXEC] + ], @captured_commands.to_a.map(&:command)) + + wait_for_replication + assert_equal(['OK'], got) + assert_equal(%w[val2 val1], @client.call('MGET', '{slot}key1', '{slot}key2')) + end + + def test_pinning_transaction_can_unwatch_manually + got = @client.with(hashtag: 'slot') do |conn| + conn.call('WATCH', '{slot}key1') + conn.call('UNWATCH') + end + + assert_equal('OK', got) + end + + def test_pinning_timeouts_update_topology + # Create a new test client with a lower timeout for this test so it's fast. 
+ captured_commands = CommandCaptureMiddleware::CommandBuffer.new + client = new_test_client(capture_buffer: captured_commands, timeout: 0.5) + + client.call('DEL', '{slot}list') + captured_commands.clear + + assert_raises(::RedisClient::ReadTimeoutError) do + client.with(hashtag: 'slot') do |conn| + conn.call_v(['BLPOP', '{slot}list', 0]) + end + end + assert_includes(captured_commands.to_a.map(&:command), %w[CLUSTER NODES]) + end + + def test_pinning_sscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << i + @client.call('SADD', '{slot}set', i) + end + @client.with(hashtag: 'slot') do |conn| + conn.sscan('{slot}set') do |i| + scanned_set << i.to_i + end + end + + assert_equal(expected_set, scanned_set) + end + + def test_pinning_zscan + @client.call('DEL', '{slot}set') + expected_set = Set.new + scanned_set = Set.new + 1000.times do |i| + expected_set << "member#{i}" + @client.call('ZADD', '{slot}set', i, "member#{i}") + end + @client.with(hashtag: 'slot') do |conn| + conn.zscan('{slot}set') do |i| + scanned_set << i + end + end + + assert_equal(expected_set, scanned_set) + end + private def wait_for_replication @@ -541,13 +757,14 @@ def hiredis_used? 
class PrimaryOnly < TestingWrapper include Mixin - def new_test_client(capture_buffer: @captured_commands) + def new_test_client(capture_buffer: @captured_commands, **opts) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, middlewares: [CommandCaptureMiddleware], custom: { captured_commands: capture_buffer }, - **TEST_GENERIC_OPTIONS + **TEST_GENERIC_OPTIONS, + **opts ) ::RedisClient::Cluster.new(config) end @@ -556,7 +773,7 @@ def new_test_client(capture_buffer: @captured_commands) class ScaleReadRandom < TestingWrapper include Mixin - def new_test_client(capture_buffer: @captured_commands) + def new_test_client(capture_buffer: @captured_commands, **opts) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -564,7 +781,8 @@ def new_test_client(capture_buffer: @captured_commands) fixed_hostname: TEST_FIXED_HOSTNAME, middlewares: [CommandCaptureMiddleware], custom: { captured_commands: capture_buffer }, - **TEST_GENERIC_OPTIONS + **TEST_GENERIC_OPTIONS, + **opts ) ::RedisClient::Cluster.new(config) end @@ -573,7 +791,7 @@ def new_test_client(capture_buffer: @captured_commands) class ScaleReadRandomWithPrimary < TestingWrapper include Mixin - def new_test_client(capture_buffer: @captured_commands) + def new_test_client(capture_buffer: @captured_commands, **opts) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -581,7 +799,8 @@ def new_test_client(capture_buffer: @captured_commands) fixed_hostname: TEST_FIXED_HOSTNAME, middlewares: [CommandCaptureMiddleware], custom: { captured_commands: capture_buffer }, - **TEST_GENERIC_OPTIONS + **TEST_GENERIC_OPTIONS, + **opts ) ::RedisClient::Cluster.new(config) end @@ -590,7 +809,7 @@ def new_test_client(capture_buffer: @captured_commands) class ScaleReadLatency < TestingWrapper include Mixin - def new_test_client(capture_buffer: @captured_commands) + def new_test_client(capture_buffer: @captured_commands, 
**opts) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, replica: true, @@ -598,7 +817,8 @@ def new_test_client(capture_buffer: @captured_commands) fixed_hostname: TEST_FIXED_HOSTNAME, middlewares: [CommandCaptureMiddleware], custom: { captured_commands: capture_buffer }, - **TEST_GENERIC_OPTIONS + **TEST_GENERIC_OPTIONS, + **opts ) ::RedisClient::Cluster.new(config) end @@ -607,13 +827,14 @@ def new_test_client(capture_buffer: @captured_commands) class Pooled < TestingWrapper include Mixin - def new_test_client(capture_buffer: @captured_commands) + def new_test_client(capture_buffer: @captured_commands, **opts) config = ::RedisClient::ClusterConfig.new( nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, middlewares: [CommandCaptureMiddleware], custom: { captured_commands: capture_buffer }, - **TEST_GENERIC_OPTIONS + **TEST_GENERIC_OPTIONS, + **opts ) ::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 }) end