Skip to content

Commit d1d5483

Browse files
authored
fix: don't call the block twice during the transaction (#318)
1 parent 86c0e01 commit d1d5483

File tree

5 files changed

+243
-24
lines changed

5 files changed

+243
-24
lines changed

lib/redis_client/cluster.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,10 @@ def pipelined
8989
pipeline.execute
9090
end
9191

92-
def multi(watch: nil, &block)
93-
::RedisClient::Cluster::Transaction.new(@router, @command_builder).execute(watch: watch, &block)
92+
def multi(watch: nil)
93+
transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder, watch)
94+
yield transaction
95+
transaction.execute
9496
end
9597

9698
def with(key: nil, hashtag: nil, write: true, retry_count: 0, &block)

lib/redis_client/cluster/router.rb

+5
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ def find_node_key_by_key(key, seed: nil, primary: false)
162162
end
163163
end
164164

165+
def find_primary_node_by_slot(slot)
166+
node_key = @node.find_node_key_of_primary(slot)
167+
find_node(node_key)
168+
end
169+
165170
def find_node_key(command, seed: nil)
166171
key = @command.extract_first_key(command)
167172
find_node_key_by_key(key, seed: seed, primary: @command.should_send_to_primary?(command))

lib/redis_client/cluster/transaction.rb

+140-20
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,176 @@
11
# frozen_string_literal: true
22

33
require 'redis_client'
4+
require 'redis_client/cluster/pipeline'
5+
require 'redis_client/cluster/key_slot_converter'
46

57
class RedisClient
68
class Cluster
79
class Transaction
810
ConsistencyError = Class.new(::RedisClient::Error)
911

10-
def initialize(router, command_builder)
12+
def initialize(router, command_builder, watch)
1113
@router = router
1214
@command_builder = command_builder
13-
@node_key = nil
15+
@watch = watch
16+
@retryable = true
17+
@pipeline = ::RedisClient::Pipeline.new(@command_builder)
18+
@buffer = []
19+
@node = nil
1420
end
1521

16-
def call(*command, **kwargs, &_)
22+
def call(*command, **kwargs, &block)
1723
command = @command_builder.generate(command, kwargs)
18-
ensure_node_key(command)
24+
if prepare(command)
25+
@pipeline.call_v(command, &block)
26+
else
27+
@buffer << -> { @pipeline.call_v(command, &block) }
28+
end
1929
end
2030

21-
def call_v(command, &_)
31+
def call_v(command, &block)
2232
command = @command_builder.generate(command)
23-
ensure_node_key(command)
33+
if prepare(command)
34+
@pipeline.call_v(command, &block)
35+
else
36+
@buffer << -> { @pipeline.call_v(command, &block) }
37+
end
2438
end
2539

26-
def call_once(*command, **kwargs, &_)
40+
def call_once(*command, **kwargs, &block)
41+
@retryable = false
2742
command = @command_builder.generate(command, kwargs)
28-
ensure_node_key(command)
43+
if prepare(command)
44+
@pipeline.call_once_v(command, &block)
45+
else
46+
@buffer << -> { @pipeline.call_once_v(command, &block) }
47+
end
2948
end
3049

31-
def call_once_v(command, &_)
50+
def call_once_v(command, &block)
51+
@retryable = false
3252
command = @command_builder.generate(command)
33-
ensure_node_key(command)
53+
if prepare(command)
54+
@pipeline.call_once_v(command, &block)
55+
else
56+
@buffer << -> { @pipeline.call_once_v(command, &block) }
57+
end
3458
end
3559

36-
def execute(watch: nil, &block)
37-
yield self
38-
raise ArgumentError, 'empty transaction' if @node_key.nil?
60+
def execute
61+
@buffer.each(&:call)
3962

40-
node = @router.find_node(@node_key)
41-
@router.try_delegate(node, :multi, watch: watch, &block)
63+
raise ArgumentError, 'empty transaction' if @pipeline._empty?
64+
raise ConsistencyError, "couldn't determine the node: #{@pipeline._commands}" if @node.nil?
65+
raise ConsistencyError, "unsafe watch: #{@watch.join(' ')}" unless safe_watch?
66+
67+
settle
4268
end
4369

4470
private
4571

46-
def ensure_node_key(command)
72+
def watch?
73+
!@watch.nil? && !@watch.empty?
74+
end
75+
76+
def safe_watch?
77+
return true unless watch?
78+
return false if @node.nil?
79+
80+
slots = @watch.map do |k|
81+
return false if k.nil? || k.empty?
82+
83+
::RedisClient::Cluster::KeySlotConverter.convert(k)
84+
end
85+
86+
return false if slots.uniq.size != 1
87+
88+
@router.find_primary_node_by_slot(slots.first) == @node
89+
end
90+
91+
def prepare(command)
92+
return true unless @node.nil?
93+
4794
node_key = @router.find_primary_node_key(command)
48-
raise ConsistencyError, "Client couldn't determine the node to be executed the transaction by: #{command}" if node_key.nil?
95+
return false if node_key.nil?
96+
97+
@node = @router.find_node(node_key)
98+
@pipeline.call('WATCH', *@watch) if watch?
99+
@pipeline.call('MULTI')
100+
@buffer.each(&:call)
101+
@buffer.clear
102+
true
103+
end
104+
105+
def settle
106+
@pipeline.call('EXEC')
107+
@pipeline.call('UNWATCH') if watch?
108+
send_transaction(@node, redirect: true)
109+
end
49110

50-
@node_key ||= node_key
51-
raise ConsistencyError, "The transaction should be done for single node: #{@node_key}, #{node_key}" if node_key != @node_key
111+
def send_transaction(client, redirect:)
112+
case client
113+
when ::RedisClient then send_pipeline(client, redirect: redirect)
114+
when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) }
115+
else raise NotImplementedError, "#{client.class.name}#multi for cluster client"
116+
end
117+
end
118+
119+
def send_pipeline(client, redirect:)
120+
results = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection|
121+
commands = @pipeline._commands
122+
client.middlewares.call_pipelined(commands, client.config) do
123+
connection.call_pipelined(commands, nil)
124+
rescue ::RedisClient::CommandError => e
125+
return handle_command_error!(commands, e) if redirect
126+
127+
raise
128+
end
129+
end
130+
131+
@pipeline._coerce!(results)
132+
results[watch? ? -2 : -1]
133+
end
134+
135+
def handle_command_error!(commands, err)
136+
if err.message.start_with?('CROSSSLOT')
137+
raise ConsistencyError, "#{err.message}: #{err.command}"
138+
elsif err.message.start_with?('MOVED', 'ASK')
139+
ensure_the_same_node!(commands)
140+
handle_redirection(err)
141+
else
142+
raise err
143+
end
144+
end
145+
146+
def ensure_the_same_node!(commands)
147+
commands.each do |command|
148+
node_key = @router.find_primary_node_key(command)
149+
next if node_key.nil?
150+
151+
node = @router.find_node(node_key)
152+
next if @node == node
153+
154+
raise ConsistencyError, "the transaction should be executed to a slot in a node: #{commands}"
155+
end
156+
end
157+
158+
def handle_redirection(err)
159+
if err.message.start_with?('MOVED')
160+
node = @router.assign_redirection_node(err.message)
161+
send_transaction(node, redirect: false)
162+
elsif err.message.start_with?('ASK')
163+
node = @router.assign_asking_node(err.message)
164+
try_asking(node) ? send_transaction(node, redirect: false) : err
165+
else
166+
raise err
167+
end
168+
end
52169

53-
nil
170+
def try_asking(node)
171+
node.call('ASKING') == 'OK'
172+
rescue StandardError
173+
false
54174
end
55175
end
56176
end

test/redis_client/test_cluster.rb

+50-2
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def test_transaction_with_empty_block
207207
assert_raises(LocalJumpError) { @client.multi }
208208
end
209209

210-
def test_transaction_with_keyless_commands
210+
def test_transaction_with_only_keyless_commands
211211
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
212212
@client.multi do |t|
213213
t.call('ECHO', 'foo')
@@ -234,7 +234,7 @@ def test_transaction_without_hashtag
234234
end
235235
end
236236

237-
assert_raises(::RedisClient::CommandError, 'CROSSSLOT keys') do
237+
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
238238
@client.multi do |t|
239239
t.call('MSET', 'key1', '1', 'key2', '2')
240240
t.call('MSET', 'key1', '1', 'key3', '3')
@@ -247,6 +247,54 @@ def test_transaction_without_hashtag
247247
end
248248
end
249249

250+
def test_transaction_with_watch
251+
@client.call('MSET', '{key}1', '0', '{key}2', '0')
252+
253+
got = @client.multi(watch: %w[{key}1 {key}2]) do |tx|
254+
tx.call('ECHO', 'START')
255+
tx.call('SET', '{key}1', '1')
256+
tx.call('SET', '{key}2', '2')
257+
tx.call('ECHO', 'FINISH')
258+
end
259+
260+
assert_equal(%w[START OK OK FINISH], got)
261+
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
262+
end
263+
264+
def test_transaction_with_unsafe_watch
265+
@client.call('MSET', '{key}1', '0', '{key}2', '0')
266+
267+
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
268+
@client.multi(watch: %w[key1 key2]) do |tx|
269+
tx.call('SET', '{key}1', '1')
270+
tx.call('SET', '{key}2', '2')
271+
end
272+
end
273+
274+
assert_raises(::RedisClient::Cluster::Transaction::ConsistencyError) do
275+
@client.multi(watch: %w[{hey}1 {hey}2]) do |tx|
276+
tx.call('SET', '{key}1', '1')
277+
tx.call('SET', '{key}2', '2')
278+
end
279+
end
280+
281+
assert_equal(%w[0 0], @client.call('MGET', '{key}1', '{key}2'))
282+
end
283+
284+
def test_transaction_with_meaningless_watch
285+
@client.call('MSET', '{key}1', '0', '{key}2', '0')
286+
287+
got = @client.multi(watch: %w[{key}3 {key}4]) do |tx|
288+
tx.call('ECHO', 'START')
289+
tx.call('SET', '{key}1', '1')
290+
tx.call('SET', '{key}2', '2')
291+
tx.call('ECHO', 'FINISH')
292+
end
293+
294+
assert_equal(%w[START OK OK FINISH], got)
295+
assert_equal(%w[1 2], @client.call('MGET', '{key}1', '{key}2'))
296+
end
297+
250298
def test_pubsub_without_subscription
251299
pubsub = @client.pubsub
252300
assert_nil(pubsub.next_event(0.01))

0 commit comments

Comments
 (0)