Commit f44d6f7

Merge pull request #382 from icanbwell/move-telemetry-code-to-helix-telemetry-project

move telemetry code to helix-telemetry project

2 parents 0cacf7e + a051a18

76 files changed: +1874 −5553 lines
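The practical effect for downstream code is an import-path change: telemetry classes that used to live under `spark_pipeline_framework.utilities.telemetry` now come from the `helixtelemetry` package, as the file diffs below show. For example (taken from the telemetry_parent_mixin.py diff in this commit):

```python
# Before this commit:
# from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
#     TelemetryParent,
# )

# After this commit:
from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent
```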

Pipfile

Lines changed: 9 additions & 16 deletions

```diff
@@ -30,8 +30,6 @@ chardet="*"
 slack_sdk = ">=3.22.0"
 # smart_open is needed for reading and writing files
 smart_open = { extras = ['s3'], version = ">=6.3.0" }
-# mlflow is needed for tracking experiments in MLFlow
-mlflow-skinny = "==2.17.2"
 # sqlalchemy is needed for interacting with databases
 SQLAlchemy = ">=1.4.37"
 # sqlparse is needed for parsing SQL
@@ -41,7 +39,7 @@ bounded-pool-executor = ">=0.0.3"
 # fastjsonschema is needed for validating JSON
 fastjsonschema= ">=2.18.0"
 # helix.fhir.client.sdk is needed for interacting with FHIR servers
-"helix.fhir.client.sdk" = ">=3.0.38"
+"helix.fhir.client.sdk" = ">=4.1.0"
 # opensearch-py is needed for interacting with OpenSearch
 opensearch-py= { extras = ['async'], version = ">=2.6.0" }
 # pyathena is needed for interacting with Athena in AWS
@@ -66,25 +64,16 @@ pydantic=">=2.8.2"
 motor={ extras = ['snappy', 'zstd'], version = ">=3.5.1" }
 # dataclasses-json is needed for working with JSON data to read from and write to json
 dataclasses-json = ">=0.6.7"
+# helixtelemetry is needed for sending telemetry data
+helixtelemetry= ">=1.0.3"
 # ====== For OpenTelemetry ======
 opentelemetry-api = ">=1.30.0"
 opentelemetry-sdk = ">=1.30.0"
 opentelemetry-exporter-otlp = ">=1.30.0"
-opentelemetry-instrumentation-system-metrics = ">=0.51b0"
-opentelemetry-instrumentation-logging = ">=0.51b0"
-opentelemetry-instrumentation-pymysql = ">=0.51b0"
-opentelemetry-instrumentation-pymongo = ">=0.51b0"
-opentelemetry-instrumentation-elasticsearch = ">=0.51b0"
-opentelemetry-instrumentation-boto = ">=0.51b0"
-opentelemetry-instrumentation-botocore = ">=0.51b0"
-opentelemetry-instrumentation-httpx = ">=0.51b0"
-opentelemetry-instrumentation-asyncio = ">=0.51b0"
 opentelemetry-instrumentation-aiohttp-client = ">=0.51b0"
-opentelemetry-instrumentation-confluent-kafka = ">=0.51b0"
-opentelemetry-instrumentation-requests = ">=0.51b0"
-#opentelemetry-exporter-richconsole = ">=0.51b0"
 # ====== End OpenTelemetry ======
-
+# msgpack is needed for serializing and deserializing data
+msgpack = ">=1.1.0"
 
 
 [dev-packages]
@@ -167,6 +156,10 @@ types-Pygments="*"
 types-jsonschema="*"
 types-cffi="*"
 types-pyOpenSSL="*"
+# msgpack-types provides type hints for msgpack
+msgpack-types = { version = ">=0.5.0" }
+# objsize is needed for getting the size of objects in bytes
+objsize = ">=0.7.1"
 
 # These dependencies are required for pipenv-setup. They conflict with ones above, so we install these
 # only when running pipenv-setup
```
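The newly added msgpack dependency is a compact binary serializer. A minimal usage sketch (illustrative only, not code from this repository):

```python
import msgpack

# Serialize a dict to compact bytes and read it back
payload = {"pipeline": "helix", "steps": 3}
packed: bytes = msgpack.packb(payload)
assert msgpack.unpackb(packed) == payload
```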

Pipfile.lock

Lines changed: 1381 additions & 1292 deletions

Some generated files are not rendered by default.

README.md

Lines changed: 17 additions & 0 deletions

```diff
@@ -258,3 +258,20 @@ This will install Java, Scala, Spark and other packages
 
 # Asynchronous Processing
 [Asynchronous Processing](asynchronous.md)
+
+# Using local packages
+If you're making changes in packages (e.g., SparkPipelineFramework), you can point your Docker container to read from those instead of the packages from the Pipfile.
+1. Uncomment the line in docker-compose.yml for the package you want to run from your local machine.
+2. (Optional) For debugging into these packages, you also need to tell PyCharm to look in that folder. You do this by creating path mappings in the PyCharm Debug dialog: [https://www.jetbrains.com/help/pycharm/edit-project-path-mappings-dialog.html](https://www.jetbrains.com/help/pycharm/edit-project-path-mappings-dialog.html)
+
+
+For example:
+(These paths have to be absolute, so replace them with the absolute paths on your machine.)
+| Local Path | Remote Path |
+|------------------------------------------------------------------------|------------------------------------------------------------------|
+| `/Users/imranqureshi/git/helix.fhir.client.sdk/helix_fhir_client_sdk/` | `/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk/` |
+
+
+NOTE: If you're setting breakpoints, make sure to open the file from the local path to set them. Otherwise PyCharm opens the version from the Pipfile, which will never hit your breakpoints.
+
+NOTE: Remember to undo your docker-compose.yml change and these path mappings when you're done, and switch back to using the package from the Pipfile.
```
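Step 1 of those instructions corresponds to uncommenting one of the volume mappings in docker-compose.yml (see the docker-compose.yml diff later in this commit). A trimmed sketch; the enclosing service block is elided since it is not visible in the diff:

```yaml
volumes:
  - ./:/SparkpipelineFramework/
  # Uncomment to run against your local clone instead of the installed package:
  - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk
```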

conftest.py

Lines changed: 18 additions & 0 deletions

```diff
@@ -10,6 +10,7 @@
 from moto import mock_aws
 
 from create_spark_session import create_spark_session
+from spark_pipeline_framework.register import register
 
 
 @pytest.fixture(scope="session")
@@ -41,3 +42,20 @@ def s3_mock(
 ) -> Generator[BaseClient, None, None]:
     with mock_aws():
         yield boto3.client("s3", region_name="us-east-1")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def run_before_each_test() -> Generator[None, Any, None]:
+    # This code runs once before the test session starts
+    # (scope="session" + autouse=True: it does not rerun before each test)
+    # You can do setup operations here
+    # For example, initializing databases, clearing caches, etc.
+    print("Setting up before each test")
+
+    register()
+
+    # Optional: yield if you want to do teardown after the tests
+    yield
+
+    # Optional teardown code here
+    print("Cleaning up after each test")
```

docker-compose.yml

Lines changed: 3 additions & 12 deletions

```diff
@@ -15,6 +15,7 @@ services:
       # PYTHONMALLOC: debug
       PYTHONASYNCIODEBUG: 1
       # PYTHONTRACEMALLOC: 1
+      LOGLEVEL: DEBUG
       # These environment variables are used to configure the spark pipeline framework
       FHIR_SERVER_URL: 'http://fhir:3000/4_0_0/'
       AUTH_CONFIGURATION_URI: 'http://keycloak:8080/realms/bwell-realm/.well-known/openid-configuration'
@@ -23,8 +24,8 @@ services:
       ELASTICSEARCH_HOST: https://elasticsearch:9200
     volumes:
       - ./:/SparkpipelineFramework/
-      # - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.10/dist-packages/helix_fhir_client_sdk
-      # - ../mockserver_client/mockserver_client:/usr/local/lib/python3.10/dist-packages/mockserver_client
+      # - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/dist-packages/helix_fhir_client_sdk
+      # - ../mockserver_client/mockserver_client:/usr/local/lib/python3.12/dist-packages/mockserver_client
     container_name: spf_dev
     working_dir: /SparkpipelineFramework
 
@@ -39,16 +40,6 @@ services:
     healthcheck:
       test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
 
-  mlflow:
-    image: imranq2/mlflow_server:0.2.0
-    depends_on:
-      - warehouse
-    ports:
-      - '5050:5000'
-    volumes:
-      - ./mlflow_artifacts/:/opt/project/mlflow_artifacts/
-    command: mlflow server --host 0.0.0.0 --port 5000 --backend-store-uri mysql+pymysql://root:root_password@warehouse:3306/fhir_rpt --default-artifact-root ./opt/project/mlflow_artifacts
-
   mock-server:
     image: mockserver/mockserver:mockserver-5.15.0
     command: -serverPort 1080
```

pre-commit-hook

Lines changed: 4 additions & 0 deletions

```diff
@@ -2,4 +2,8 @@
 
 GIT_PROJECT_ROOT=$(git rev-parse --show-toplevel)
 cd "$($GIT_PROJECT_ROOT "$0")"
+
+echo "To use local package, mount the package directory as a volume. Example:"
+echo "-v \${PWD}/../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/site-packages/helix_fhir_client_sdk \\"
+
 docker build -f ./pre-commit.Dockerfile -t pre-commit-spf . && docker run --rm --name pre-commit-spf-run -v $(pwd)/:/sourcecode pre-commit-spf
```
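Combining the echoed flag with the docker run at the end of the script would look roughly like this (a hypothetical invocation; the script itself only prints the flag):

```bash
docker build -f ./pre-commit.Dockerfile -t pre-commit-spf .
docker run --rm --name pre-commit-spf-run \
  -v "$(pwd)/:/sourcecode" \
  -v "${PWD}/../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.12/site-packages/helix_fhir_client_sdk" \
  pre-commit-spf
```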

setup.cfg

Lines changed: 0 additions & 2 deletions

```diff
@@ -12,8 +12,6 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-fastjsonschema.*]
 ignore_missing_imports = True
-[mypy-mlflow.store.tracking.file_store.*]
-ignore_missing_imports = True
 [mypy-py4j.protocol.*]
 ignore_missing_imports = True
 [mypy-usaddress.*]
```

setup.py

Lines changed: 3 additions & 13 deletions

```diff
@@ -58,12 +58,11 @@
         "chardet",
         "slack-sdk>=3.22.0",
         "smart-open[s3]>=6.3.0",
-        "mlflow-skinny==2.17.2",
         "sqlalchemy>=1.4.37",
         "sqlparse>=0.5.3",
         "bounded-pool-executor>=0.0.3",
         "fastjsonschema>=2.18.0",
-        "helix.fhir.client.sdk>=3.0.38",
+        "helix.fhir.client.sdk>=4.1.0",
         "opensearch-py[async]>=2.6.0",
         "pyathena>2.14.0",
         "spark-nlp>=4.2.3",
@@ -76,21 +75,12 @@
         "pydantic>=2.8.2",
         "motor[snappy,zstd]>=3.5.1",
         "dataclasses-json>=0.6.7",
+        "helixtelemetry>=1.0.3",
         "opentelemetry-api>=1.30.0",
         "opentelemetry-sdk>=1.30.0",
         "opentelemetry-exporter-otlp>=1.30.0",
-        "opentelemetry-instrumentation-system-metrics>=0.51b0",
-        "opentelemetry-instrumentation-logging>=0.51b0",
-        "opentelemetry-instrumentation-pymysql>=0.51b0",
-        "opentelemetry-instrumentation-pymongo>=0.51b0",
-        "opentelemetry-instrumentation-elasticsearch>=0.51b0",
-        "opentelemetry-instrumentation-boto>=0.51b0",
-        "opentelemetry-instrumentation-botocore>=0.51b0",
-        "opentelemetry-instrumentation-httpx>=0.51b0",
-        "opentelemetry-instrumentation-asyncio>=0.51b0",
         "opentelemetry-instrumentation-aiohttp-client>=0.51b0",
-        "opentelemetry-instrumentation-confluent-kafka>=0.51b0",
-        "opentelemetry-instrumentation-requests>=0.51b0",
+        "msgpack>=1.1.0",
     ],
     classifiers=[
         "Development Status :: 4 - Beta",
```

spark.Dockerfile

Lines changed: 3 additions & 0 deletions

```diff
@@ -8,6 +8,9 @@ ENV CLASSPATH=/SparkpipelineFramework/jars:$CLASSPATH
 # remove the older version of entrypoints with apt-get because that is how it was installed
 RUN apt-get remove python3-entrypoints -y
 
+# remove python3.10 stuff
+RUN rm -rf /usr/local/lib/python3.10
+
 COPY Pipfile* /SparkpipelineFramework/
 WORKDIR /SparkpipelineFramework
 
```
spark_pipeline_framework/mixins/telemetry_parent_mixin.py

Lines changed: 1 addition & 3 deletions

```diff
@@ -1,8 +1,6 @@
 from typing import Optional
 
-from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
-    TelemetryParent,
-)
+from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent
 
 
 class TelemetryParentMixin:
```

spark_pipeline_framework/pipelines/framework_pipeline.py

Lines changed: 12 additions & 23 deletions

```diff
@@ -2,6 +2,16 @@
 import os
 from typing import Any, Dict, List, Union, Optional, Mapping
 
+from helixtelemetry.telemetry.context.telemetry_context import TelemetryContext
+from helixtelemetry.telemetry.factory.telemetry_factory import TelemetryFactory
+from helixtelemetry.telemetry.providers.null_telemetry import NullTelemetry
+from helixtelemetry.telemetry.providers.open_telemetry import OpenTelemetry
+from helixtelemetry.telemetry.spans.telemetry_span_creator import TelemetrySpanCreator
+from helixtelemetry.telemetry.spans.telemetry_span_wrapper import TelemetrySpanWrapper
+from helixtelemetry.telemetry.structures.telemetry_attribute_value import (
+    TelemetryAttributeValue,
+)
+from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent
 from pyspark.ml.base import Transformer
 from pyspark.sql.dataframe import DataFrame
 
@@ -19,27 +29,6 @@
 from spark_pipeline_framework.utilities.async_helper.v1.async_helper import AsyncHelper
 from spark_pipeline_framework.utilities.class_helpers import ClassHelpers
 from spark_pipeline_framework.utilities.pipeline_helper import create_steps
-from spark_pipeline_framework.utilities.telemetry.telemetry_attribute_value import (
-    TelemetryAttributeValue,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_context import (
-    TelemetryContext,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_factory import (
-    TelemetryFactory,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
-    TelemetryParent,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_provider import (
-    TelemetryProvider,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_span_creator import (
-    TelemetrySpanCreator,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_span_wrapper import (
-    TelemetrySpanWrapper,
-)
 
 
 class FrameworkPipeline(Transformer, LoopIdMixin, TelemetryParentMixin):
@@ -86,9 +75,9 @@ def __init__(
             telemetry_context
             or TelemetryContext(
                 provider=(
-                    TelemetryProvider.OPEN_TELEMETRY
+                    OpenTelemetry.telemetry_provider
                     if self.telemetry_enable
-                    else TelemetryProvider.NULL
+                    else NullTelemetry.telemetry_provider
                 ),
                 service_name=os.getenv(
                     "OTEL_SERVICE_NAME", "helix-pipelines"
```

spark_pipeline_framework/pipelines/v2/framework_pipeline.py

Lines changed: 13 additions & 32 deletions

```diff
@@ -2,8 +2,17 @@
 import os
 from typing import Any, Dict, List, Optional, Union, Mapping
 
-# noinspection PyPackageRequirements
-from mlflow.entities import RunStatus
+from helixtelemetry.telemetry.context.telemetry_context import TelemetryContext
+from helixtelemetry.telemetry.factory.telemetry_factory import TelemetryFactory
+from helixtelemetry.telemetry.providers.null_telemetry import NullTelemetry
+from helixtelemetry.telemetry.providers.open_telemetry import OpenTelemetry
+from helixtelemetry.telemetry.spans.telemetry_span_creator import TelemetrySpanCreator
+from helixtelemetry.telemetry.spans.telemetry_span_wrapper import TelemetrySpanWrapper
+from helixtelemetry.telemetry.structures.telemetry_attribute_value import (
+    TelemetryAttributeValue,
+)
+from helixtelemetry.telemetry.structures.telemetry_parent import TelemetryParent
+
 from pyspark.ml.base import Transformer
 from pyspark.sql.dataframe import DataFrame
 
@@ -31,27 +40,6 @@
 from spark_pipeline_framework.utilities.spark_data_frame_helpers import (
     spark_list_catalog_table_names,
 )
-from spark_pipeline_framework.utilities.telemetry.telemetry_attribute_value import (
-    TelemetryAttributeValue,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_context import (
-    TelemetryContext,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_factory import (
-    TelemetryFactory,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_parent import (
-    TelemetryParent,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_provider import (
-    TelemetryProvider,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_span_creator import (
-    TelemetrySpanCreator,
-)
-from spark_pipeline_framework.utilities.telemetry.telemetry_span_wrapper import (
-    TelemetrySpanWrapper,
-)
 
 
 class FrameworkPipeline(Transformer, LoopIdMixin, TelemetryParentMixin):
@@ -117,9 +105,9 @@ def __init__(
             telemetry_context
             or TelemetryContext(
                 provider=(
-                    TelemetryProvider.OPEN_TELEMETRY
+                    OpenTelemetry.telemetry_provider
                     if self.telemetry_enable
-                    else TelemetryProvider.NULL
+                    else NullTelemetry.telemetry_provider
                 ),
                 service_name=os.getenv(
                     "OTEL_SERVICE_NAME", "helix-pipelines"
@@ -232,10 +220,6 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
                     telemetry_parent=transformer_span.create_child_telemetry_parent()
                 )
 
-                self.progress_logger.start_mlflow_run(
-                    run_name=stage_name, is_nested=True
-                )
-
                 with ProgressLogMetric(
                     progress_logger=self.progress_logger,
                     name=str(stage_name) or "unknown",
@@ -253,7 +237,6 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
                         pipeline_name,
                         event_text=f"Finished pipeline step {stage_name}",
                     )
-                    self.progress_logger.end_mlflow_run()
                 except Exception as e:
                     logger.error(
                         f"!!!!!!!!!!!!! pipeline [{pipeline_name}] transformer "
@@ -285,8 +268,6 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
                         event_text=str(e),
                         ex=e,
                     )
-                    self.progress_logger.end_mlflow_run(status=RunStatus.FAILED)  # type: ignore
-
                     raise e
 
         self.progress_logger.log_event(
```