
Commit 9e7df50

Merge pull request #326 from icanbwell/add-test-for-streaming
Improve support for streaming
2 parents 22035dc + a942070 commit 9e7df50


168 files changed: +13211, -1532 lines changed


.dockerignore

+25
@@ -0,0 +1,25 @@
.DS_Store
.github
.idea
.mypy_cache
.pytest_cache
ADR
notebooks
tmp
docsrc
docs
spf_tests
tests
*/test/*
*.yml
*.md
metastore_db

.git
.svn
.cvs
.hg
!README*.md

.env
spark-events

.github/workflows/build_and_test.yml

+2 -2

@@ -15,8 +15,8 @@ jobs:
    steps:
      # Checks-out your repository
-     - uses: actions/checkout@v3
-     - uses: actions/setup-python@v4
+     - uses: actions/checkout@v4
+     - uses: actions/setup-python@v5
        with:
          python-version: '3.10'

.github/workflows/python-publish.yml

+2 -2

@@ -13,9 +13,9 @@ jobs:
    runs-on: ubuntu-latest

    steps:
-     - uses: actions/checkout@v3
+     - uses: actions/checkout@v4
      - name: Set up Python
-       uses: actions/setup-python@v4
+       uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      - name: Install dependencies

Makefile

+2 -1

@@ -56,7 +56,8 @@ run-pre-commit: setup-pre-commit
update: down Pipfile.lock setup-pre-commit ## Updates all the packages using Pipfile
	docker compose run --rm --name spf_pipenv dev pipenv sync --dev && \
	make devdocker && \
-	make pipenv-setup
+	make pipenv-setup && \
+	make up

.PHONY:tests
tests:

Pipfile

+17 -7

@@ -43,9 +43,9 @@ bounded-pool-executor = ">=0.0.3"
 # fastjsonschema is needed for validating JSON
 fastjsonschema= ">=2.18.0"
 # helix.fhir.client.sdk is needed for interacting with FHIR servers
-"helix.fhir.client.sdk" = ">=2.0.8"
+"helix.fhir.client.sdk" = ">=2.0.16"
 # opensearch-py is needed for interacting with OpenSearch
-opensearch-py= ">=1.1.0"
+opensearch-py= { extras = ['async'], version = ">=2.6.0" }
 # pyathena is needed for interacting with Athena in AWS
 pyathena = ">2.14.0"
 # spark-nlp is needed for natural language processing

@@ -60,6 +60,14 @@ pandas = ">=2.2.2"
 numexpr = ">=2.8.4"
 # bottleneck is needed for working with numerical data. pandas requires this minimum version.
 bottleneck = ">=1.3.6"
+# structlog is needed for structured logging
+structlog = ">=23.1.0"
+# usaddress is needed for parsing street addresses
+"usaddress"=">=0.5.10" # for parsing street addresses
+# usaddress-scourgify is needed for normalizing addresses
+"usaddress-scourgify"=">=0.6.0" # for normalizing addresses
+# aiohttp is needed for making HTTP requests asynchronously
+aiohttp = ">=3"

 [dev-packages]
 # setuptools is needed for building the package

@@ -73,11 +81,11 @@ pre-commit=">=3.7.1"
 # autoflake is needed for removing unused imports
 autoflake=">=2.3.1"
 # mypy is needed for type checking
-mypy = ">=1.10.1"
+mypy = ">=1.11.1"
 # pytest is needed for running tests
-pytest = ">=8.2.2"
+pytest = ">=8.3.2"
 # black is needed for formatting code
-black = ">=24.4.2"
+black = ">=24.8.0"
 # pygments is needed for syntax highlighting
 pygments=">=2.8.1" # not directly required, pinned by Snyk to avoid a vulnerability
 # Sphinx is needed for generating documentation

@@ -101,13 +109,13 @@ sparkdataframecomparer = ">=2.0.2"
 # pytest-asyncio is needed for running async tests
 pytest-asyncio = ">=0.23.8"
 # helix-mockserver-client is needed for mocking servers
-helix-mockserver-client=">=1.2.1"
+helix-mockserver-client=">=1.3.0"
 # sparkfhirschemas is needed for FHIR schemas
 sparkfhirschemas = ">=1.0.17"
 # types-boto3 is needed for type hints for boto3
 types-boto3 = ">=1.0.2"
 # moto is needed for mocking AWS services
-moto = { extras = ['all'], version = ">=5.0.11" }
+moto = { extras = ['all'], version = ">=5.0.12" }
 # types-requests is needed for type hints for requests
 types-requests=">=2.31.0"
 # types-PyMySQL is needed for type hints for PyMySQL

@@ -120,6 +128,8 @@ types-python-dateutil=">=2.8.19.14"
 pandas-stubs=">=2.2.2"
 # types-pytz is needed for type hints for pytz
 types-pytz=">=2024.1.0"
+# pydantic is needed for data class loading
+pydantic=">=2.8.2"

 # These dependencies are required for pipenv-setup. They conflict with ones above, so we install these
 # only when running pipenv-setup

Pipfile.lock

+670 -524

Some generated files are not rendered by default.

README.md

+6 -2

@@ -252,5 +252,9 @@ This will install Java, Scala, Spark and other packages

 # Publishing a new package
 1. Create a new release
-3. The GitHub Action should automatically kick in and publish the package
-4. You can see the status in the Actions tab
+2. The GitHub Action should automatically kick in and publish the package
+3. You can see the status in the Actions tab
+
+# Asynchronous Processing
+[Asynchronous Processing](asynchronous.md)

asynchronous.md

+231
@@ -0,0 +1,231 @@

# Asynchronous Processing

## Introduction
Asynchronous processing is a form of parallel processing that allows a system to handle multiple requests at the same time. It is particularly useful when a system has to wait on other systems to respond, such as when reading from a file, calling a web server (e.g. a FHIR server or Elasticsearch) or making any other network request.

Instead of blocking the current process, we can "await" the response from the other system and continue processing other requests.

This is the `async`/`await` pattern in Python (and other programming languages): functions are defined with the `async` keyword, and we use `await` on calls to external systems.

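For example, a minimal, framework-independent illustration of the pattern:

```python
import asyncio


async def fetch_data(url: str) -> str:
    # "await" yields control while we wait, so other work can run in the meantime
    await asyncio.sleep(0.1)  # stand-in for a real network call
    return f"response from {url}"


async def main() -> None:
    result = await fetch_data("https://example.com")
    print(result)


asyncio.run(main())
```
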
## How to use Asynchronous Processing with Spark
Apache Spark is a distributed computing system that is designed to process large datasets quickly.
Each partition of a data frame can be processed in parallel on different nodes in a cluster.

However, if we are not using async calls, each worker node is blocked while it waits for the external system to respond to the call made for that partition.

Instead, what we want is for each worker to make multiple async calls at the same time; once all of those calls have returned, the partition is finished and the worker node can move on to a different partition.

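As a rough illustration of the idea (plain `asyncio`, not yet using this framework's helpers), the work for one partition can be awaited concurrently instead of row by row:

```python
import asyncio
from typing import Any, Dict, List


async def call_external_system(row: Dict[str, Any]) -> Dict[str, Any]:
    # placeholder for a real non-blocking call (HTTP, FHIR, Elasticsearch, ...)
    await asyncio.sleep(0.1)
    return {"id": row["id"], "status": "ok"}


async def process_partition(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # all calls for this partition are awaited together instead of one after another
    return list(await asyncio.gather(*[call_external_system(row) for row in rows]))


# roughly 0.1s for 10 rows instead of ~1s sequentially
results = asyncio.run(process_partition([{"id": i} for i in range(10)]))
```
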
## Using async and await with Spark
In this framework, we provide a few helper classes that allow you to easily use async functions with Spark data frames.
These functions can be used with the `mapInPandas()` function in Spark or as Pandas UDFs (user-defined functions) in the `withColumn()` or `select()` functions.

### Case 1: When you want to read a whole data frame and return a different data frame
Examples of this include reading FHIR resources from a source data frame, sending them to the FHIR server and returning a data frame that contains the responses from the FHIR server.
In this case your goal is to create a new data frame that has the same number of rows as the input data frame.

For this case, you can use the `AsyncPandasDataFrameUDF` class like so:
```python
from spark_pipeline_framework.utilities.async_pandas_udf.v1.async_pandas_dataframe_udf import (
    AsyncPandasDataFrameUDF,
)
from pyspark.sql import SparkSession, DataFrame
from spark_pipeline_framework.utilities.async_pandas_udf.v1.function_types import (
    HandlePandasDataFrameBatchFunction,
)
from typing import (
    List,
    Dict,
    Any,
    Optional,
    AsyncGenerator,
    cast,
)
import dataclasses
from pyspark.sql.types import StructType, StructField, StringType


@dataclasses.dataclass
class MyParameters:
    log_level: str


def my_code() -> None:
    async def test_async(
        *,
        partition_index: int,
        chunk_index: int,
        chunk_input_range: range,
        input_values: List[Dict[str, Any]],
        parameters: Optional[MyParameters],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        # your async code here
        # yield a dict for each row in the input_values list
        for _ in input_values:
            yield {"name": "test"}

    output_schema = StructType(
        [
            StructField("name", StringType(), True),
        ]
    )
    spark = SparkSession.builder.getOrCreate()
    # source data frame to transform
    source_df: DataFrame = spark.createDataFrame([("row1",), ("row2",)], ["name"])
    result_df: DataFrame = source_df.mapInPandas(
        AsyncPandasDataFrameUDF(
            parameters=MyParameters(log_level="DEBUG"),
            async_func=cast(
                HandlePandasDataFrameBatchFunction[MyParameters], test_async
            ),
            batch_size=2,
        ).get_pandas_udf(),
        schema=output_schema,
    )
```

### Case 2: When you want to read a single column (or set of columns) and append (or replace) a column in the same data frame
Examples of this include reading the `raw_address` column of every row, calling a service to standardize the address, and then adding the standardized address as a new column called `standardized_address`.

In this case, your goal is to add columns to an existing data frame, not to replace the whole data frame.

There are four kinds of column transformations:
1. Struct column to struct column: use the `AsyncPandasStructColumnToStructColumnUDF` class.
2. Struct column to scalar column: use the `AsyncPandasStructColumnToScalarColumnUDF` class.
3. Scalar column to struct column: use the `AsyncPandasScalarColumnToStructColumnUDF` class.
4. Scalar column to scalar column: use the `AsyncPandasScalarColumnToScalarColumnUDF` class.

For example, for a scalar column to scalar column transformation, you can use the `AsyncPandasScalarColumnToScalarColumnUDF` class like so:
```python
from spark_pipeline_framework.utilities.async_pandas_udf.v1.async_pandas_scalar_column_to_scalar_udf import (
    AsyncPandasScalarColumnToScalarColumnUDF,
)
from pyspark.sql import SparkSession, DataFrame
from spark_pipeline_framework.utilities.async_pandas_udf.v1.function_types import (
    HandlePandasScalarToScalarBatchFunction,
)
from typing import (
    List,
    Dict,
    Any,
    Optional,
    AsyncGenerator,
    cast,
)
import dataclasses
from pyspark.sql.types import StructType, StructField, StringType


@dataclasses.dataclass
class MyParameters:
    log_level: str


def my_code() -> None:
    async def test_async(
        *,
        partition_index: int,
        chunk_index: int,
        chunk_input_range: range,
        input_values: List[Dict[str, Any]],
        parameters: Optional[MyParameters],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        # your async code here
        # yield a dict for each row in the input_values list
        for _ in input_values:
            yield {}

    spark = SparkSession.builder.getOrCreate()
    source_df: DataFrame = spark.createDataFrame(
        [
            ("123 Main St",),  # your data here
        ],
        schema=StructType(
            [
                StructField("raw_address", StringType(), True),
            ]
        ),
    )
    result_df: DataFrame = source_df.withColumn(
        colName="processed_address",
        col=AsyncPandasScalarColumnToScalarColumnUDF(
            async_func=cast(
                HandlePandasScalarToScalarBatchFunction[MyParameters],
                test_async,
            ),
            parameters=MyParameters(log_level="DEBUG"),
            batch_size=2,
        ).get_pandas_udf(return_type=StringType())(source_df["raw_address"]),
    )
```

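The other three column UDF classes follow the same pattern. Purely as an illustration, here is a scalar-to-struct sketch that parses `raw_address` into a struct column, reusing `cast`, `MyParameters` and `source_df` from the example above; the import path, the `HandlePandasScalarToStructBatchFunction` handler type and the `standardize_address_async` function are assumptions made by analogy with the scalar-to-scalar example, so check the framework source for the exact names:

```python
# Hedged sketch: assumes AsyncPandasScalarColumnToStructColumnUDF mirrors the
# scalar-to-scalar API shown above; verify the import path and handler type.
from spark_pipeline_framework.utilities.async_pandas_udf.v1.async_pandas_scalar_column_to_struct_udf import (
    AsyncPandasScalarColumnToStructColumnUDF,
)
from pyspark.sql.types import StructType, StructField, StringType

address_schema = StructType(
    [
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
    ]
)

result_df = source_df.withColumn(
    colName="standardized_address",
    col=AsyncPandasScalarColumnToStructColumnUDF(
        async_func=cast(
            HandlePandasScalarToStructBatchFunction[MyParameters],  # assumed handler type
            standardize_address_async,  # your async handler, same signature as test_async
        ),
        parameters=MyParameters(log_level="DEBUG"),
        batch_size=2,
    ).get_pandas_udf(return_type=address_schema)(source_df["raw_address"]),
)
```
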
## Logging
The `partition_index` and `chunk_index` are passed to the async function so that you can include them in log messages, which is useful for debugging. The log level is passed in the parameters object so that you can control how verbose the logging is.

In addition, you can use the `SparkPartitionInformation` class to get additional information about the partition, such as the number of rows in the partition and the partition index.

```python
from spark_pipeline_framework.utilities.spark_partition_information.v1.spark_partition_information import (
    SparkPartitionInformation,
)
from typing import (
    List,
    Dict,
    Any,
    Optional,
    AsyncGenerator,
)


async def test_async(
    *,
    partition_index: int,
    chunk_index: int,
    chunk_input_range: range,
    input_values: List[Dict[str, Any]],
    parameters: Optional[MyParameters],  # MyParameters dataclass from the earlier examples
) -> AsyncGenerator[Dict[str, Any], None]:
    spark_partition_information: SparkPartitionInformation = (
        SparkPartitionInformation.from_current_task_context(
            chunk_index=chunk_index,
        )
    )
    print(str(spark_partition_information))
    # your async code here
    # yield a dict for each row in the input_values list
    for _ in input_values:
        yield {}
```

## Memory usage
Typically, when you transform a data frame in Python code, Spark has to serialize the data frame and send it to the Python worker, and then serialize the result and send it back to the JVM.

This can be memory intensive.

However, this framework uses Apache Arrow (a columnar in-memory format), so data moves between the JVM and Python with far less serialization and deserialization overhead.

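The Arrow record batch size that Spark uses when handing data to Python workers can also be tuned; this is standard Spark configuration rather than part of this framework:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # cap how many rows Spark packs into each Arrow record batch sent to Python
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    .getOrCreate()
)
```
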
## Use asyncio drivers for external systems
To make this work, you need to use an async driver for each external system you call. A synchronous driver will block the worker node and prevent it from making other async calls.

For example, for HTTP calls you can use the `aiohttp` library, and for Elasticsearch/OpenSearch you can use the async client in the `opensearch-py` library (see the sketch below).

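A minimal sketch (assuming the same handler signature as the earlier examples and an input column named `url`) of making concurrent HTTP calls with `aiohttp` from inside the async function:

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Optional

import aiohttp


async def fetch_one(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    # a single non-blocking HTTP call; many of these can be in flight at once
    async with session.get(url) as response:
        return {"url": url, "status": response.status, "body": await response.text()}


async def test_async(
    *,
    partition_index: int,
    chunk_index: int,
    chunk_input_range: range,
    input_values: List[Dict[str, Any]],
    parameters: Optional["MyParameters"],  # the dataclass from the earlier examples
) -> AsyncGenerator[Dict[str, Any], None]:
    async with aiohttp.ClientSession() as session:
        # fire off one request per input row and await them all together
        results = await asyncio.gather(
            *[fetch_one(session, row["url"]) for row in input_values]
        )
    for result in results:
        yield result
```
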
You can find a list of async drivers here: https://github.com/timofurrer/awesome-asyncio

## Controlling how big a batch to send to the async function
Set the `batch_size` parameter on the `AsyncPandasDataFrameUDF` (or `AsyncPandasScalarColumnToScalarColumnUDF`, etc.) class to control how many rows are sent to the async function at a time.

Each partition is divided into chunks of `batch_size` rows, and the chunks are processed by the async function concurrently.

Hence you should make each partition as large as will fit in memory and let the framework divide it into batches of `batch_size` rows to send to the async function.

This is the most efficient arrangement, since you will have multiple async calls (one for each batch) in flight within each partition. A sizing sketch follows.

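A rough sizing sketch, reusing `test_async`, `MyParameters`, `output_schema` and the framework classes from Case 1 above (the 100,000 rows per partition and `batch_size` of 100 are illustrative numbers, not recommendations):

```python
# Aim for partitions that fit comfortably in memory, then let the framework
# split each partition into batches that are sent to the async function together.
rows_per_partition = 100_000
num_partitions = max(1, source_df.count() // rows_per_partition)

result_df = source_df.repartition(num_partitions).mapInPandas(
    AsyncPandasDataFrameUDF(
        parameters=MyParameters(log_level="INFO"),
        async_func=cast(
            HandlePandasDataFrameBatchFunction[MyParameters], test_async
        ),
        batch_size=100,  # each partition is processed as concurrent chunks of 100 rows
    ).get_pandas_udf(),
    schema=output_schema,
)
```
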
## Summary
By using these classes you can easily use async functions with Spark data frames. This is useful when a task spends most of its time waiting on external systems, such as reading from a file, calling a web server (e.g. a FHIR server or Elasticsearch) or making any other network request. With asynchronous processing, each worker can handle multiple requests at the same time, which can improve performance and reduce latency.
