Skip to content

Commit 4ff38ec

Browse files
committed
feat: Add ray integration support (#2400)
1 parent 6906dad commit 4ff38ec

File tree

4 files changed

+139
-0
lines changed

4 files changed

+139
-0
lines changed

sentry_sdk/integrations/ray.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from sentry_sdk.integrations import DidNotEnable, Integration
2+
3+
try:
4+
import ray
5+
except ImportError:
6+
raise DidNotEnable("Ray not installed.")
7+
import functools
8+
9+
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
10+
import logging
11+
import sentry_sdk
12+
from importlib.metadata import version
13+
14+
15+
def _check_sentry_initialized():
16+
if sentry_sdk.Hub.current.client:
17+
return
18+
# we cannot use sentry sdk logging facilities because it wasn't initialized
19+
logger = logging.getLogger("sentry_sdk.errors")
20+
logger.warning(
21+
"[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
22+
)
23+
24+
25+
def _patch_ray_remote():
26+
old_remote = ray.remote
27+
28+
@functools.wraps(old_remote)
29+
def new_remote(f, *args, **kwargs):
30+
def _f(*f_args, _tracing=None, **f_kwargs):
31+
_check_sentry_initialized()
32+
with sentry_sdk.start_transaction(
33+
sentry_sdk.continue_trace(
34+
_tracing,
35+
op="ray.remote.receive",
36+
source=TRANSACTION_SOURCE_TASK,
37+
name="Ray worker transaction",
38+
)
39+
) as tx:
40+
result = f(*f_args, **f_kwargs)
41+
tx.set_status("ok")
42+
return result
43+
44+
_f = old_remote(_f, *args, *kwargs)
45+
old_remote_method = _f.remote
46+
47+
def _remote_method_with_header_propagation(*args, **kwargs):
48+
with sentry_sdk.start_span(
49+
op="ray.remote.send", description="Sending task to ray cluster."
50+
):
51+
tracing = {
52+
k: v
53+
for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers()
54+
}
55+
return old_remote_method(*args, **kwargs, _tracing=tracing)
56+
57+
_f.remote = _remote_method_with_header_propagation
58+
59+
return _f
60+
61+
ray.remote = new_remote
62+
return
63+
64+
65+
class RayIntegration(Integration):
66+
identifier = "ray"
67+
68+
@staticmethod
69+
def setup_once():
70+
if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0):
71+
raise DidNotEnable("Ray 2.7.0 or newer required")
72+
_patch_ray_remote()

tests/integrations/ray/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import pytest
2+
3+
pytest.importorskip("ray")

tests/integrations/ray/test_ray.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import time
2+
3+
import ray
4+
5+
import sentry_sdk
6+
from sentry_sdk.envelope import Envelope
7+
from sentry_sdk.integrations.ray import RayIntegration
8+
from tests.conftest import TestTransport
9+
10+
11+
class RayTestTransport(TestTransport):
12+
def __init__(self):
13+
self.events = []
14+
self.envelopes = []
15+
super().__init__(self.events.append, self.envelopes.append)
16+
17+
18+
def _setup_ray_sentry():
19+
sentry_sdk.init(
20+
traces_sample_rate=1.0,
21+
integrations=[RayIntegration()],
22+
transport=RayTestTransport(),
23+
)
24+
25+
26+
def test_ray():
27+
_setup_ray_sentry()
28+
29+
@ray.remote
30+
def _task():
31+
with sentry_sdk.start_span(op="task", description="example task step"):
32+
time.sleep(0.1)
33+
return sentry_sdk.Hub.current.client.transport.envelopes
34+
35+
ray.init(
36+
runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./")
37+
)
38+
39+
with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
40+
worker_envelopes = ray.get(_task.remote())
41+
42+
_assert_envelopes_are_associated_with_same_trace_id(
43+
sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0]
44+
)
45+
46+
47+
def _assert_envelopes_are_associated_with_same_trace_id(
48+
client_side_envelope: Envelope, worker_envelope: Envelope
49+
):
50+
client_side_envelope_dict = client_side_envelope.get_transaction_event()
51+
worker_envelope_dict = worker_envelope.get_transaction_event()
52+
trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"]
53+
for span in client_side_envelope_dict["spans"]:
54+
assert span["trace_id"] == trace_id
55+
for span in worker_envelope_dict["spans"]:
56+
assert span["trace_id"] == trace_id
57+
assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id

tox.ini

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ envlist =
136136
{py3.7,py3.8,py3.9,py3.10,py3.11}-quart-v{0.16,0.17,0.18}
137137
{py3.8,py3.9,py3.10,py3.11}-quart-v{0.19}
138138

139+
# Ray
140+
{py3.10,py3.11}-ray
141+
139142
# Redis
140143
{py2.7,py3.7,py3.8,py3.9,py3.10,py3.11}-redis
141144

@@ -401,6 +404,9 @@ deps =
401404
pyramid-v1.9: pyramid>=1.9,<1.10
402405
pyramid-v1.10: pyramid>=1.10,<1.11
403406

407+
# Ray
408+
ray: ray>=2.7.0
409+
404410
# Quart
405411
quart: quart-auth
406412
quart: pytest-asyncio
@@ -551,6 +557,7 @@ setenv =
551557
pymongo: TESTPATH=tests/integrations/pymongo
552558
pyramid: TESTPATH=tests/integrations/pyramid
553559
quart: TESTPATH=tests/integrations/quart
560+
ray: TESTPATH=tests/integrations/ray
554561
redis: TESTPATH=tests/integrations/redis
555562
rediscluster: TESTPATH=tests/integrations/rediscluster
556563
requests: TESTPATH=tests/integrations/requests

0 commit comments

Comments
 (0)