
Support SparkIntegration activation after SparkContext created #3411


Merged
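In short: before this change, `SparkIntegration` only took effect when `sentry_sdk.init()` ran before the `SparkContext` was created, because setup relied entirely on patching the context constructor. This PR also activates the integration against an already-running context. A minimal sketch of the newly supported ordering (the app name and DSN are placeholders):

```python
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
from pyspark import SparkContext

# The driver's SparkContext already exists...
sc = SparkContext(appName="sentry-spark-demo")

# ...and init() can now still attach the Sentry listener, app
# properties, and event processor to that active context.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder
    integrations=[SparkIntegration()],
)
```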
121 changes: 74 additions & 47 deletions sentry_sdk/integrations/spark/spark_driver.py
@@ -9,6 +9,7 @@
    from typing import Optional

    from sentry_sdk._types import Event, Hint
+   from pyspark import SparkContext

@@ -17,7 +18,7 @@ class SparkIntegration(Integration):
    @staticmethod
    def setup_once():
        # type: () -> None
-       patch_spark_context_init()
+       _setup_sentry_tracing()


@@ -37,7 +38,7 @@ def _set_app_properties():


def _start_sentry_listener(sc):
-   # type: (Any) -> None
+   # type: (SparkContext) -> None
    """
    Start java gateway server to add custom `SparkListener`
    """
@@ -49,7 +50,51 @@ def _start_sentry_listener(sc):
    sc._jsc.sc().addSparkListener(listener)


-def patch_spark_context_init():
+def _add_event_processor(sc):
+   # type: (SparkContext) -> None
+   scope = sentry_sdk.get_isolation_scope()
+
+   @scope.add_event_processor
+   def process_event(event, hint):
+       # type: (Event, Hint) -> Optional[Event]
+       with capture_internal_exceptions():
+           if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
+               return event
+
+           if sc._active_spark_context is None:
+               return event
+
+           event.setdefault("user", {}).setdefault("id", sc.sparkUser())
+
+           event.setdefault("tags", {}).setdefault(
+               "executor.id", sc._conf.get("spark.executor.id")
+           )
+           event["tags"].setdefault(
+               "spark-submit.deployMode",
+               sc._conf.get("spark.submit.deployMode"),
+           )
+           event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
+           event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
+           event["tags"].setdefault("spark_version", sc.version)
+           event["tags"].setdefault("app_name", sc.appName)
+           event["tags"].setdefault("application_id", sc.applicationId)
+           event["tags"].setdefault("master", sc.master)
+           event["tags"].setdefault("spark_home", sc.sparkHome)
+
+           event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)
+
+           return event
+
+
+def _activate_integration(sc):
+   # type: (SparkContext) -> None
+
+   _start_sentry_listener(sc)
+   _set_app_properties()
+   _add_event_processor(sc)
+
+
+def _patch_spark_context_init():
    # type: () -> None
    from pyspark import SparkContext

@@ -59,51 +104,22 @@ def patch_spark_context_init():
    def _sentry_patched_spark_context_init(self, *args, **kwargs):
        # type: (SparkContext, *Any, **Any) -> Optional[Any]
        rv = spark_context_init(self, *args, **kwargs)
-       _start_sentry_listener(self)
-       _set_app_properties()
-
-       scope = sentry_sdk.get_isolation_scope()
-
-       @scope.add_event_processor
-       def process_event(event, hint):
-           # type: (Event, Hint) -> Optional[Event]
-           with capture_internal_exceptions():
-               if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
-                   return event
-
-               if self._active_spark_context is None:
-                   return event
-
-               event.setdefault("user", {}).setdefault("id", self.sparkUser())
-
-               event.setdefault("tags", {}).setdefault(
-                   "executor.id", self._conf.get("spark.executor.id")
-               )
-               event["tags"].setdefault(
-                   "spark-submit.deployMode",
-                   self._conf.get("spark.submit.deployMode"),
-               )
-               event["tags"].setdefault(
-                   "driver.host", self._conf.get("spark.driver.host")
-               )
-               event["tags"].setdefault(
-                   "driver.port", self._conf.get("spark.driver.port")
-               )
-               event["tags"].setdefault("spark_version", self.version)
-               event["tags"].setdefault("app_name", self.appName)
-               event["tags"].setdefault("application_id", self.applicationId)
-               event["tags"].setdefault("master", self.master)
-               event["tags"].setdefault("spark_home", self.sparkHome)
-
-               event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
-
-               return event
Comment on lines -65 to -100 (Contributor Author):
I have separated this part into a distinct function.
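For readers unfamiliar with the pattern the extracted function uses: a processor registered on the isolation scope runs for every event captured in that scope, and the `setdefault` calls mean values already present on the event win. A standalone sketch (the tag value is illustrative, not from the PR):

```python
import sentry_sdk

def add_spark_tags():
    # type: () -> None
    scope = sentry_sdk.get_isolation_scope()

    @scope.add_event_processor
    def process_event(event, hint):
        # Runs for each event captured in this isolation scope.
        # setdefault-style enrichment: a tag the user already set
        # is left untouched.
        event.setdefault("tags", {}).setdefault("spark_version", "3.5.1")
        return event
```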


+       _activate_integration(self)
        return rv

    SparkContext._do_init = _sentry_patched_spark_context_init


+def _setup_sentry_tracing():
+   # type: () -> None
+   from pyspark import SparkContext
+
+   if SparkContext._active_spark_context is not None:
+       _activate_integration(SparkContext._active_spark_context)
+       return
+   _patch_spark_context_init()
Comment on lines +117 to +120 (Contributor Author):
When the Spark context already exists, _activate_integration is called instead of applying the patch.
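For contrast, a sketch of the pre-existing ordering, which still goes through the patching path (app name is a placeholder):

```python
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
from pyspark import SparkContext

# init() runs first: _setup_sentry_tracing() finds no active context,
# so it falls back to patching SparkContext._do_init.
sentry_sdk.init(integrations=[SparkIntegration()])

# Activation then happens inside the patched constructor, which calls
# _activate_integration(self) after the original _do_init returns.
sc = SparkContext(appName="sentry-spark-demo")
```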



class SparkListener:
    def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
        # type: (Any) -> None
@@ -208,10 +224,21 @@ class Java:


class SentryListener(SparkListener):
+   def _add_breadcrumb(
+       self,
+       level,  # type: str
+       message,  # type: str
+       data=None,  # type: Optional[dict[str, Any]]
+   ):
+       # type: (...) -> None
+       sentry_sdk.get_global_scope().add_breadcrumb(
+           level=level, message=message, data=data
+       )
+
    def onJobStart(self, jobStart):  # noqa: N802,N803
        # type: (Any) -> None
        message = "Job {} Started".format(jobStart.jobId())
-       sentry_sdk.add_breadcrumb(level="info", message=message)
+       self._add_breadcrumb(level="info", message=message)
        _set_app_properties()

    def onJobEnd(self, jobEnd):  # noqa: N802,N803
@@ -227,14 +254,14 @@ def onJobEnd(self, jobEnd):  # noqa: N802,N803
level = "warning"
message = "Job {} Failed".format(jobEnd.jobId())

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)

def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
# type: (Any) -> None
stage_info = stageSubmitted.stageInfo()
message = "Stage {} Submitted".format(stage_info.stageId())
data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
sentry_sdk.add_breadcrumb(level="info", message=message, data=data)
self._add_breadcrumb(level="info", message=message, data=data)
_set_app_properties()

def onStageCompleted(self, stageCompleted): # noqa: N802,N803
Expand All @@ -255,4 +282,4 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
message = "Stage {} Completed".format(stage_info.stageId())
level = "info"

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)
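One behavioral detail worth noting: the listener methods now record breadcrumbs through the helper above, i.e. on the global scope via `sentry_sdk.get_global_scope().add_breadcrumb(...)` rather than the thread-local `sentry_sdk.add_breadcrumb()`. A plausible motivation, not spelled out in the diff, is that Py4J delivers these callbacks on its own thread, where the driver's isolation scope is not current, so the global scope is the reliable place for the crumbs to land. The equivalent call outside the listener (message text mirrors the listener's format):

```python
import sentry_sdk

# Breadcrumb attached to the global scope: visible to events captured
# from any thread, not just the one that recorded it.
sentry_sdk.get_global_scope().add_breadcrumb(
    level="info",
    message="Job 0 Started",
    data=None,
)
```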
1 change: 0 additions & 1 deletion tests/integrations/asgi/test_asgi.py
@@ -128,7 +128,6 @@ async def app(scope, receive, send):

@pytest.fixture
def asgi3_custom_transaction_app():
-
Member:
[nit] this whitespace change is out of scope for the PR (guessing it was accidentally committed)

    async def app(scope, receive, send):
        sentry_sdk.get_current_scope().set_transaction_name("foobar", source="custom")
        await send(