Skip to content

Commit 7162d3e

Browse files
authored
Worker client replacement and activity client access (#219)
Fixes #165 Fixes #181
1 parent 5b34e03 commit 7162d3e

File tree

10 files changed

+134
-5
lines changed

10 files changed

+134
-5
lines changed

temporalio/lib/temporalio/activity/context.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,16 @@ def _scoped_logger_info
108108
end
109109

110110
# @return [Metric::Meter] Metric meter to create metrics on, with some activity-specific attributes already set.
111+
# @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set.
111112
def metric_meter
112113
raise NotImplementedError
113114
end
115+
116+
# @return [Client] Temporal client this activity worker is running in.
117+
# @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set.
118+
def client
119+
raise NotImplementedError
120+
end
114121
end
115122
end
116123
end

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ def execute_activity(task_token, defn, start)
201201

202202
# Run
203203
activity = RunningActivity.new(
204+
worker: @worker,
204205
info:,
205206
cancellation: Cancellation.new,
206207
worker_shutdown_cancellation: @worker._worker_shutdown_cancellation,
@@ -299,13 +300,15 @@ class RunningActivity < Activity::Context
299300
attr_accessor :instance, :_outbound_impl, :_server_requested_cancel
300301

301302
def initialize( # rubocop:disable Lint/MissingSuper
303+
worker:,
302304
info:,
303305
cancellation:,
304306
worker_shutdown_cancellation:,
305307
payload_converter:,
306308
logger:,
307309
runtime_metric_meter:
308310
)
311+
@worker = worker
309312
@info = info
310313
@cancellation = cancellation
311314
@worker_shutdown_cancellation = worker_shutdown_cancellation
@@ -334,6 +337,10 @@ def metric_meter
334337
}
335338
)
336339
end
340+
341+
def client
342+
@worker.client
343+
end
337344
end
338345

339346
class InboundImplementation < Temporalio::Worker::Interceptor::Activity::Inbound

temporalio/lib/temporalio/testing/activity_environment.rb

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,19 @@ def self.default_info
4747
# @param payload_converter [Converters::PayloadConverter] Value for {Activity::Context#payload_converter}.
4848
# @param logger [Logger] Value for {Activity::Context#logger}.
4949
# @param activity_executors [Hash<Symbol, Worker::ActivityExecutor>] Executors that activities can run within.
50+
# @param metric_meter [Metric::Meter, nil] Value for {Activity::Context#metric_meter}, or nil to raise when
51+
# called.
52+
# @param client [Client, nil] Value for {Activity::Context#client}, or nil to raise when called.
5053
def initialize(
5154
info: ActivityEnvironment.default_info,
5255
on_heartbeat: nil,
5356
cancellation: Cancellation.new,
5457
worker_shutdown_cancellation: Cancellation.new,
5558
payload_converter: Converters::PayloadConverter.default,
5659
logger: Logger.new(nil),
57-
activity_executors: Worker::ActivityExecutor.defaults
60+
activity_executors: Worker::ActivityExecutor.defaults,
61+
metric_meter: nil,
62+
client: nil
5863
)
5964
@info = info
6065
@on_heartbeat = on_heartbeat
@@ -63,6 +68,8 @@ def initialize(
6368
@payload_converter = payload_converter
6469
@logger = logger
6570
@activity_executors = activity_executors
71+
@metric_meter = metric_meter
72+
@client = client
6673
end
6774

6875
# Run an activity and returns its result or raises its exception.
@@ -86,7 +93,9 @@ def run(activity, *args)
8693
cancellation: @cancellation,
8794
worker_shutdown_cancellation: @worker_shutdown_cancellation,
8895
payload_converter: @payload_converter,
89-
logger: @logger
96+
logger: @logger,
97+
metric_meter: @metric_meter,
98+
client: @client
9099
))
91100
queue.push([defn.proc.call(*args), nil])
92101
rescue Exception => e # rubocop:disable Lint/RescueException Intentionally capturing all exceptions
@@ -113,7 +122,9 @@ def initialize( # rubocop:disable Lint/MissingSuper
113122
cancellation:,
114123
worker_shutdown_cancellation:,
115124
payload_converter:,
116-
logger:
125+
logger:,
126+
metric_meter:,
127+
client:
117128
)
118129
@info = info
119130
@instance = instance
@@ -122,12 +133,24 @@ def initialize( # rubocop:disable Lint/MissingSuper
122133
@worker_shutdown_cancellation = worker_shutdown_cancellation
123134
@payload_converter = payload_converter
124135
@logger = logger
136+
@metric_meter = metric_meter
137+
@client = client
125138
end
126139

127140
# @!visibility private
128141
def heartbeat(*details)
129142
@on_heartbeat&.call(details)
130143
end
144+
145+
# @!visibility private
146+
def metric_meter
147+
@metric_meter or raise 'No metric meter configured in this test environment'
148+
end
149+
150+
# @!visibility private
151+
def client
152+
@client or raise 'No client configured in this test environment'
153+
end
131154
end
132155

133156
private_constant :Context

temporalio/lib/temporalio/worker.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class Worker
5555
)
5656

5757
# Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details.
58+
#
59+
# Note, the `client` within can be replaced via client setter.
5860
class Options; end # rubocop:disable Lint/EmptyClass
5961

6062
# @return [String] Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby
@@ -484,13 +486,35 @@ def initialize(
484486

485487
# Validate worker
486488
@bridge_worker.validate
489+
490+
# Mutex needed for accessing and replacing a client
491+
@client_mutex = Mutex.new
487492
end
488493

489494
# @return [String] Task queue set on the worker options.
490495
def task_queue
491496
@options.task_queue
492497
end
493498

499+
# @return [Client] Client for this worker. This is the same as {Options.client} in {options}, but surrounded by a
500+
# mutex to be safe for client replacement in {client=}.
501+
def client
502+
@client_mutex.synchronize { @options.client }
503+
end
504+
505+
# Replace the worker's client. When this is called, the client is replaced on the internal worker which means any
506+
# new calls will be made on the new client (but existing calls will still complete on the previous one). This is
507+
# commonly used for providing a new client with updated authentication credentials.
508+
#
509+
# @param new_client [Client] New client to use for new calls.
510+
def client=(new_client)
511+
@client_mutex.synchronize do
512+
@bridge_worker.replace_client(new_client.connection._core_client)
513+
@options = @options.with(client: new_client)
514+
new_client
515+
end
516+
end
517+
494518
# Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the
495519
# worker is shut down. This will return the block result if everything successful or raise an error if not.
496520
#

temporalio/sig/temporalio/activity/context.rbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ module Temporalio
1919
def _scoped_logger_info: -> Hash[Symbol, Object]
2020

2121
def metric_meter: -> Metric::Meter
22+
def client: -> Client
2223
end
2324
end
2425
end

temporalio/sig/temporalio/internal/worker/activity_worker.rbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ module Temporalio
3232
attr_accessor _server_requested_cancel: bool
3333

3434
def initialize: (
35+
worker: Temporalio::Worker,
3536
info: Activity::Info,
3637
cancellation: Cancellation,
3738
worker_shutdown_cancellation: Cancellation,

temporalio/sig/temporalio/testing/activity_environment.rbs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ module Temporalio
1010
?worker_shutdown_cancellation: Cancellation,
1111
?payload_converter: Converters::PayloadConverter,
1212
?logger: Logger,
13-
?activity_executors: Hash[Symbol, Worker::ActivityExecutor]
13+
?activity_executors: Hash[Symbol, Worker::ActivityExecutor],
14+
?metric_meter: Metric::Meter?,
15+
?client: Client?
1416
) -> void
1517

1618
def run: (

temporalio/sig/temporalio/worker.rbs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ module Temporalio
6060
workflow_payload_codec_thread_pool: ThreadPool?,
6161
debug_mode: bool
6262
) -> void
63+
64+
def with: (**Object kwargs) -> Options
6365
end
6466

6567
def self.default_build_id: -> String
@@ -110,6 +112,9 @@ module Temporalio
110112

111113
def task_queue: -> String
112114

115+
def client: -> Client
116+
def client=: (Client new_client) -> void
117+
113118
def run: [T] (
114119
?cancellation: Cancellation,
115120
?shutdown_signals: Array[String | Integer],

temporalio/test/worker_activity_test.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,19 @@ def test_context_instance
901901
execute_activity(shared_instance, interceptors: [ContextInstanceInterceptor.new])
902902
end
903903

904+
class ClientAccessActivity < Temporalio::Activity::Definition
905+
def execute
906+
desc = Temporalio::Activity::Context.current.client.workflow_handle(
907+
Temporalio::Activity::Context.current.info.workflow_id
908+
).describe
909+
desc.raw_description.pending_activities.first.activity_type.name
910+
end
911+
end
912+
913+
def test_client_access
914+
assert_equal 'ClientAccessActivity', execute_activity(ClientAccessActivity)
915+
end
916+
904917
# steep:ignore
905918
def execute_activity(
906919
activity,

temporalio/test/worker_workflow_test.rb

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1769,13 +1769,59 @@ def test_context_instance
17691769
execute_workflow(ContextInstanceWorkflow, interceptors: [ContextInstanceInterceptor.new])
17701770
end
17711771

1772+
class WorkerClientReplacementWorkflow < Temporalio::Workflow::Definition
1773+
def execute
1774+
Temporalio::Workflow.wait_condition { @complete }
1775+
end
1776+
1777+
workflow_signal
1778+
def complete(value)
1779+
@complete = value
1780+
end
1781+
end
1782+
1783+
def test_worker_client_replacement
1784+
# Create a second ephemeral server and start workflow on both servers
1785+
Temporalio::Testing::WorkflowEnvironment.start_local do |env2|
1786+
# Start both workflows on different servers
1787+
task_queue = "tq-#{SecureRandom.uuid}"
1788+
handle1 = env.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)
1789+
handle2 = env2.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)
1790+
1791+
# Run worker on the first env. Make sure cache is off and only 1 max poller
1792+
worker = Temporalio::Worker.new(
1793+
client: env.client, task_queue:, workflows: [WorkerClientReplacementWorkflow],
1794+
max_cached_workflows: 0, max_concurrent_workflow_task_polls: 1
1795+
)
1796+
worker.run do
1797+
# Confirm first workflow has a task complete but not the second
1798+
assert_eventually do
1799+
refute_nil handle1.fetch_history_events.find(&:workflow_task_completed_event_attributes)
1800+
end
1801+
assert_nil handle2.fetch_history_events.find(&:workflow_task_completed_event_attributes)
1802+
1803+
# Replace the client
1804+
worker.client = env2.client
1805+
1806+
# Signal both which should allow the current poll to wake up and it'll be a task failure when trying to submit
1807+
# that to the new client which is ignored. But also the new client will poll for the new workflow, which we will
1808+
# wait for it to complete.
1809+
handle1.signal(WorkerClientReplacementWorkflow.complete, 'done1')
1810+
handle2.signal(WorkerClientReplacementWorkflow.complete, 'done2')
1811+
1812+
# Confirm second workflow on new server completes
1813+
assert_equal 'done2', handle2.result
1814+
handle1.terminate
1815+
end
1816+
end
1817+
end
1818+
17721819
# TODO(cretz): To test
17731820
# * Common
17741821
# * Eager workflow start
17751822
# * Unawaited futures that have exceptions, need to log warning like Java does
17761823
# * Enhanced stack trace?
17771824
# * Separate abstract/interface demonstration
1778-
# * Replace worker client
17791825
# * Reset update randomness seed
17801826
# * Confirm thread pool does not leak, meaning thread/worker goes away after last workflow
17811827
# * Test workflow cancel causing other cancels at the same time but in different coroutines

0 commit comments

Comments
 (0)