Improve support for streaming #326

Merged
merged 179 commits into from
Aug 12, 2024
Changes from 131 commits
e295162
add test for streaming from fhir_receiver
imranq2 Jul 31, 2024
c7d5976
add test for streaming from fhir_receiver
imranq2 Jul 31, 2024
6ce4baf
switch to generator
imranq2 Jul 31, 2024
a2af132
update helix.fhir.client.sdk
imranq2 Aug 2, 2024
19fcf1d
update helix.fhir.client.sdk
imranq2 Aug 2, 2024
fe07518
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
9bce79e
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
78753f7
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
96e201f
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
1e76f58
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
b2a3a51
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
644a271
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
b336496
got test working
imranq2 Aug 2, 2024
a76a24d
got test working
imranq2 Aug 2, 2024
ff6eb43
add comments
imranq2 Aug 2, 2024
a526734
add comments
imranq2 Aug 2, 2024
82d25d8
fix tests
imranq2 Aug 2, 2024
7960fba
added streaming tests
imranq2 Aug 2, 2024
bf85614
added streaming tests
imranq2 Aug 2, 2024
4f9a62a
switch to ndjson test file
imranq2 Aug 2, 2024
ee043a7
switch to ndjson test file
imranq2 Aug 2, 2024
b80961c
switch to ndjson test file
imranq2 Aug 5, 2024
5190b77
update helix.fhir.client.sdk
imranq2 Aug 5, 2024
19366c9
add try catch around merge
imranq2 Aug 5, 2024
a0217f0
add try catch around merge
imranq2 Aug 5, 2024
0e6a534
add try catch around merge
imranq2 Aug 5, 2024
6f2fbee
generate dict properly
imranq2 Aug 5, 2024
9d14ba7
convert issue to string
imranq2 Aug 5, 2024
d3d2432
convert issue to string
imranq2 Aug 6, 2024
4a76bd4
support passing in source_view and schema
imranq2 Aug 6, 2024
839e438
support passing in source_view and schema
imranq2 Aug 6, 2024
2842ba2
change to reading as text
imranq2 Aug 6, 2024
0a0cbce
change to reading as text
imranq2 Aug 6, 2024
ce583df
change to reading as text
imranq2 Aug 6, 2024
41fe1d3
change to reading as text
imranq2 Aug 6, 2024
174dfd9
add check for "desc" in exception
imranq2 Aug 6, 2024
1e67098
return error in ElasticSearchResult
imranq2 Aug 6, 2024
09466fa
return error in ElasticSearchResult
imranq2 Aug 6, 2024
b35b45a
add address_standardizer
imranq2 Aug 6, 2024
00bf547
update package
imranq2 Aug 6, 2024
acf6202
fix tests
imranq2 Aug 6, 2024
876cf7f
fix tests
imranq2 Aug 6, 2024
b697556
refactor address_standardization.py
imranq2 Aug 6, 2024
c0ac3a9
refactor address_standardization.py
imranq2 Aug 6, 2024
28f0828
refactor address_standardization.py
imranq2 Aug 6, 2024
a8339b4
refactor address_standardization.py
imranq2 Aug 6, 2024
e6f558c
add dynamic_class_loader.py
imranq2 Aug 6, 2024
f4a757e
add dynamic_class_loader.py
imranq2 Aug 6, 2024
2381d7a
add dynamic_class_loader.py
imranq2 Aug 6, 2024
c314440
add dynamic_class_loader.py
imranq2 Aug 6, 2024
a728436
change to StdAddress
imranq2 Aug 6, 2024
034ccac
change to StdAddress
imranq2 Aug 6, 2024
438c092
change to StdAddress
imranq2 Aug 6, 2024
c9c7e00
refactor address_standardization.py
imranq2 Aug 6, 2024
2941240
revert to previous address standardized
imranq2 Aug 6, 2024
b200b8a
add test for census standardize
imranq2 Aug 6, 2024
2311749
use bulk api
imranq2 Aug 6, 2024
4229ba1
fixes from testing in databricks
imranq2 Aug 7, 2024
5151fd2
fixes from testing in databricks
imranq2 Aug 7, 2024
cc51038
fixes from testing in databricks
imranq2 Aug 7, 2024
0e0c78b
fixes from testing in databricks
imranq2 Aug 7, 2024
0fd778b
add usaddress package
imranq2 Aug 7, 2024
0a90781
parse address using usaddress
imranq2 Aug 7, 2024
79ca1e6
use AsyncPandasColumnUDF
imranq2 Aug 7, 2024
8c11538
create v2 of helix.geolocation
imranq2 Aug 7, 2024
4b53c70
convert census_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
287d72d
convert census_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
2faebbc
convert melissa_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
9eaef0f
add async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
a162fb3
convert fhir_receiver to use async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
016568a
convert fhir_sender to use async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
1eace1f
add back error checking
imranq2 Aug 7, 2024
aad6586
add back error checking
imranq2 Aug 7, 2024
9fcb08b
convert fhir_receiver_processor.py to async
imranq2 Aug 7, 2024
fd29060
convert fhir_receiver_processor.py to async
imranq2 Aug 7, 2024
a8c8232
use AsyncHelper.run_in_event_loop
imranq2 Aug 7, 2024
430c40f
fix elasticsearch_processor test
imranq2 Aug 7, 2024
c1fda65
fix fhir_receiver_processor.py
imranq2 Aug 7, 2024
86499c6
fix fhir_sender_processor.py
imranq2 Aug 7, 2024
3b8c8bf
fix fhir_sender_processor.py
imranq2 Aug 7, 2024
c99b6b8
fix infinite loop
imranq2 Aug 7, 2024
b0ea1f5
fix infinite loop
imranq2 Aug 7, 2024
f08fd0f
remove census_standardizing_vendor.py from v1
imranq2 Aug 7, 2024
9372892
add fhir_helpers_async
imranq2 Aug 7, 2024
4b933c7
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
49c38a4
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
29c189e
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
5901925
fix test
imranq2 Aug 7, 2024
cea07f8
add aiohttp package
imranq2 Aug 7, 2024
7fb7bdd
pass well_known_url
imranq2 Aug 7, 2024
1ccc86c
get access token async
imranq2 Aug 7, 2024
c4e3d2a
remove print statements
imranq2 Aug 7, 2024
320322b
add tests using real fhir server
imranq2 Aug 8, 2024
9cb7dc9
add tests using real fhir server
imranq2 Aug 8, 2024
b5a6ee8
add tests using real fhir server
imranq2 Aug 8, 2024
1fe199d
use transform_async
imranq2 Aug 8, 2024
ec8f875
add transform_async to framework_transformer.py
imranq2 Aug 8, 2024
a083093
rename AsyncHelper.run()
imranq2 Aug 8, 2024
e75bfac
change elasticsearch_sender.py to use async
imranq2 Aug 8, 2024
d8f7a18
change elasticsearch_sender.py to use async
imranq2 Aug 8, 2024
c32fc92
try another technique for waiting on async function
imranq2 Aug 8, 2024
58a579e
get a working solution for nested async
imranq2 Aug 8, 2024
4cfceff
get a working solution for nested async
imranq2 Aug 8, 2024
7ccc3bd
add better error message in AsyncHelper
imranq2 Aug 8, 2024
d064e25
use async in fhir_sender
imranq2 Aug 8, 2024
5213b54
update github runners
imranq2 Aug 8, 2024
8f32175
add print statements in test
imranq2 Aug 8, 2024
2a613ca
update docker-compose.yml
imranq2 Aug 8, 2024
a2889ba
update packages
imranq2 Aug 8, 2024
431c322
use ES async api
imranq2 Aug 8, 2024
96a37fd
minor
imranq2 Aug 8, 2024
5995b20
make vendor_specific_to_std be an abstract method
imranq2 Aug 8, 2024
36263e0
make other methods abstract
imranq2 Aug 8, 2024
b41a46e
rework address_standardization.py to use generic types
imranq2 Aug 8, 2024
ea56fc2
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
9d12147
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
c9c7014
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
e584895
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
cb0d892
fix data_class_loader.py
imranq2 Aug 9, 2024
0e1fc6e
switch to Pydantic
imranq2 Aug 9, 2024
72c2f78
switch to Pydantic
imranq2 Aug 9, 2024
3f329f6
fixes after switch to Pydantic
imranq2 Aug 9, 2024
e66f283
fixes after switch to Pydantic
imranq2 Aug 9, 2024
6c29d44
fix test
imranq2 Aug 9, 2024
72e52aa
fix test
imranq2 Aug 9, 2024
848b130
fix test
imranq2 Aug 9, 2024
8b4ceac
fix test
imranq2 Aug 9, 2024
bc0e89f
use https
imranq2 Aug 9, 2024
4062a6e
fix graph test
imranq2 Aug 9, 2024
60b6899
fix graph test
imranq2 Aug 9, 2024
bcf95d0
create GetBatchError for errors in batch
imranq2 Aug 9, 2024
f5000bd
set use_data_streaming to be the default in fhir receiver v2
imranq2 Aug 9, 2024
35ff3da
separate error and non error resources
imranq2 Aug 9, 2024
276ae0f
separate error and non error resources
imranq2 Aug 9, 2024
ff0bee2
separate error and non error resources
imranq2 Aug 9, 2024
0546224
separate error and non error resources
imranq2 Aug 9, 2024
5f86a80
separate error and non error resources
imranq2 Aug 9, 2024
62a1d39
fix test
imranq2 Aug 9, 2024
4b635bb
fix test
imranq2 Aug 10, 2024
20226dd
update fhir client sdk
imranq2 Aug 10, 2024
dec23d0
add test for pandas dataframe udf
imranq2 Aug 10, 2024
1863395
add test for pandas dataframe udf
imranq2 Aug 10, 2024
3ace211
add test for pandas dataframe udf
imranq2 Aug 10, 2024
2886947
add list of ids
imranq2 Aug 10, 2024
425c27e
add list of ids
imranq2 Aug 10, 2024
5af1f36
add list of ids
imranq2 Aug 10, 2024
f83624a
add list of ids
imranq2 Aug 10, 2024
24809a5
add list of ids
imranq2 Aug 11, 2024
cfcfa66
pass partition and chunk index to panda functions
imranq2 Aug 11, 2024
b1e5bff
pass partition and chunk index to panda functions
imranq2 Aug 11, 2024
3687ff3
rename HandlePandasBatchFunction
imranq2 Aug 11, 2024
68d77c2
fix chunk_index
imranq2 Aug 11, 2024
fa8d432
print range
imranq2 Aug 11, 2024
ba6ee78
fix batches of size
imranq2 Aug 11, 2024
a79b874
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
b164d82
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
1884ba9
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
8b0e24e
fix column test
imranq2 Aug 11, 2024
cc84e46
rename column class
imranq2 Aug 11, 2024
bd1aa79
make return type of abstract base class generic
imranq2 Aug 11, 2024
737b837
add type aliases
imranq2 Aug 11, 2024
ef5ffae
add type aliases
imranq2 Aug 11, 2024
f1bcda8
add type aliases
imranq2 Aug 11, 2024
9bcc7cd
add type aliases
imranq2 Aug 11, 2024
467d7dc
fix test
imranq2 Aug 11, 2024
b09fd34
if LOGLEVEL is set to DEBUG then write out partitions as we process them
imranq2 Aug 11, 2024
f5f78ff
show hostname when available
imranq2 Aug 12, 2024
d119e91
pass timeout for elastic search calls
imranq2 Aug 12, 2024
a80cd49
pass timeout for elastic search calls
imranq2 Aug 12, 2024
fa38794
add documentation
imranq2 Aug 12, 2024
4796568
share code for repartition into SparkPartitionHelper
imranq2 Aug 12, 2024
0dde707
add repartition code to address_standardization.py
imranq2 Aug 12, 2024
2af2a2d
add v2 of framework_partitioner.py
imranq2 Aug 12, 2024
86d3264
remove partitioning from fhir_sender, fhir_receiver and elasticsearch…
imranq2 Aug 12, 2024
f418cb0
remove partitioning from fhir_sender, fhir_receiver and elasticsearch…
imranq2 Aug 12, 2024
58a2d95
fix warning about ClosedConnection
imranq2 Aug 12, 2024
ccebb9a
fix warning about ClosedConnection
imranq2 Aug 12, 2024
94a3894
add calculate_automatically mode to framework_partitioner.py
imranq2 Aug 12, 2024
a942070
fix test
imranq2 Aug 12, 2024
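
Several commits above mention switching to generators, streaming, and passing a chunk index to the pandas batch functions. The following is only a minimal sketch of that general pattern (an async generator that groups a stream into indexed batches). It is not taken from this PR; `stream_rows`, `batches_of`, and `collect` are hypothetical names used for illustration.

```python
import asyncio
from typing import AsyncGenerator, Iterable, List, Tuple


async def stream_rows(rows: Iterable[dict]) -> AsyncGenerator[dict, None]:
    """Hypothetical source: yields rows one at a time, as a streaming client might."""
    for row in rows:
        await asyncio.sleep(0)  # hand control back to the event loop between rows
        yield row


async def batches_of(
    source: AsyncGenerator[dict, None], size: int
) -> AsyncGenerator[Tuple[int, List[dict]], None]:
    """Group a stream into (chunk_index, batch) pairs of at most `size` rows."""
    chunk_index = 0
    batch: List[dict] = []
    async for row in source:
        batch.append(row)
        if len(batch) == size:
            yield chunk_index, batch
            chunk_index += 1
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield chunk_index, batch


async def collect(rows: Iterable[dict], size: int) -> List[Tuple[int, List[dict]]]:
    """Drain the batched stream into a list (only sensible for small inputs)."""
    return [item async for item in batches_of(stream_rows(rows), size)]
```

A caller running in synchronous code could drive this with `asyncio.run(collect(rows, size))`; inside an already running event loop it would need to be awaited instead.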
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -15,8 +15,8 @@ jobs:

  steps:
  # Checks-out your repository
- - uses: actions/checkout@v3
- - uses: actions/setup-python@v4
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v5
  with:
  python-version: '3.10'

4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
@@ -13,9 +13,9 @@ jobs:
  runs-on: ubuntu-latest

  steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
  - name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@v5
  with:
  python-version: '3.10'
  - name: Install dependencies
3 changes: 2 additions & 1 deletion Makefile
@@ -56,7 +56,8 @@ run-pre-commit: setup-pre-commit
  update: down Pipfile.lock setup-pre-commit ## Updates all the packages using Pipfile
  docker compose run --rm --name spf_pipenv dev pipenv sync --dev && \
  make devdocker && \
- make pipenv-setup
+ make pipenv-setup && \
+ make up

  .PHONY:tests
  tests:
24 changes: 17 additions & 7 deletions Pipfile
@@ -43,9 +43,9 @@ bounded-pool-executor = ">=0.0.3"
  # fastjsonschema is needed for validating JSON
  fastjsonschema= ">=2.18.0"
  # helix.fhir.client.sdk is needed for interacting with FHIR servers
- "helix.fhir.client.sdk" = ">=2.0.8"
+ "helix.fhir.client.sdk" = ">=2.0.15"
  # opensearch-py is needed for interacting with OpenSearch
- opensearch-py= ">=1.1.0"
+ opensearch-py= { extras = ['async'], version = ">=2.6.0" }
  # pyathena is needed for interacting with Athena in AWS
  pyathena = ">2.14.0"
  # spark-nlp is needed for natural language processing
@@ -60,6 +60,14 @@ pandas = ">=2.2.2"
  numexpr = ">=2.8.4"
  # bottleneck is needed for working with numerical data. pandas requires this minimum version.
  bottleneck = ">=1.3.6"
+ # structlog is needed for structured logging
+ structlog = ">=23.1.0"
+ # usaddress is needed for parsing street addresses
+ "usaddress"=">=0.5.10" # for parsing street addresses
+ # usaddress-scourgify is needed for normalizing addresses
+ "usaddress-scourgify"=">=0.6.0" # for normalizing addresses
+ # aiohttp is needed for making HTTP requests asynchronously
+ aiohttp = ">=3"

  [dev-packages]
  # setuptools is needed for building the package
@@ -73,11 +81,11 @@ pre-commit=">=3.7.1"
  # autoflake is needed for removing unused imports
  autoflake=">=2.3.1"
  # mypy is needed for type checking
- mypy = ">=1.10.1"
+ mypy = ">=1.11.1"
  # pytest is needed for running tests
- pytest = ">=8.2.2"
+ pytest = ">=8.3.2"
  # black is needed for formatting code
- black = ">=24.4.2"
+ black = ">=24.8.0"
  # pygments is needed for syntax highlighting
  pygments=">=2.8.1" # not directly required, pinned by Snyk to avoid a vulnerability
  # Sphinx is needed for generating documentation
@@ -101,13 +109,13 @@ sparkdataframecomparer = ">=2.0.2"
  # pytest-ayncio is needed for running async tests
  pytest-asyncio = ">=0.23.8"
  # helix-mockserver-client is needed for mocking servers
- helix-mockserver-client=">=1.2.1"
+ helix-mockserver-client=">=1.3.0"
  # sparkfhirschemas is needed for FHIR schemas
  sparkfhirschemas = ">=1.0.17"
  # types-boto3 is needed for type hints for boto3
  types-boto3 = ">=1.0.2"
  # moto is needed for mocking AWS services
- moto = { extras = ['all'], version = ">=5.0.11" }
+ moto = { extras = ['all'], version = ">=5.0.12" }
  # types-requests is needed for type hints for requests
  types-requests=">=2.31.0"
  # types-PyMySQL is needed for type hints for PyMySQL
@@ -120,6 +128,8 @@ types-python-dateutil=">=2.8.19.14"
  pandas-stubs=">=2.2.2"
  # types-pytz is needed for type hints for pytz
  types-pytz=">=2024.1.0"
+ # pydantic is needed for data class loading
+ pydantic=">=2.8.2"

  # These dependencies are required for pipenv-setup. They conflict with ones above, so we install these
  # only when running pipenv-setup
1,200 changes: 673 additions & 527 deletions Pipfile.lock

Large diffs are not rendered by default.

22 changes: 8 additions & 14 deletions docker-compose.yml
@@ -1,6 +1,10 @@
  version: '3.5'
  services:
  dev:
+ depends_on:
+ - fhir
+ - warehouse
+ - elasticsearch
  user: root
  build:
  dockerfile: spark.Dockerfile
@@ -15,14 +19,13 @@ services:
  volumes:
  - ./:/SparkpipelineFramework/
  # - ~/.local/share/virtualenvs:/root/.local/share/virtualenvs:rw
- # - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.7/dist-packages/helix_fhir_client_sdk
+ # - ../helix.fhir.client.sdk/helix_fhir_client_sdk:/usr/local/lib/python3.10/dist-packages/helix_fhir_client_sdk
+ # - ../mockserver_client/mockserver_client:/usr/local/lib/python3.10/dist-packages/mockserver_client
  container_name: spf_dev
  working_dir: /SparkpipelineFramework

  warehouse:
  image: mysql:8.0.39
- volumes:
- - mysql_data:/var/lib/mysql
  environment:
  MYSQL_ROOT_PASSWORD: root_password
  MYSQL_DATABASE: fhir_rpt
@@ -51,7 +54,8 @@ services:
  fhir:
  depends_on:
  - mongo
- image: imranq2/node-fhir-server-mongo:5.3.8
+ - keycloak
+ image: imranq2/node-fhir-server-mongo:5.3.16
  # To use local fhir code, comment above line and uncomment below
  # build:
  # dockerfile: Dockerfile
@@ -110,8 +114,6 @@ services:
  - '27017:27017'
  environment:
  - ALLOW_EMPTY_PASSWORD=yes
- volumes:
- - mongo_data:/data/db
  healthcheck:
  test: echo 'db.runCommand("ping").ok' | mongo mongo:27017/test --quiet

@@ -138,9 +140,6 @@ services:
  nofile:
  soft: 65536 # maximum number of open files for the Elasticsearch user, set to at least 65536 on modern systems
  hard: 65536
- volumes:
- - es_data:/usr/share/opensearch/data:delegated
- # - ./conf/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:cached
  ports:
  - '9200:9200'
  - '9600:9600' # required for Performance Analyzer
@@ -194,8 +193,3 @@ services:
  command: ["start-dev", "--import-realm", "--verbose"]
  volumes:
  - ./keycloak-config/realm-import.json:/opt/keycloak/data/import/realm-import.json
-
- volumes:
- mysql_data:
- mongo_data:
- es_data:
3 changes: 3 additions & 0 deletions pytest.ini
@@ -1,2 +1,5 @@
  [pytest]
  norecursedirs = tests/**/temp/*
+ addopts =
+ ; all discovered async tests are considered asyncio-driven
+ asyncio_mode = auto
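
As a rough illustration of what `asyncio_mode = auto` enables: with pytest-asyncio installed, a plain `async def` test is collected and run on an event loop without an explicit `@pytest.mark.asyncio` marker. The sketch below assumes that setup; `fetch_value` is a hypothetical coroutine, not code from this repository.

```python
import asyncio


async def fetch_value() -> int:
    """Hypothetical coroutine under test."""
    await asyncio.sleep(0)  # stand-in for real async I/O
    return 42


# Under asyncio_mode = auto, pytest-asyncio treats this coroutine as a test
# and awaits it; no @pytest.mark.asyncio decorator is required.
async def test_fetch_value() -> None:
    assert await fetch_value() == 42
```

In the default `strict` mode the same test would need the marker to be run as an asyncio test.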
4 changes: 2 additions & 2 deletions setup.cfg
@@ -1,5 +1,3 @@
- [tool:pytest]
- addopts =
  [mypy]
  python_version = 3.10
  warn_return_any = True
@@ -18,6 +16,8 @@ ignore_missing_imports = True
  ignore_missing_imports = True
  [mypy-py4j.protocol.*]
  ignore_missing_imports = True
+ [mypy-usaddress.*]
+ ignore_missing_imports = True
  [flake8]
  ignore = E501, W503, W504, E126, E123
  exclude = venv/
8 changes: 6 additions & 2 deletions setup.py
@@ -74,15 +74,19 @@ def violation(operation: Any, *args: Any, **_: Any) -> None:
  "sqlparse>=0.4.4",
  "bounded-pool-executor>=0.0.3",
  "fastjsonschema>=2.18.0",
- "helix.fhir.client.sdk>=2.0.8",
- "opensearch-py>=1.1.0",
+ "helix.fhir.client.sdk>=2.0.15",
+ "opensearch-py[async]>=2.6.0",
  "pyathena>2.14.0",
  "spark-nlp>=4.2.3",
  "pymongo>=4.8.0",
  "more-itertools>=9.1.0",
  "pandas>=2.2.2",
  "numexpr>=2.8.4",
  "bottleneck>=1.3.6",
+ "structlog>=23.1.0",
+ "usaddress>=0.5.10",
+ "usaddress-scourgify>=0.6.0",
+ "aiohttp>=3",
  ],
  classifiers=[
  "Development Status :: 4 - Beta",