move telemetry code to helix-telemetry project #382

Merged · 33 commits · Apr 11, 2025
25 changes: 9 additions & 16 deletions Pipfile
@@ -30,8 +30,6 @@ chardet="*"
slack_sdk = ">=3.22.0"
# smart_open is needed for reading and writing files
smart_open = { extras = ['s3'], version = ">=6.3.0" }
# mlflow is needed for tracking experiments in MLFlow
mlflow-skinny = "==2.17.2"
# sqlalchemy is needed for interacting with databases
SQLAlchemy = ">=1.4.37"
# sqlparse is needed for parsing SQL
@@ -41,7 +39,7 @@ bounded-pool-executor = ">=0.0.3"
# fastjsonschema is needed for validating JSON
fastjsonschema= ">=2.18.0"
# helix.fhir.client.sdk is needed for interacting with FHIR servers
"helix.fhir.client.sdk" = ">=3.0.38"
"helix.fhir.client.sdk" = ">=4.1.0"
# opensearch-py is needed for interacting with OpenSearch
opensearch-py= { extras = ['async'], version = ">=2.6.0" }
# pyathena is needed for interacting with Athena in AWS
@@ -66,25 +64,16 @@ pydantic=">=2.8.2"
motor={ extras = ['snappy', 'zstd'], version = ">=3.5.1" }
# dataclasses-json is needed for working with JSON data to read from and write to json
dataclasses-json = ">=0.6.7"
# helixtelemetry is needed for sending telemetry data
helixtelemetry= ">=1.0.3"
# ====== For OpenTelemetry ======
opentelemetry-api = ">=1.30.0"
opentelemetry-sdk = ">=1.30.0"
opentelemetry-exporter-otlp = ">=1.30.0"
opentelemetry-instrumentation-system-metrics = ">=0.51b0"
opentelemetry-instrumentation-logging = ">=0.51b0"
opentelemetry-instrumentation-pymysql = ">=0.51b0"
opentelemetry-instrumentation-pymongo = ">=0.51b0"
opentelemetry-instrumentation-elasticsearch = ">=0.51b0"
opentelemetry-instrumentation-boto = ">=0.51b0"
opentelemetry-instrumentation-botocore = ">=0.51b0"
opentelemetry-instrumentation-httpx = ">=0.51b0"
opentelemetry-instrumentation-asyncio = ">=0.51b0"
opentelemetry-instrumentation-aiohttp-client = ">=0.51b0"
opentelemetry-instrumentation-confluent-kafka = ">=0.51b0"
opentelemetry-instrumentation-requests = ">=0.51b0"
#opentelemetry-exporter-richconsole = ">=0.51b0"
# ====== End OpenTelemetry ======

# msgpack is needed for serializing and deserializing data
msgpack = ">=1.1.0"


[dev-packages]
@@ -167,6 +156,10 @@ types-Pygments="*"
types-jsonschema="*"
types-cffi="*"
types-pyOpenSSL="*"
# msgpack-types provides type hints for msgpack
msgpack-types = { version = ">=0.5.0" }
# objsize is needed for getting the size of objects in bytes
objsize = ">=0.7.1"

# These dependencies are required for pipenv-setup. They conflict with ones above, so we install these
# only when running pipenv-setup
2,673 changes: 1,381 additions & 1,292 deletions Pipfile.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions README.md
@@ -258,3 +258,20 @@ This will install Java, Scala, Spark and other packages

# Asynchronous Processing
[Asynchronous Processing](asynchronous.md)

# Using local packages
If you're making changes in packages (e.g., SparkPipelineFramework), you can point your Docker container to read from those local copies instead of the packages from the Pipfile.
1. Uncomment the line in docker-compose.yml for the package you want to run from your local machine.
2. (Optional) To debug into these packages, tell PyCharm to look in that folder as well by creating path mappings in the PyCharm Debug dialog: [https://www.jetbrains.com/help/pycharm/edit-project-path-mappings-dialog.html](https://www.jetbrains.com/help/pycharm/edit-project-path-mappings-dialog.html)


For example (these paths have to be absolute, so replace them with the absolute paths on your machine):
| Local Path | Remote Path |
|------------------------------------------------------------------------------------------|------------------------------------------------|
|`/Users/imranqureshi/git/helix.fhir.client.sdk/helix_fhir_client_sdk/` |`/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk/` |


NOTE: If you're setting breakpoints, make sure to open the file from the local path to set them. Otherwise PyCharm opens the version from the Pipfile, which will never hit your breakpoints.

NOTE: Remember to undo your docker-compose.yml change and these path mappings when you're done, and switch back to using the package from the Pipfile.
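
For reference, an uncommented mount looks like this (a sketch based on the commented-out lines in this PR's docker-compose.yml diff; the service name is illustrative, and the local path must be adjusted for your machine):

```yaml
services:
  dev:
    volumes:
      - ./:/SparkpipelineFramework/
      # Local copy of helix.fhir.client.sdk overrides the Pipfile-installed package:
      - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk
```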
18 changes: 18 additions & 0 deletions conftest.py
@@ -10,6 +10,7 @@
from moto import mock_aws

from create_spark_session import create_spark_session
from spark_pipeline_framework.register import register


@pytest.fixture(scope="session")
@@ -41,3 +42,20 @@ def s3_mock(
) -> Generator[BaseClient, None, None]:
with mock_aws():
yield boto3.client("s3", region_name="us-east-1")


@pytest.fixture(scope="session", autouse=True)
def run_before_each_test() -> Generator[None, Any, None]:
# This code will run before every test
# print("Setting up something before each test")
# You can do setup operations here
# For example, initializing databases, clearing caches, etc.
print("Setting up before each test")

register()

# Optional: You can yield if you want to do tear down after the test
yield

# Optional teardown code here
print("Cleaning up after each test")
15 changes: 3 additions & 12 deletions docker-compose.yml
@@ -15,6 +15,7 @@ services:
# PYTHONMALLOC: debug
PYTHONASYNCIODEBUG: 1
# PYTHONTRACEMALLOC: 1
LOGLEVEL: DEBUG
# These environment variables are used to configure the spark pipeline framework
FHIR_SERVER_URL: 'http://fhir:3000/4_0_0/'
AUTH_CONFIGURATION_URI: 'http://keycloak:8080/realms/bwell-realm/.well-known/openid-configuration'
@@ -23,8 +24,8 @@
ELASTICSEARCH_HOST: https://elasticsearch:9200
volumes:
- ./:/SparkpipelineFramework/
# - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.10/dist-packages/helix_fhir_client_sdk
# - ../mockserver_client/mockserver_client:/usr/local/lib/python3.10/dist-packages/mockserver_client
# - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk
# - ../mockserver_client/mockserver_client:/usr/local/lib/python3.12/dist-packages/mockserver_client
container_name: spf_dev
working_dir: /SparkpipelineFramework

@@ -39,16 +40,6 @@
healthcheck:
test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]

mlflow:
image: imranq2/mlflow_server:0.2.0
depends_on:
- warehouse
ports:
- '5050:5000'
volumes:
- ./mlflow_artifacts/:/opt/project/mlflow_artifacts/
command: mlflow server --host 0.0.0.0 --port 5000 --backend-store-uri mysql+pymysql://root:root_password@warehouse:3306/fhir_rpt --default-artifact-root ./opt/project/mlflow_artifacts

mock-server:
image: mockserver/mockserver:mockserver-5.15.0
command: -serverPort 1080
4 changes: 4 additions & 0 deletions pre-commit-hook
@@ -2,4 +2,8 @@

GIT_PROJECT_ROOT=$(git rev-parse --show-toplevel)
cd "$($GIT_PROJECT_ROOT "$0")"

echo "To use local package, mount the package directory as a volume. Example:"
echo "-v \${PWD}/../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/site-packages/helix_fhir_client_sdk \\"

docker build -f ./pre-commit.Dockerfile -t pre-commit-spf . && docker run --rm --name pre-commit-spf-run -v $(pwd)/:/sourcecode pre-commit-spf
2 changes: 0 additions & 2 deletions setup.cfg
@@ -12,8 +12,6 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-fastjsonschema.*]
ignore_missing_imports = True
[mypy-mlflow.store.tracking.file_store.*]
ignore_missing_imports = True
[mypy-py4j.protocol.*]
ignore_missing_imports = True
[mypy-usaddress.*]
16 changes: 3 additions & 13 deletions setup.py
@@ -58,12 +58,11 @@
"chardet",
"slack-sdk>=3.22.0",
"smart-open[s3]>=6.3.0",
"mlflow-skinny==2.17.2",
"sqlalchemy>=1.4.37",
"sqlparse>=0.5.3",
"bounded-pool-executor>=0.0.3",
"fastjsonschema>=2.18.0",
"helix.fhir.client.sdk>=3.0.38",
"helix.fhir.client.sdk>=4.1.0",
[Review comment — Copilot AI, Apr 11, 2025] The PR description indicates an upgrade to version 4.0.3 of helix.fhir.client.sdk, but setup.py specifies '>=4.1.0'. Please confirm and align the intended version.

Suggested change:
-    "helix.fhir.client.sdk>=4.1.0",
+    "helix.fhir.client.sdk>=4.0.3",

"opensearch-py[async]>=2.6.0",
"pyathena>2.14.0",
"spark-nlp>=4.2.3",
@@ -76,21 +75,12 @@
"pydantic>=2.8.2",
"motor[snappy,zstd]>=3.5.1",
"dataclasses-json>=0.6.7",
"helixtelemetry>=1.0.3",
"opentelemetry-api>=1.30.0",
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp>=1.30.0",
"opentelemetry-instrumentation-system-metrics>=0.51b0",
"opentelemetry-instrumentation-logging>=0.51b0",
"opentelemetry-instrumentation-pymysql>=0.51b0",
"opentelemetry-instrumentation-pymongo>=0.51b0",
"opentelemetry-instrumentation-elasticsearch>=0.51b0",
"opentelemetry-instrumentation-boto>=0.51b0",
"opentelemetry-instrumentation-botocore>=0.51b0",
"opentelemetry-instrumentation-httpx>=0.51b0",
"opentelemetry-instrumentation-asyncio>=0.51b0",
"opentelemetry-instrumentation-aiohttp-client>=0.51b0",
"opentelemetry-instrumentation-confluent-kafka>=0.51b0",
"opentelemetry-instrumentation-requests>=0.51b0",
"msgpack>=1.1.0",
],
classifiers=[
"Development Status :: 4 - Beta",
3 changes: 3 additions & 0 deletions spark.Dockerfile
@@ -8,6 +8,9 @@ ENV CLASSPATH=/SparkpipelineFramework/jars:$CLASSPATH
# remove the older version of entrypoints with apt-get because that is how it was installed
RUN apt-get remove python3-entrypoints -y

# remove python3.10 stuff
RUN rm -rf /usr/local/lib/python3.10

COPY Pipfile* /SparkpipelineFramework/
WORKDIR /SparkpipelineFramework

4 changes: 1 addition & 3 deletions spark_pipeline_framework/mixins/telemetry_parent_mixin.py
@@ -1,8 +1,6 @@
from typing import Optional

from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
TelemetryParent,
)
from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent


class TelemetryParentMixin:
35 changes: 12 additions & 23 deletions spark_pipeline_framework/pipelines/framework_pipeline.py
@@ -2,6 +2,16 @@
import os
from typing import Any, Dict, List, Union, Optional, Mapping

from helixtelemetry.telemetry.context.telemetry_context import TelemetryContext
from helixtelemetry.telemetry.factory.telemetry_factory import TelemetryFactory
from helixtelemetry.telemetry.providers.null_telemetry import NullTelemetry
from helixtelemetry.telemetry.providers.open_telemetry import OpenTelemetry
from helixtelemetry.telemetry.spans.telemetry_span_creator import TelemetrySpanCreator
from helixtelemetry.telemetry.spans.telemetry_span_wrapper import TelemetrySpanWrapper
from helixtelemetry.telemetry.structures.telemetry_attribute_value import (
TelemetryAttributeValue,
)
from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent
from pyspark.ml.base import Transformer
from pyspark.sql.dataframe import DataFrame

@@ -19,27 +29,6 @@
from spark_pipeline_framework.utilities.async_helper.v1.async_helper import AsyncHelper
from spark_pipeline_framework.utilities.class_helpers import ClassHelpers
from spark_pipeline_framework.utilities.pipeline_helper import create_steps
from spark_pipeline_framework.utilities.telemetry.telemetry_attribute_value import (
TelemetryAttributeValue,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_context import (
TelemetryContext,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_factory import (
TelemetryFactory,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
TelemetryParent,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_provider import (
TelemetryProvider,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_span_creator import (
TelemetrySpanCreator,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_span_wrapper import (
TelemetrySpanWrapper,
)


class FrameworkPipeline(Transformer, LoopIdMixin, TelemetryParentMixin):
@@ -86,9 +75,9 @@ def __init__(
telemetry_context
or TelemetryContext(
provider=(
TelemetryProvider.OPEN_TELEMETRY
OpenTelemetry.telemetry_provider
if self.telemetry_enable
else TelemetryProvider.NULL
else NullTelemetry.telemetry_provider
),
service_name=os.getenv(
"OTEL_SERVICE_NAME", "helix-pipelines"
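Both the v1 and v2 pipelines now wire the telemetry provider the same way. A minimal sketch of the pattern, assembled from the imports and calls visible in this diff (the `TelemetryFactory` constructor signature is an assumption, and any further `TelemetryContext` arguments are elided here, since the diff truncates before showing them):

```python
import os

from helixtelemetry.telemetry.context.telemetry_context import TelemetryContext
from helixtelemetry.telemetry.factory.telemetry_factory import TelemetryFactory
from helixtelemetry.telemetry.providers.null_telemetry import NullTelemetry
from helixtelemetry.telemetry.providers.open_telemetry import OpenTelemetry

telemetry_enable = True  # normally taken from the pipeline's parameters

# Each provider class exposes its own `telemetry_provider` key, replacing
# the removed spark_pipeline_framework TelemetryProvider enum.
telemetry_context = TelemetryContext(
    provider=(
        OpenTelemetry.telemetry_provider
        if telemetry_enable
        else NullTelemetry.telemetry_provider
    ),
    service_name=os.getenv("OTEL_SERVICE_NAME", "helix-pipelines"),
)

# Assumption: the factory is built from the context; the diff truncates
# before showing this call, so treat it as illustrative only.
telemetry_factory = TelemetryFactory(telemetry_context=telemetry_context)
```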
45 changes: 13 additions & 32 deletions spark_pipeline_framework/pipelines/v2/framework_pipeline.py
@@ -2,8 +2,17 @@
import os
from typing import Any, Dict, List, Optional, Union, Mapping

# noinspection PyPackageRequirements
from mlflow.entities import RunStatus
from helixtelemetry.telemetry.context.telemetry_context import TelemetryContext
from helixtelemetry.telemetry.factory.telemetry_factory import TelemetryFactory
from helixtelemetry.telemetry.providers.null_telemetry import NullTelemetry
from helixtelemetry.telemetry.providers.open_telemetry import OpenTelemetry
from helixtelemetry.telemetry.spans.telemetry_span_creator import TelemetrySpanCreator
from helixtelemetry.telemetry.spans.telemetry_span_wrapper import TelemetrySpanWrapper
from helixtelemetry.telemetry.structures.telemetry_attribute_value import (
TelemetryAttributeValue,
)
from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent

from pyspark.ml.base import Transformer
from pyspark.sql.dataframe import DataFrame

@@ -31,27 +40,6 @@
from spark_pipeline_framework.utilities.spark_data_frame_helpers import (
spark_list_catalog_table_names,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_attribute_value import (
TelemetryAttributeValue,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_context import (
TelemetryContext,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_factory import (
TelemetryFactory,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
TelemetryParent,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_provider import (
TelemetryProvider,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_span_creator import (
TelemetrySpanCreator,
)
from spark_pipeline_framework.utilities.telemetry.telemetry_span_wrapper import (
TelemetrySpanWrapper,
)


class FrameworkPipeline(Transformer, LoopIdMixin, TelemetryParentMixin):
@@ -117,9 +105,9 @@ def __init__(
telemetry_context
or TelemetryContext(
provider=(
TelemetryProvider.OPEN_TELEMETRY
OpenTelemetry.telemetry_provider
if self.telemetry_enable
else TelemetryProvider.NULL
else NullTelemetry.telemetry_provider
),
service_name=os.getenv(
"OTEL_SERVICE_NAME", "helix-pipelines"
@@ -232,10 +220,6 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
telemetry_parent=transformer_span.create_child_telemetry_parent()
)

self.progress_logger.start_mlflow_run(
run_name=stage_name, is_nested=True
)

with ProgressLogMetric(
progress_logger=self.progress_logger,
name=str(stage_name) or "unknown",
@@ -253,7 +237,6 @@
pipeline_name,
event_text=f"Finished pipeline step {stage_name}",
)
self.progress_logger.end_mlflow_run()
except Exception as e:
logger.error(
f"!!!!!!!!!!!!! pipeline [{pipeline_name}] transformer "
@@ -285,8 +268,6 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
event_text=str(e),
ex=e,
)
self.progress_logger.end_mlflow_run(status=RunStatus.FAILED) # type: ignore

raise e

self.progress_logger.log_event(