Skip to content

fix(pyspark): Grab attemptId more defensively #4130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions sentry_sdk/integrations/spark/spark_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,12 @@ def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
# type: (Any) -> None
stage_info = stageSubmitted.stageInfo()
message = "Stage {} Submitted".format(stage_info.stageId())
data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}

data = {"name": stage_info.name()}
attempt_id = _get_attempt_id(stage_info)
if attempt_id is not None:
data["attemptId"] = attempt_id

self._add_breadcrumb(level="info", message=message, data=data)
_set_app_properties()

Expand All @@ -271,7 +276,11 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
stage_info = stageCompleted.stageInfo()
message = ""
level = ""
data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}

data = {"name": stage_info.name()}
attempt_id = _get_attempt_id(stage_info)
if attempt_id is not None:
data["attemptId"] = attempt_id

# Have to Try Except because stageInfo.failureReason() is typed with Scala Option
try:
Expand All @@ -283,3 +292,18 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
level = "info"

self._add_breadcrumb(level=level, message=message, data=data)


def _get_attempt_id(stage_info):
    # type: (Any) -> Optional[int]
    """
    Best-effort lookup of the attempt identifier for a stage.

    Calls ``stage_info.attemptId()`` first and, if that fails for any
    reason, falls back to ``stage_info.attemptNumber()``.

    :param stage_info: object describing the stage; expected to expose at
        least one of the two accessors above.
    :returns: the value returned by the first accessor that succeeds, or
        ``None`` when neither accessor can be called.
    """
    # Order matters: ``attemptId`` keeps priority, ``attemptNumber`` is the
    # fallback.
    # NOTE(review): presumably the accessor name differs between Spark
    # versions -- confirm against the Spark StageInfo API.
    for accessor_name in ("attemptId", "attemptNumber"):
        try:
            # Attribute lookup and call are both inside the try so that a
            # missing accessor and a failing call are treated the same way.
            return getattr(stage_info, accessor_name)()
        except Exception:
            # Deliberately broad: a missing attempt id must never break
            # breadcrumb creation, so move on to the next candidate.
            continue

    return None
60 changes: 60 additions & 0 deletions tests/integrations/spark/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from py4j.protocol import Py4JJavaError


################
# DRIVER TESTS #
################
Expand Down Expand Up @@ -166,6 +167,65 @@ def stageInfo(self): # noqa: N802
assert mock_hub.kwargs["data"]["name"] == "run-job"


def test_sentry_listener_on_stage_submitted_no_attempt_id(sentry_listener):
    """
    onStageSubmitted reports ``attemptNumber()`` under the ``attemptId``
    breadcrumb key when the stage info has no ``attemptId()`` accessor.
    """

    class FakeStageInfo:
        # Exposes attemptNumber() only -- there is intentionally no attemptId().
        def stageId(self):  # noqa: N802
            return "sample-stage-id-submit"

        def name(self):
            return "run-job"

        def attemptNumber(self):  # noqa: N802
            return 14

    class FakeStageSubmitted:
        def stageInfo(self):  # noqa: N802
            return FakeStageInfo()

    with patch.object(sentry_listener, "_add_breadcrumb") as breadcrumb_mock:
        sentry_listener.onStageSubmitted(FakeStageSubmitted())

        breadcrumb_mock.assert_called_once()
        recorded_call = breadcrumb_mock.call_args

        assert recorded_call.kwargs["level"] == "info"
        assert "sample-stage-id-submit" in recorded_call.kwargs["message"]
        assert recorded_call.kwargs["data"]["attemptId"] == 14
        assert recorded_call.kwargs["data"]["name"] == "run-job"


def test_sentry_listener_on_stage_submitted_no_attempt_id_or_number(sentry_listener):
    """
    onStageSubmitted omits the ``attemptId`` breadcrumb key entirely when the
    stage info offers neither ``attemptId()`` nor ``attemptNumber()``.
    """

    class FakeStageInfo:
        # Intentionally has no attemptId() and no attemptNumber().
        def stageId(self):  # noqa: N802
            return "sample-stage-id-submit"

        def name(self):
            return "run-job"

    class FakeStageSubmitted:
        def stageInfo(self):  # noqa: N802
            return FakeStageInfo()

    with patch.object(sentry_listener, "_add_breadcrumb") as breadcrumb_mock:
        sentry_listener.onStageSubmitted(FakeStageSubmitted())

        breadcrumb_mock.assert_called_once()
        recorded_call = breadcrumb_mock.call_args

        assert recorded_call.kwargs["level"] == "info"
        assert "sample-stage-id-submit" in recorded_call.kwargs["message"]
        assert "attemptId" not in recorded_call.kwargs["data"]
        assert recorded_call.kwargs["data"]["name"] == "run-job"


@pytest.fixture
def get_mock_stage_completed():
def _inner(failure_reason):
Expand Down
Loading