
Commit d06a879

ekzhang authored and gongy committed
Port datasette to use volumes and make it much faster (#460)
* Fix datasette example and improve logging
* Port to use volumes (much faster)
* Remove tempfile import
1 parent 823adff commit d06a879


10_integrations/covid_datasette.py

Lines changed: 75 additions & 77 deletions
@@ -11,9 +11,9 @@
 # Try it out for yourself at [modal-labs-example-covid-datasette-app.modal.run/covid-19](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
 #
 # Some Modal features it uses:
-# * Network file systems: a persisted volume lets us store and grow the published dataset over time
-# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily
-# * Webhooks: exposes the Datasette application for web browser interaction and API requests.
+# * Volumes: a persisted volume lets us store and grow the published dataset over time.
+# * Scheduled functions: the underlying dataset is refreshed daily, so we schedule a function to run daily.
+# * Web endpoints: exposes the Datasette application for web browser interaction and API requests.
 #
 # ## Basic setup
 #
@@ -24,35 +24,29 @@
 import asyncio
 import pathlib
 import shutil
-import tempfile
-from datetime import datetime, timedelta
+import subprocess
+from datetime import datetime
+from urllib.request import urlretrieve

-from modal import Image, NetworkFileSystem, Period, Stub, asgi_app
+from modal import Image, Period, Stub, Volume, asgi_app

 stub = Stub("example-covid-datasette")
 datasette_image = (
     Image.debian_slim()
-    .pip_install(
-        "datasette~=0.63.2",
-        "flufl.lock",
-        "GitPython",
-        "sqlite-utils",
-    )
-    .apt_install("git")
+    .pip_install("datasette~=0.63.2", "sqlite-utils")
+    .apt_install("unzip")
 )

 # ## Persistent dataset storage
 #
 # To separate database creation and maintenance from serving, we'll need the underlying
-# database file to be stored persistently. To achieve this we use a [`NetworkFileSystem`](/docs/guide/shared-volumes),
-# a writable volume that can be attached to Modal functions and persisted across function runs.
+# database file to be stored persistently. To achieve this we use a [`Volume`](/docs/guide/volumes).

-volume = NetworkFileSystem.persisted("covid-dataset-cache-vol")
+stub.volume = Volume.persisted("example-covid-datasette-cache-vol")

-CACHE_DIR = "/cache"
-LOCK_FILE = str(pathlib.Path(CACHE_DIR, "lock-reports"))
-REPO_DIR = pathlib.Path(CACHE_DIR, "COVID-19")
-DB_PATH = pathlib.Path(CACHE_DIR, "covid-19.db")
+VOLUME_DIR = "/cache-vol"
+REPORTS_DIR = pathlib.Path(VOLUME_DIR, "COVID-19")
+DB_PATH = pathlib.Path(VOLUME_DIR, "covid-19.db")

 # ## Getting a dataset
 #
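The heart of this port is the commit/reload discipline that `Volume` asks for: a function that writes into the mounted directory calls `commit()` so its changes persist, and a reader calls `reload()` to pick up the latest committed state, exactly as the functions below do. A minimal sketch of that pattern on its own, using hypothetical stub and volume names that are not part of this example:

import pathlib

from modal import Stub, Volume

sketch_stub = Stub("volume-pattern-sketch")  # illustrative names only
sketch_stub.volume = Volume.persisted("sketch-cache-vol")
DATA_DIR = "/data"


@sketch_stub.function(volumes={DATA_DIR: sketch_stub.volume})
def writer():
    # Write into the mounted directory, then commit so other functions
    # and later runs can see the new file.
    pathlib.Path(DATA_DIR, "hello.txt").write_text("hi")
    sketch_stub.volume.commit()


@sketch_stub.function(volumes={DATA_DIR: sketch_stub.volume})
def reader():
    # Reload to pick up commits made after this container started.
    sketch_stub.volume.reload()
    print(pathlib.Path(DATA_DIR, "hello.txt").read_text())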
@@ -65,42 +59,50 @@

 @stub.function(
     image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
     retries=2,
 )
 def download_dataset(cache=True):
-    import git
-    from flufl.lock import Lock
-
-    if REPO_DIR.exists() and cache:
+    if REPORTS_DIR.exists() and cache:
         print(f"Dataset already present and {cache=}. Skipping download.")
         return
-    elif REPO_DIR.exists():
-        print(
-            "Acquiring lock before deleting dataset, which may be in use by other runs."
-        )
-        with Lock(LOCK_FILE, default_timeout=timedelta(hours=1)):
-            shutil.rmtree(REPO_DIR)
-        print("Cleaned dataset before re-downloading.")
+    elif REPORTS_DIR.exists():
+        print("Cleaning dataset before re-downloading...")
+        shutil.rmtree(REPORTS_DIR)
+
+    print("Downloading dataset...")
+    urlretrieve(
+        "https://github.com/CSSEGISandData/COVID-19/archive/refs/heads/master.zip",
+        "/tmp/covid-19.zip",
+    )
+
+    print("Unpacking archive...")
+    prefix = "COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports"
+    subprocess.run(
+        f"unzip /tmp/covid-19.zip {prefix}/* -d {REPORTS_DIR}", shell=True
+    )
+    subprocess.run(f"mv {REPORTS_DIR / prefix}/* {REPORTS_DIR}", shell=True)
+
+    print("Committing the volume...")
+    stub.volume.commit()

-    git_url = "https://github.com/CSSEGISandData/COVID-19"
-    git.Repo.clone_from(git_url, REPO_DIR, depth=1)
+    print("Finished downloading dataset.")


 # ## Data munging
 #
 # This dataset is no swamp, but a bit of data cleaning is still in order. The following two
-# functions are used to read a handful of `.csv` files from the git repository and cleaning the
-# rows data before inserting into SQLite. You can see that the daily reports are somewhat inconsistent
-# in their column names.
+# functions read a handful of `.csv` files and clean the data, before inserting it into
+# SQLite.


 def load_daily_reports():
-    jhu_csse_base = REPO_DIR
-    reports_path = (
-        jhu_csse_base / "csse_covid_19_data" / "csse_covid_19_daily_reports"
-    )
-    daily_reports = list(reports_path.glob("*.csv"))
+    stub.volume.reload()
+    daily_reports = list(REPORTS_DIR.glob("*.csv"))
+    if not daily_reports:
+        raise RuntimeError(
+            f"Could not find any daily reports in {REPORTS_DIR}."
+        )
     for filepath in daily_reports:
         yield from load_report(filepath)

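The new download step shells out to `unzip` and `mv`. If you wanted to avoid the shell entirely, the same unpack-and-flatten step could be done with the standard library's `zipfile` module; this is only an alternative sketch with a made-up helper name, not what the commit ships:

import pathlib
import zipfile


def unpack_reports(zip_path, prefix, dest: pathlib.Path):
    # Extract only the daily-report CSVs under `prefix`, flattening them
    # directly into `dest` (mirrors the unzip + mv pipeline above).
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            if member.startswith(prefix) and member.endswith(".csv"):
                (dest / pathlib.Path(member).name).write_bytes(zf.read(member))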
@@ -141,12 +143,12 @@ def load_report(filepath):
 # ## Inserting into SQLite
 #
 # With the CSV processing out of the way, we're ready to create an SQLite DB and feed data into it.
-# Importantly, the `prep_db` function mounts the same network file system used by `download_dataset()`, and
+# Importantly, the `prep_db` function mounts the same volume used by `download_dataset()`, and
 # rows are batch inserted with progress logged after each batch, as the full COVID-19 dataset has millions
 # of rows and does take some time to be fully inserted.
 #
 # A more sophisticated implementation would only load new data instead of performing a full refresh,
-# but for this example things are kept simple.
+# but we're keeping things simple for this example!


 def chunks(it, size):
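The `chunks(it, size)` helper referenced above batches records for insertion, but its body falls outside this diff. A typical implementation would look something like the following (an assumption, not the file's actual code):

import itertools


def chunks(it, size):
    # Yield successive lists of at most `size` items from any iterable.
    it = iter(it)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch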
@@ -157,44 +159,40 @@ def chunks(it, size):

 @stub.function(
     image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
     timeout=900,
 )
 def prep_db():
     import sqlite_utils
-    from flufl.lock import Lock

     print("Loading daily reports...")
     records = load_daily_reports()

-    with Lock(
-        LOCK_FILE,
-        lifetime=timedelta(minutes=2),
-        default_timeout=timedelta(hours=1),
-    ) as lck, tempfile.NamedTemporaryFile() as tmp:
-        db = sqlite_utils.Database(tmp.name)
-        table = db["johns_hopkins_csse_daily_reports"]
+    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    db = sqlite_utils.Database(DB_PATH)
+    table = db["johns_hopkins_csse_daily_reports"]
+
+    batch_size = 100_000
+    for i, batch in enumerate(chunks(records, size=batch_size)):
+        truncate = True if i == 0 else False
+        table.insert_all(batch, batch_size=batch_size, truncate=truncate)
+        print(f"Inserted {len(batch)} rows into DB.")

-        batch_size = 100_000
-        for i, batch in enumerate(chunks(records, size=batch_size)):
-            truncate = True if i == 0 else False
-            table.insert_all(batch, batch_size=batch_size, truncate=truncate)
-            lck.refresh()
-            print(f"Inserted {len(batch)} rows into DB.")
+    table.create_index(["day"], if_not_exists=True)
+    table.create_index(["province_or_state"], if_not_exists=True)
+    table.create_index(["country_or_region"], if_not_exists=True)

-        table.create_index(["day"], if_not_exists=True)
-        table.create_index(["province_or_state"], if_not_exists=True)
-        table.create_index(["country_or_region"], if_not_exists=True)
+    db.close()

-    print("Syncing DB with network volume.")
-    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
-    shutil.copyfile(tmp.name, DB_PATH)
+    print("Syncing DB with volume.")
+    stub.volume.commit()


-# ## Keeping fresh
+# ## Keep it fresh
 #
-# Johns Hopkins commits new data to the dataset repository every day, so we
-# setup a [scheduled](/docs/guide/cron) Modal function to run automatically once every 24 hours.
+# Johns Hopkins commits new data to the dataset repository every day, so we set up
+# a [scheduled](/docs/guide/cron) function to automatically refresh the database
+# every 24 hours.


 @stub.function(schedule=Period(hours=24), timeout=1000)
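After `prep_db` has run, a quick sanity check can confirm the table was populated and carries the three indexes created above. A sketch with a made-up `check_db` helper, assuming sqlite-utils' `table.count` and `table.indexes` attributes and a container with the volume mounted:

import sqlite_utils


def check_db(db_path="/cache-vol/covid-19.db"):
    # Confirm the table exists, has rows, and carries the indexes
    # created in prep_db.
    db = sqlite_utils.Database(db_path)
    table = db["johns_hopkins_csse_daily_reports"]
    print(f"rows: {table.count}")
    print(f"indexes: {[idx.name for idx in table.indexes]}")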
@@ -204,22 +202,22 @@ def refresh_db():
     prep_db.remote()


-# ## Webhook
+# ## Web endpoint
 #
 # Hooking up the SQLite database to a Modal webhook is as simple as it gets.
-# The Modal `@stub.asgi_app` decorator wraps a few lines of code: one `import` and a few
-# lines to instantiate the `Datasette` instance and return a reference to its ASGI app object.
+# The Modal `@asgi_app` decorator wraps a few lines of code: one `import` and a few
+# lines to instantiate the `Datasette` instance and return its app server.


 @stub.function(
     image=datasette_image,
-    network_file_systems={CACHE_DIR: volume},
+    volumes={VOLUME_DIR: stub.volume},
 )
 @asgi_app()
 def app():
     from datasette.app import Datasette

-    ds = Datasette(files=[DB_PATH])
+    ds = Datasette(files=[DB_PATH], settings={"sql_time_limit_ms": 10000})
     asyncio.run(ds.invoke_startup())
     return ds.app()

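Because `ds.app()` is just an ASGI application, the same Datasette setup can also be smoke-tested outside Modal against a local copy of the database file. A hypothetical local sketch, assuming `uvicorn` is installed and a `covid-19.db` file sits in the working directory; it is not part of the example:

import asyncio

import uvicorn
from datasette.app import Datasette


def serve_locally(db_file="covid-19.db"):
    # Mirror the Modal endpoint's setup, then serve on localhost.
    ds = Datasette(files=[db_file], settings={"sql_time_limit_ms": 10000})
    asyncio.run(ds.invoke_startup())
    uvicorn.run(ds.app(), host="127.0.0.1", port=8001)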
@@ -228,11 +226,11 @@ def app():
 #
 # Run this script using `modal run covid_datasette.py` and it will create the database.
 #
-# You can run this script using `modal serve covid_datasette.py` and it will create a
-# short-lived web URL that exists until you terminate the script.
+# You can then use `modal serve covid_datasette.py` to create a short-lived web URL
+# that exists until you terminate the script.
 #
 # When publishing the interactive Datasette app you'll want to create a persistent URL.
-# This is achieved by deploying the script with `modal deploy covid_datasette.py`.
+# Just run `modal deploy covid_datasette.py`.


 @stub.local_entrypoint()
@@ -243,4 +241,4 @@ def run():
     prep_db.remote()


-# You can go explore the data over at [modal-labs-covid-datasette-app.modal.run/covid-19/](https://modal-labs-example-covid-datasette-app.modal.run/covid-19/johns_hopkins_csse_daily_reports).
+# You can explore the data at the [deployed web endpoint](https://modal-labs--example-covid-datasette-app.modal.run/covid-19).
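Datasette also serves a JSON API alongside the HTML UI, so the deployed endpoint can be queried programmatically. A small sketch against the table named above, assuming Datasette's standard `.json` table route and `_size` parameter:

import json
from urllib.request import urlopen

url = (
    "https://modal-labs--example-covid-datasette-app.modal.run"
    "/covid-19/johns_hopkins_csse_daily_reports.json?_size=5"
)
with urlopen(url) as response:
    payload = json.load(response)
# Print the first row returned by the table's JSON endpoint.
print(payload["rows"][:1])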
