Skip to content

Signal Documentation Coverage Endpoint #1584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b0934b5
first pass at coverage endpoint
nolangormley Jan 27, 2025
1a06e2e
added SQL to schemas
nolangormley Jan 27, 2025
55d13cb
added load table and function to recompute data
nolangormley Jan 27, 2025
16fe7b7
modified endpoint to use geo_sets, allowing multiple geos in filter
nolangormley Jan 27, 2025
85a972e
removed indexes from load table, fixed index declaration on main table
nolangormley Jan 27, 2025
c2baf02
added CLI and tests
nolangormley Jan 29, 2025
233ccd2
fixed sonar suggestions
nolangormley Jan 29, 2025
65c040a
more sonar issues
nolangormley Jan 29, 2025
3c404b2
ignore sonar test
aysim319 Jan 30, 2025
a5555a5
removed unnecessary index rebuild, fixed indexes in ddl sql file
nolangormley Jan 30, 2025
31200bd
Merge branch 'sig_doc_coverage' of github.com:cmu-delphi/delphi-epida…
nolangormley Jan 30, 2025
26c6450
updated SQL to use transaction instead of load table
nolangormley Jan 31, 2025
1f26061
updated integration test, removed SQL alias
nolangormley Jan 31, 2025
a0f631f
adding alias back in
nolangormley Feb 1, 2025
8da0326
Update src/maintenance/coverage_crossref_updater.py
nolangormley Feb 1, 2025
8a46b84
Update src/acquisition/covidcast/database.py
nolangormley Feb 1, 2025
f0e60f0
Update src/acquisition/covidcast/database.py
nolangormley Feb 1, 2025
d9f9260
Update src/acquisition/covidcast/database.py
nolangormley Feb 1, 2025
38b9e7b
Update src/maintenance/coverage_crossref_updater.py
nolangormley Feb 1, 2025
59d0ec8
Update integrations/acquisition/covidcast/test_coverage_crossref_upda…
nolangormley Feb 1, 2025
0bf61b7
Update src/server/endpoints/covidcast.py
nolangormley Feb 3, 2025
8a59022
Update integrations/acquisition/covidcast/test_coverage_crossref_upda…
nolangormley Feb 3, 2025
d8c5efa
Merge branch 'sig_doc_coverage' of github.com:cmu-delphi/delphi-epida…
nolangormley Feb 3, 2025
a458730
added test coverage and fixed groupby
nolangormley Feb 3, 2025
62b7475
Update src/maintenance/coverage_crossref_updater.py
nolangormley Feb 3, 2025
269c108
Update src/maintenance/coverage_crossref_updater.py
nolangormley Feb 3, 2025
b0be91f
Update src/maintenance/coverage_crossref_updater.py
nolangormley Feb 3, 2025
3b137d8
Update integrations/acquisition/covidcast/test_coverage_crossref_upda…
nolangormley Feb 3, 2025
e50624c
Merge branch 'sig_doc_coverage' of github.com:cmu-delphi/delphi-epida…
nolangormley Feb 3, 2025
1604845
added some test cases
nolangormley Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions integrations/acquisition/covidcast/test_coverage_crossref_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Integration tests for the covidcast `geo_coverage` endpoint."""

# standard library
import json
import unittest

# third party
import mysql.connector
import requests

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets
import delphi.epidata.acquisition.covidcast.database as live
from delphi.epidata.maintenance.coverage_crossref_updater import main
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase

# use the local instance of the Epidata API
BASE_URL = 'http://delphi_web_epidata/epidata' # NOSONAR


class CoverageCrossrefTests(CovidcastBase):
    """Tests coverage crossref updater.

    End-to-end check of the `/covidcast/geo_coverage` endpoint: seed the
    dimension and latest tables, run the crossref updater, then query the
    live API and verify what it serves.
    """

    def localSetUp(self):
        """Perform per-test setup."""
        # start each test from an empty crossref table so results only
        # reflect what this test inserts and recomputes
        self._db._cursor.execute('TRUNCATE TABLE `coverage_crossref`')

    @staticmethod
    def _make_request(params=None):
        # Query the geo_coverage endpoint; defaults to asking about all states.
        # Raises for non-2xx responses so failures surface as test errors.
        if params is None:
            params = {'geo': 'state:*'}
        response = requests.get(f"{Epidata.BASE_URL}/covidcast/geo_coverage", params=params, auth=Epidata.auth)
        response.raise_for_status()
        return response.json()

    def test_caching(self):
        """Populate, query, cache, query, and verify the cache."""

        # insert dummy data: one signal ('src'/'sig') observed for two
        # states (pa, wa) in the latest-values table
        self._db._cursor.execute('''
            INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`)
            VALUES
                (42, 'src', 'sig');
        ''')
        self._db._cursor.execute('''
            INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`)
            VALUES
                (96, 'state', 'pa'),
                (97, 'state', 'wa');
        ''')
        self._db._cursor.execute(f'''
            INSERT INTO
                `epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`,
                                    `time_value`, `value_updated_timestamp`,
                                    `value`, `stderr`, `sample_size`,
                                    `issue`, `lag`, `missing_value`,
                                    `missing_stderr`,`missing_sample_size`)
            VALUES
                (15, 42, 96, 'day', 20200422,
                123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
                (16, 42, 97, 'day', 20200422,
                789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
        ''')
        self._db.commit()

        results = self._make_request()

        # the crossref table hasn't been computed yet, so the endpoint
        # should report no results even though raw data exists
        self.assertEqual(results, {
            'result': -2,
            'epidata': [],
            'message': 'no results',
        })

        # update the coverage crossref table
        main()

        # make sure the data was actually served
        results = self._make_request()
        self.assertEqual(results, {
            'result': 1,
            'epidata': [{'signal': 'sig', 'source': 'src'}],
            'message': 'success',
        })

        # a geo type with no coverage (hrr) should yield no results
        results = self._make_request(params = {'geo': 'hrr:*'})
        self.assertEqual(results, {
            'result': -2,
            'epidata': [],
            'message': 'no results',
        })

        # a specific covered geo value (state:pa) should be served
        results = self._make_request(params = {'geo': 'state:pa'})
        self.assertEqual(results, {
            'result': 1,
            'epidata': [{'signal': 'sig', 'source': 'src'}],
            'message': 'success',
        })
33 changes: 33 additions & 0 deletions src/acquisition/covidcast/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,36 @@ def retrieve_covidcast_meta_cache(self):
for entry in cache:
cache_hash[(entry['data_source'], entry['signal'], entry['time_type'], entry['geo_type'])] = entry
return cache_hash

def compute_coverage_crossref(self):
    """Compute coverage_crossref table, for looking up available signals per geo or vice versa.

    Wipes and fully rebuilds `coverage_crossref` from `epimetric_latest`,
    recording the min/max `time_value` covered for each (signal, geo) pair.
    The wipe and rebuild happen inside a single transaction so readers never
    observe a partially-rebuilt table.

    Returns the number of rows inserted by the rebuild (0 when
    `epimetric_latest` is empty).
    """

    logger = get_structured_logger("compute_coverage_crossref")

    # DELETE (not TRUNCATE) is deliberate: in MySQL, TRUNCATE causes an
    # implicit commit, which would break the single-transaction rebuild.
    coverage_crossref_delete_sql = '''
        DELETE FROM coverage_crossref;
    '''

    coverage_crossref_update_sql = '''
        INSERT INTO coverage_crossref (signal_key_id, geo_key_id, min_time_value, max_time_value)
        SELECT
            signal_key_id,
            geo_key_id,
            MIN(time_value) AS min_time_value,
            MAX(time_value) AS max_time_value
        FROM covid.epimetric_latest
        GROUP BY signal_key_id, geo_key_id;
    '''

    self._connection.start_transaction()

    self._cursor.execute(coverage_crossref_delete_sql)
    logger.info("coverage_crossref_delete", rows=self._cursor.rowcount)

    self._cursor.execute(coverage_crossref_update_sql)
    # capture the INSERT's rowcount *before* committing -- relying on
    # `rowcount` after a commit depends on driver-specific behavior
    rows_inserted = self._cursor.rowcount
    logger.info("coverage_crossref_update", rows=rows_inserted)

    self.commit()
    logger.info("coverage_crossref committed")

    return rows_inserted
21 changes: 21 additions & 0 deletions src/ddl/v4_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,24 @@ CREATE TABLE `covidcast_meta_cache` (
PRIMARY KEY (`timestamp`)
) ENGINE=InnoDB;
INSERT INTO covidcast_meta_cache VALUES (0, '[]');

-- Lookup table mapping each (signal, geo) pair to the span of time values it
-- covers; rebuilt wholesale by `compute_coverage_crossref()` in
-- src/acquisition/covidcast/database.py.
CREATE TABLE `coverage_crossref` (
    `signal_key_id` bigint NOT NULL,
    `geo_key_id` bigint NOT NULL,
    `min_time_value` int NOT NULL,
    `max_time_value` int NOT NULL,
    -- geo-first unique index enforces one row per (geo, signal) pair and
    -- serves "which signals exist for this geo" lookups; the secondary
    -- index serves the reverse "which geos have this signal" direction
    UNIQUE INDEX coverage_crossref_geo_sig (`geo_key_id`, `signal_key_id`),
    INDEX coverage_crossref_sig_geo (`signal_key_id`, `geo_key_id`)
) ENGINE=InnoDB;

-- Readable view resolving the key ids against the dimension tables; this is
-- what the `/covidcast/geo_coverage` endpoint queries.
CREATE OR REPLACE VIEW `coverage_crossref_v` AS
    SELECT
        `sd`.`source`,
        `sd`.`signal`,
        `gd`.`geo_type`,
        `gd`.`geo_value`,
        `cc`.`min_time_value`,
        `cc`.`max_time_value`
    FROM `coverage_crossref` `cc`
        JOIN `signal_dim` `sd` USING (`signal_key_id`)
        JOIN `geo_dim` `gd` USING (`geo_key_id`);
1 change: 1 addition & 0 deletions src/ddl/v4_schema_aliases.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
CREATE VIEW `epidata`.`epimetric_full_v` AS SELECT * FROM `covid`.`epimetric_full_v`;
CREATE VIEW `epidata`.`epimetric_latest_v` AS SELECT * FROM `covid`.`epimetric_latest_v`;
CREATE VIEW `epidata`.`covidcast_meta_cache` AS SELECT * FROM `covid`.`covidcast_meta_cache`;
CREATE VIEW `epidata`.`coverage_crossref_v` AS SELECT * FROM `covid`.`coverage_crossref_v`;
44 changes: 44 additions & 0 deletions src/maintenance/coverage_crossref_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Updates the table for the `coverage_crossref` endpoint."""

# standard library
import argparse
import sys
import time

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi_utils import get_structured_logger
from delphi.epidata.client.delphi_epidata import Epidata


def main():
    """Updates the table for the `coverage_crossref`.

    Connects to the database, rebuilds the `coverage_crossref` table, logs the
    outcome, and always releases the connection. Returns True on completion;
    any database exception is re-raised after cleanup.
    """

    logger = get_structured_logger("coverage_crossref_updater")
    start_time = time.time()
    database = Database()
    database.connect()

    # compute and update coverage_crossref
    try:
        coverage = database.compute_coverage_crossref()
    except:  # noqa -- deliberately broad: clean up the connection, then re-raise unchanged
        # clean up before failing
        database.disconnect(True)
        raise
    else:
        # BUG FIX: the connection was previously only closed on the failure
        # path, leaking it on success. `False` because
        # compute_coverage_crossref() has already committed its transaction.
        database.disconnect(False)

    result = ("success", 1)
    if coverage == 0:
        result = ("no results", -2)

    logger.info('coverage_crossref result: %s (code %d)' % result)

    logger.info(
        "Generated and updated covidcast geo/signal coverage",
        total_runtime_in_seconds=round(time.time() - start_time, 2))
    return True


# allow running the updater directly as a script (e.g. from cron)
if __name__ == '__main__':
    main()
18 changes: 18 additions & 0 deletions src/server/endpoints/covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,24 @@ def transform_row(row, proxy):

return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row)

@bp.route("/geo_coverage", methods=("GET", "POST"))
def handle_geo_coverage():
    """
    For the requested geo set(s), return the distinct (source, signal) pairs
    that have coverage there, as recorded in the `coverage_crossref_v` view.
    """

    geo_sets = parse_geo_sets()

    q = QueryBuilder("coverage_crossref_v", "c")
    fields_string = ["source", "signal"]

    q.set_fields(fields_string)

    q.apply_geo_filters("geo_type", "geo_value", geo_sets)
    q.set_sort_order("source", "signal")
    q.group_by = ["c." + field for field in fields_string]  # this condenses duplicate results, similar to `SELECT DISTINCT`

    # no int or date fields in the output -- each row is just {source, signal}
    return execute_query(q.query, q.params, fields_string, [], [])

@bp.route("/anomalies", methods=("GET", "POST"))
def handle_anomalies():
Expand Down
16 changes: 16 additions & 0 deletions tests/acquisition/covidcast/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ def test_update_covidcast_meta_cache_query(self):
self.assertIn('timestamp', sql)
self.assertIn('epidata', sql)

def test_compute_coverage_crossref_query(self):
    """Query to update the compute crossref looks sensible.

    NOTE: Actual behavior is tested by integration test.
    """

    # wire the database up to a mock connector so no real DB is needed
    fake_connector = MagicMock()
    db = Database()
    db.connect(connector_impl=fake_connector)

    db.compute_coverage_crossref()

    # the method must have issued at least one statement through the cursor
    fake_cursor = fake_connector.connect().cursor()
    self.assertTrue(fake_cursor.execute.called)

def test_insert_or_update_batch_exception_reraised(self):
"""Test that an exception is reraised"""
mock_connector = MagicMock()
Expand Down
Loading