Skip to content

Implement RedisClient::Cluster#with (again!) #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Metrics/ClassLength:

Metrics/ModuleLength:
Max: 500
Exclude:
- 'test/**/*'

Metrics/MethodLength:
Max: 50
Expand Down
85 changes: 85 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,91 @@ cli.call('MGET', '{key}1', '{key}2', '{key}3')
#=> [nil, nil, nil]
```

## Transactions
This gem supports [Redis transactions](https://redis.io/topics/transactions), including atomicity with `MULTI`/`EXEC`,
and conditional execution with `WATCH`. Redis does not support cross-node transactions, so all keys used within a
transaction must live in the same key slot. To use transactions, you must thus "pin" your client to a single connection using
`#with`. You can pass a single key, in order to perform multiple operations atomically on the same key, like so:

```ruby
cli.with(key: 'my_cool_key') do |conn|
conn.multi do |m|
m.call('INC', 'my_cool_key')
m.call('INC', 'my_cool_key')
end
# my_cool_key will be incremented by 2, with no intermediate state visible to other clients
end
```

More commonly, however, you will want to perform transactions across multiple keys. To do this, you need to ensure that all keys used in the transaction hash to the same slot; Redis a mechanism called [hashtags](https://redis.io/docs/reference/cluster-spec/#hash-tags) to achieve this. If a key contains a hashag (e.g. in the key `{foo}bar`, the hashtag is `foo`), then it is guaranted to hash to the same slot (and thus always live on the same node) as other keys which contain the same hashtag.

So, whilst it's not possible in Redis cluster to perform a transction on the keys `foo` and `bar`, it _is_ possible to perform a transaction on the keys `{tag}foo` and `{tag}bar`. To perform such transactions on this gem, pass `hashtag:` to `#with` instead of `key`:

```ruby
cli.with(hashtag: 'user123') do |conn|
# You can use any key which contains "{user123}" in this block
conn.multi do |m|
m.call('INC', '{user123}coins_spent')
m.call('DEC', '{user123}coins_available')
end
end
```

Once you have pinned a client to a particular slot, you can use the same transaction APIs as the
[redis-client](https://github.com/redis-rb/redis-client#usage) gem allows.
```ruby
# No concurrent client will ever see the value 1 in 'mykey'; it will see either zero or two.
cli.call('SET', 'key', 0)
cli.with(key: 'key') do |conn|
conn.multi do |txn|
txn.call('INCR', 'key')
txn.call('INCR', 'key')
end
#=> ['OK', 'OK']
end
# Conditional execution with WATCH can be used to e.g. atomically swap two keys
cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2')
cli.with(hashtag: 'myslot') do |conn|
conn.call('WATCH', '{myslot}1', '{myslot}2')
conn.multi do |txn|
old_key1 = conn.call('GET', '{myslot}1')
old_key2 = conn.call('GET', '{myslot}2')
txn.call('SET', '{myslot}1', old_key2)
txn.call('SET', '{myslot}2', old_key1)
end
# This transaction will swap the values of {myslot}1 and {myslot}2 only if no concurrent connection modified
# either of the values
end
# You can also pass watch: to #multi as a shortcut
cli.call('MSET', '{myslot}1', 'v1', '{myslot}2', 'v2')
cli.with(hashtag: 'myslot') do |conn|
conn.multi(watch: ['{myslot}1', '{myslot}2']) do |txn|
old_key1, old_key2 = conn.call('MGET', '{myslot}1', '{myslot}2')
txn.call('MSET', '{myslot}1', old_key2, '{myslot}2', old_key1)
end
end
```

Pinned connections are aware of redirections and node failures like ordinary calls to `RedisClient::Cluster`, but because
you may have written non-idempotent code inside your block, the block is not automatically retried if e.g. the slot
it is operating on moves to a different node. If you want this, you can opt-in to retries by passing nonzero
`retry_count` to `#with`.
```ruby
cli.with(hashtag: 'myslot', retry_count: 1) do |conn|
conn.call('GET', '{myslot}1')
#=> "value1"
# Now, some changes in cluster topology mean that {key} is moved to a different node!
conn.call('GET', '{myslot}2')
#=> MOVED 9039 127.0.0.1:16381 (RedisClient::CommandError)
# Luckily, the block will get retried (once) and so both GETs will be re-executed on the newly-discovered
# correct node.
end
```

Because `RedisClient` from the redis-client gem implements `#with` as simply `yield self` and ignores all of its
arguments, it's possible to write code which is compatible with both redis-client and redis-cluster-client; the `#with`
call will pin the connection to a slot when using clustering, or be a no-op when not.

## ACL
The cluster client internally calls [COMMAND](https://redis.io/commands/command/) and [CLUSTER NODES](https://redis.io/commands/cluster-nodes/) commands to operate correctly.
So please permit it like the followings.
Expand Down
24 changes: 24 additions & 0 deletions lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ def multi(watch: nil, &block)
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
end

def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)
key = process_with_arguments(key, hashtag)

node_key = @router.find_node_key_by_key(key, primary: write)
node = @router.find_node(node_key)
# Calling #with checks out the underlying connection if this is a pooled connection
# Calling it through #try_delegate ensures we handle any redirections and retry the entire
# transaction if so.
@router.try_delegate(node, :with, retry_count: retry_count, &block)
end

def pubsub
::RedisClient::Cluster::PubSub.new(@router, @command_builder)
end
Expand All @@ -105,6 +116,19 @@ def close

private

def process_with_arguments(key, hashtag) # rubocop:disable Metrics/CyclomaticComplexity
raise ArgumentError, 'Only one of key or hashtag may be provided' if key && hashtag

if hashtag
# The documentation says not to wrap your hashtag in {}, but people will probably
# do it anyway and it's easy for us to fix here.
key = hashtag&.match?(/^{.*}$/) ? hashtag : "{#{hashtag}}"
end
raise ArgumentError, 'One of key or hashtag must be provided' if key.nil? || key.empty?

key
end

def method_missing(name, *args, **kwargs, &block)
if @router.command_exists?(name)
args.unshift(name)
Expand Down
Loading