diff --git a/.azure/app-cloud-e2e.yml b/.azure/app-cloud-e2e.yml
index bf76b53b53154..65dfdf7e327c7 100644
--- a/.azure/app-cloud-e2e.yml
+++ b/.azure/app-cloud-e2e.yml
@@ -117,6 +117,7 @@ jobs:
     workspace:
       clean: all
     variables:
+      FREEZE_REQUIREMENTS: "1"
       HEADLESS: '1'
       PACKAGE_LIGHTNING: '1'
       CLOUD: '1'
@@ -146,8 +147,6 @@ jobs:
       - bash: |
           pip install -e .[test] \
             -f https://download.pytorch.org/whl/cpu/torch_stable.html
-        env:
-          FREEZE_REQUIREMENTS: "1"
        displayName: 'Install Lightning & dependencies'
 
       - bash: |
diff --git a/.azure/app-cloud-store.yml b/.azure/app-cloud-store.yml
new file mode 100644
index 0000000000000..b0dc78ed2e695
--- /dev/null
+++ b/.azure/app-cloud-store.yml
@@ -0,0 +1,67 @@
+# Python package
+# Create and test a Python package on multiple Python versions.
+# Add steps that analyze code, save the dist with the build record, publish to a PyPI-compatible index, and more:
+# https://docs.microsoft.com/azure/devops/pipelines/languages/python
+
+trigger:
+  tags:
+    include:
+      - '*'
+  branches:
+    include:
+      - "master"
+      - "release/*"
+      - "refs/tags/*"
+
+pr:
+  branches:
+    include:
+      - "master"
+      - "release/*"
+  paths:
+    include:
+      - ".actions/**"
+      - ".azure/app-cloud-store.yml"
+      - "src/lightning/store/**"
+      - "tests/tests_cloud/**"
+      - "setup.py"
+    exclude:
+      - "*.md"
+      - "**/*.md"
+
+jobs:
+  - job: test_store
+    pool:
+      vmImage: $(imageName)
+    strategy:
+      matrix:
+        Linux:
+          imageName: 'ubuntu-latest'
+        Mac:
+          imageName: 'macOS-latest'
+        Windows:
+          imageName: 'windows-latest'
+    timeoutInMinutes: "20"
+    cancelTimeoutInMinutes: "1"
+    workspace:
+      clean: all
+    variables:
+      FREEZE_REQUIREMENTS: "1"
+      TORCH_URL: "https://download.pytorch.org/whl/cpu/torch_stable.html"
+    steps:
+      - task: UsePythonVersion@0
+        inputs:
+          versionSpec: '3.9'
+
+      - bash: pip install -e .[test] -f $(TORCH_URL)
+        displayName: 'Install Lightning & dependencies'
+
+      - bash: |
+          python -m pytest -m "not cloud" tests_cloud --timeout=300 -v
+        workingDirectory: tests/
+        env:
+          API_KEY: $(LIGHTNING_API_KEY_PROD)
+          API_USERNAME: $(LIGHTNING_USERNAME_PROD)
+          PROJECT_ID: $(LIGHTNING_PROJECT_ID_PROD)
+          LIGHTNING_CLOUD_URL: $(LIGHTNING_CLOUD_URL_PROD)
+        displayName: 'Run the tests'
diff --git a/.gitignore b/.gitignore
index fbe0c29c29eca..7f4625487f8d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,8 +48,10 @@ wheels/
 *.egg-info/
 .installed.cfg
 *.egg
-src/lightning/*/
 src/*/version.info
+src/lightning/app/
+src/lightning/fabric/
+src/lightning/pytorch/
 
 # PyInstaller
 # Usually these files are written by a python script from a template
diff --git a/src/lightning/store/README.md b/src/lightning/store/README.md
new file mode 100644
index 0000000000000..85ea84335e648
--- /dev/null
+++ b/src/lightning/store/README.md
@@ -0,0 +1,99 @@
+## Getting Started
+
+- Log in to lightning.ai (_optional_) \<-- takes less than a minute. ⏩
+- Store your models on the cloud \<-- simple call: `upload_to_cloud(...)`. 🗳️
+- Share them with your friends \<-- just share the `"username/model_name"` (and the version if required). :handshake:
+- They download it with a simple call: `download_from_cloud("username/model_name", version="your_version")`. :wink:
+- They load your cool model: `load_model("username/model_name", version="your_version")`. :tada:
+- Lightning :zap: fast, isn't it? :heart:
+
+## Usage
+
+**Storing to the cloud**
+
+```python
+import lightning as L
+from sample.model import LitAutoEncoder, Encoder, Decoder
+
+# Initialize your model here
+autoencoder = LitAutoEncoder(Encoder(), Decoder())
+
+# Pass the model object:
+# no need to pass your username (we deduce it ourselves), just pass the model name you want
+# as the first argument (with an optional version).
+# Format: `model_name:version` (the version can either be `latest` or a combination of digits
+# and full stops, e.g. `1.0.0`)
+L.store.upload_to_cloud("unique_model_mnist", model=autoencoder, source_code_path="sample")
+
+# version:
+L.store.upload_to_cloud(
+    "unique_model_mnist",
+    version="1.0.0",
+    model=autoencoder,
+    source_code_path="sample/model.py",
+)
+
+# OR: (this will save the file in which the model is defined)
+L.store.upload_to_cloud("krshrimali/unique_model_mnist", model=autoencoder)
+```
+
+You can also pass a checkpoint path: `upload_to_cloud("model_name", version="latest", checkpoint_path=...)`.
+
+**Downloading from the cloud**
+
+First, download the model to your local machine.
+
+```python
+import lightning as L
+
+L.store.download_from_cloud(
+    "krshrimali/unique_model_mnist",
+    output_dir="your_output_dir",
+)
+# OR: defaults to the model store folder:
+# $HOME
+# |- .lightning
+# | |- model_store
+# | | |- username
+# | | | |- <model_name>
+# | | | | |- version_<version>
+L.store.download_from_cloud("krshrimali/unique_model_mnist")
+```
+
+**Loading model**
+
+Then you can load the model in your program.
+
+```python
+import lightning as L
+
+# from <username>.<model_name>.version_<version>.<model_file> import LitAutoEncoder, Encoder, Decoder
+model = L.store.load_model("<username>/<model_name>", version="version")  # version is optional (defaults to latest)
+
+# OR: load only the weights or a checkpoint (if they were uploaded);
+# pass either load_weights=True or load_checkpoint=True (not both)
+model = L.store.load_model(
+    "<username>/<model_name>", version="version", load_weights=True, model=model
+)
+print(model)
+```
+
+**Loading model weights**
+
+```python
+import lightning as L
+from sample.model import LitAutoEncoder, Encoder, Decoder
+
+# If you passed an `output_dir=...` to `download_from_cloud(...)`, then you can just do:
+# from output_dir.<model_file> import LitAutoEncoder, Encoder, Decoder
+
+model = LitAutoEncoder(Encoder(), Decoder())
+
+model = L.store.load_model("<username>/<model_name>", load_weights=True, model=model)
+print("State dict: ", model.state_dict())
+```
+
+Loading a checkpoint is similar; just pass `load_checkpoint=True`.
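+
+**Passing credentials explicitly**
+
+If no runnable browser is available (e.g., on a headless server), every call also accepts explicit credentials; the values below are placeholders for your own API key (from the settings page on lightning.ai) and project ID:
+
+```python
+import lightning as L
+
+L.store.upload_to_cloud(
+    "unique_model_mnist",
+    model=autoencoder,
+    api_key="<your-api-key>",
+    project_id="<your-project-id>",  # only needed if your account has multiple projects
+)
+```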
+
+## Known limitations
+
+- missing web UI for users to browse their uploads
+- missing CLI/API to list and delete uploaded models
diff --git a/src/lightning/store/__init__.py b/src/lightning/store/__init__.py
new file mode 100644
index 0000000000000..645891e717534
--- /dev/null
+++ b/src/lightning/store/__init__.py
@@ -0,0 +1,3 @@
+from lightning.store.cloud_api import download_from_cloud, load_model, upload_to_cloud
+
+__all__ = ["download_from_cloud", "load_model", "upload_to_cloud"]
diff --git a/src/lightning/store/authentication.py b/src/lightning/store/authentication.py
new file mode 100644
index 0000000000000..b5cef52894225
--- /dev/null
+++ b/src/lightning/store/authentication.py
@@ -0,0 +1,64 @@
+# Copyright The Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import webbrowser
+
+import requests
+from requests.auth import HTTPBasicAuth
+
+from lightning.app.core.constants import get_lightning_cloud_url
+from lightning.app.utilities.network import LightningClient
+
+_LIGHTNING_CLOUD_URL = get_lightning_cloud_url()
+
+
+def _get_user_details():
+    client = LightningClient()
+    user_details = client.auth_service_get_user()
+    return user_details.username, user_details.api_key
+
+
+def _get_username_from_api_key(api_key: str):
+    response = requests.get(url=f"{_LIGHTNING_CLOUD_URL}/v1/auth/user", auth=HTTPBasicAuth("lightning", api_key))
+    if response.status_code != 200:
+        raise ConnectionRefusedError(
+            "The API_KEY provided is either invalid or wasn't found in the database."
+            " Please ensure that you passed the correct API_KEY."
+        )
+    return json.loads(response.content)["username"]
+
+
+def _check_browser_runnable():
+    try:
+        webbrowser.get()
+    except webbrowser.Error:
+        return False
+    return True
+
+
+def _authenticate(inp_api_key: str = ""):
+    # TODO: we have headless login now,
+    # so it could be reasonable to just point to that if a browser can't be opened / the user can't be authed
+    if not inp_api_key:
+        if not _check_browser_runnable():
+            raise ValueError(
+                "Couldn't find a runnable browser in the current system/server."
+                " In order to run the commands on this system, we suggest passing the `api_key`"
+                " after logging into https://lightning.ai."
+            )
+        username, inp_api_key = _get_user_details()
+    else:
+        username = _get_username_from_api_key(inp_api_key)
+    return username, inp_api_key
diff --git a/src/lightning/store/cloud_api.py b/src/lightning/store/cloud_api.py
new file mode 100644
index 0000000000000..6b31a44d7ce1a
--- /dev/null
+++ b/src/lightning/store/cloud_api.py
@@ -0,0 +1,404 @@
+# Copyright The Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
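+
+"""Public APIs of the model store: upload models to the Lightning cloud, download and load them back."""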
+
+import json
+import logging
+import os
+import sys
+import tempfile
+from typing import Any, List, Optional, Union
+
+import requests
+import torch
+
+import lightning as L
+import pytorch_lightning as PL
+from lightning.app.core.constants import LIGHTNING_MODELS_PUBLIC_REGISTRY
+from lightning.store.authentication import _authenticate
+from lightning.store.save import (
+    _download_and_extract_data_to,
+    _get_linked_output_dir,
+    _LIGHTNING_STORAGE_DIR,
+    _LIGHTNING_STORAGE_FILE,
+    _save_checkpoint_from_path,
+    _save_meta_data,
+    _save_model,
+    _save_model_code,
+    _save_model_weights,
+    _save_requirements_file,
+    _submit_data_to_url,
+    _write_and_save_requirements,
+)
+from lightning.store.utils import _get_model_data, _split_name, stage
+
+logging.basicConfig(level=logging.INFO)
+
+
+def upload_to_cloud(
+    name: str,
+    version: str = "latest",
+    model=None,
+    source_code_path: str = "",
+    checkpoint_path: str = "",
+    requirements_file_path: str = "",
+    requirements: Optional[List[str]] = None,
+    weights_only: bool = False,
+    api_key: str = "",
+    project_id: str = "",
+    progress_bar: bool = True,
+    save_code: bool = True,
+    *args: Any,
+    **kwargs: Any,
+):
+    """Store a model to the Lightning cloud.
+
+    Args:
+        name:
+            The model name. The model/checkpoint will be uploaded under this unique name. Format: "model_name".
+        version:
+            The version of the model to be uploaded. If not provided, defaults to `latest` (not overridden).
+        model:
+            The (initialized) model object. This is optional, but if `checkpoint_path` is not passed either,
+            an error is raised. (Optional)
+        source_code_path:
+            The path to the source code that needs to be uploaded along with the model.
+            The path can point to a Python file or a directory. A path pointing to a non-Python file
+            raises an error. (Optional)
+        checkpoint_path:
+            The path to the checkpoint that needs to be uploaded. (Optional)
+        requirements_file_path:
+            The path to the requirements file; it will always be uploaded under the name `requirements.txt`.
+        requirements:
+            A list of requirements as strings, which will be written as `requirements.txt` and
+            then uploaded. If both `requirements_file_path` and `requirements` are passed,
+            a warning is raised, as `requirements` is given priority over `requirements_file_path`.
+        weights_only:
+            If set to `True`, only the model weights are saved and nothing else. This raises
+            an error if `weights_only` is `True` but no `model` is passed.
+        api_key:
+            The API key used for authentication. Fetch it after logging in to https://lightning.ai
+            (in the keys tab in the settings). If not passed, the API will attempt to
+            either find the credentials on your system or open the login prompt.
+        project_id:
+            Some users have multiple projects, each with a unique `project_id`. They need to pass
+            it in order to upload models to the cloud.
+        progress_bar:
+            Shows a progress bar for the upload. Disable it if not needed, by setting it to `False`.
+        save_code:
+            By default, the API saves the code where the model is defined.
+            Set it to `False` if saving the code is not desired.
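+
+    Example (a minimal sketch; ``BoringModel`` stands in for any ``LightningModule`` of yours)::
+
+        from lightning.store import upload_to_cloud
+        from pytorch_lightning.demos.boring_classes import BoringModel
+
+        # uploads the model object (and, by default, the file defining its class)
+        upload_to_cloud("my_boring_model", model=BoringModel())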
+    """
+    if model is None and not checkpoint_path:
+        raise ValueError(
+            """You either need to pass the model or the checkpoint path that you want to save. Any one of:
+            `upload_to_cloud("model_name", model=model_obj, ...)`
+            or
+            `upload_to_cloud("model_name", checkpoint_path="your_checkpoint_path.ckpt", ...)`
+            is required."""
+        )
+
+    if weights_only and not model:
+        raise ValueError(
+            "No model was passed to `upload_to_cloud(...)`; in order to save weights,"
+            " you need to pass the model object."
+        )
+
+    version = version or "latest"
+    _, model_name, _ = _split_name(name, version=version, l_stage=stage.UPLOAD)
+    username_from_api_key, api_key = _authenticate(api_key)
+
+    # name = f"{username_from_api_key}/{model_name}:{version}"
+
+    stored = {}
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        if checkpoint_path:
+            stored = _save_checkpoint_from_path(model_name, path=checkpoint_path, tmpdir=tmpdir, stored=stored)
+
+        if model:
+            stored = _save_model_weights(
+                model_name, model_state_dict=model.state_dict(), tmpdir=tmpdir, stored=stored, *args, **kwargs
+            )
+
+            if not weights_only:
+                stored = _save_model(model_name, model=model, tmpdir=tmpdir, stored=stored, *args, **kwargs)
+
+                if save_code:
+                    stored = _save_model_code(
+                        model_name,
+                        model_cls=model.__class__,
+                        source_code_path=source_code_path,
+                        tmpdir=tmpdir,
+                        stored=stored,
+                    )
+
+        if requirements and requirements_file_path:
+            # TODO: Later on, figure out how to merge requirements from both args
+            logging.warning(
+                "You provided both a requirements file (..., requirements_file_path=...)"
+                " and a requirements list (..., requirements=...). In case of any collisions,"
+                " anything that comes from requirements=... will be given priority."
+            )
+
+        if requirements:
+            stored = _write_and_save_requirements(
+                model_name,
+                requirements=requirements,
+                stored=stored,
+                tmpdir=tmpdir,
+            )
+        elif requirements_file_path:
+            stored = _save_requirements_file(
+                model_name,
+                requirements_file_path=requirements_file_path,
+                stored=stored,
+                tmpdir=tmpdir,
+            )
+
+        url = _save_meta_data(
+            model_name,
+            stored=stored,
+            version=version,
+            model=model,
+            username=username_from_api_key,
+            api_key=api_key,
+            project_id=project_id,
+        )
+
+        _submit_data_to_url(url, tmpdir, progress_bar=progress_bar)
+        msg = "Finished storing the following data items to the Lightning Cloud.\n"
+        for key, val in stored.items():
+            if key == "code":
+                msg += f"Stored code as a {val['type']} with name: {val['path']}\n"
+            else:
+                msg += f"Stored {key} with name: {val}\n"
+
+        msg += f"""
+        Just do:
+        `download_from_cloud("{username_from_api_key}/{model_name}", version="{version}")`
+        in order to download the model from the cloud to your local system.
+        """
+        msg += f"""
+        And:
+        `load_model("{username_from_api_key}/{model_name}", version="{version}")`
+        in order to load the downloaded model.
+        """
+        logging.info(msg)
+
+
+def _load_model(stored, output_dir, *args, **kwargs):
+    if "model" in stored:
+        sys.path.insert(0, f"{output_dir}")
+        model = torch.load(f"{output_dir}/{stored['model']}", *args, **kwargs)
+        return model
+    else:
+        raise ValueError(
+            "Couldn't find the model in the uploaded data."
+            " Please check with the model owner to confirm that the model exists in the storage."
+        )
+
+
+def _load_weights(model, stored, output_dir, *args, **kwargs):
+    if "weights" in stored:
+        model.load_state_dict(torch.load(f"{output_dir}/{stored['weights']}", *args, **kwargs))
+        return model
+    else:
+        raise ValueError(
+            "Weights were not found; please contact the model's owner to verify that the weights were"
+            " stored correctly."
+        )
+
+
+def _load_checkpoint(model, stored, output_dir, *args, **kwargs):
+    if "checkpoint" in stored:
+        ckpt = f"{output_dir}/{stored['checkpoint']}"
+    else:
+        raise ValueError(
+            "No checkpoint path was found; please contact the model owner to verify that the"
+            " checkpoint was saved successfully."
+        )
+
+    ckpt = model.load_from_checkpoint(ckpt, *args, **kwargs)
+    return ckpt
+
+
+def download_from_cloud(
+    name: str,
+    version: str = "latest",
+    output_dir: str = "",
+    progress_bar: bool = True,
+):
+    """Download a model from the Lightning cloud.
+
+    Args:
+        name:
+            The unique name of the model to be downloaded. Format: `<username>/<model_name>`.
+        version:
+            The version of the model to be downloaded. If not provided, defaults to `latest`.
+        output_dir:
+            The target directory, where the model and other data will be stored. If not passed,
+            the data will be stored in `$HOME/.lightning/model_store/<username>/<model_name>/<version>`.
+            (`version` defaults to `latest`)
+        progress_bar:
+            Show progress on download.
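+
+    Example (a minimal sketch; assumes ``my_boring_model`` was uploaded as shown above)::
+
+        from lightning.store import download_from_cloud
+
+        # fetches the data into $HOME/.lightning/model_store/<username>/my_boring_model/latest
+        download_from_cloud("<username>/my_boring_model")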
+    """
+    version = version or "latest"
+    username, model_name, version = _split_name(name, version=version, l_stage=stage.DOWNLOAD)
+
+    linked_output_dir = ""
+    if not output_dir:
+        output_dir = _LIGHTNING_STORAGE_DIR
+        output_dir = os.path.join(output_dir, username, model_name, version)
+        linked_output_dir = _get_linked_output_dir(output_dir)
+    else:
+        output_dir = os.path.abspath(output_dir)
+
+    if not os.path.isdir(output_dir):
+        os.makedirs(output_dir)
+
+    response = requests.get(f"{LIGHTNING_MODELS_PUBLIC_REGISTRY}?name={username}/{model_name}&version={version}")
+    if response.status_code != 200:
+        raise ConnectionRefusedError(
+            f"Unable to download the model with name {name} and version {version}."
+            " Maybe reach out to the model owner or check the arguments again?"
+        )
+
+    download_url_response = json.loads(response.content)
+    download_url = download_url_response["downloadUrl"]
+    meta_data = download_url_response["metadata"]
+
+    logging.info(f"Downloading the model data for {name} to the {output_dir} folder.")
+    _download_and_extract_data_to(output_dir, download_url, progress_bar)
+
+    if linked_output_dir:
+        logging.info(f"Linking the downloaded folder from {output_dir} to the {linked_output_dir} folder.")
+        if os.path.islink(linked_output_dir):
+            os.unlink(linked_output_dir)
+        if os.path.exists(linked_output_dir) and os.path.isdir(linked_output_dir):
+            os.rmdir(linked_output_dir)
+
+        os.symlink(output_dir, linked_output_dir)
+
+    with open(_LIGHTNING_STORAGE_FILE, "w+") as storage_file:
+        storage = {
+            username: {
+                model_name: {
+                    version: {
+                        "output_dir": output_dir,
+                        "linked_output_dir": str(linked_output_dir),
+                        "metadata": meta_data,
+                    },
+                },
+            },
+        }
+        json.dump(storage, storage_file)
+
+    logging.info("Downloading done...")
+    logging.info(
+        f"The source code for your model has been written to the {output_dir} folder,"
+        f" and linked to the {linked_output_dir} folder."
+    )
+    logging.info(
+        "Please make sure to add imports for the classes needed to instantiate your"
+        " model before calling `load_model`."
+    )
+
+
+def _validate_output_dir(folder: str) -> None:
+    if not os.path.exists(folder):
+        raise ValueError("The output directory doesn't exist... did you forget to call `download_from_cloud(...)`?")
+
+
+def load_model(
+    name: str,
+    version: str = "latest",
+    load_weights: bool = False,
+    load_checkpoint: bool = False,
+    model: Union[PL.LightningModule, L.LightningModule, None] = None,
+    *args,
+    **kwargs,
+):
+    """Load a model from the Lightning cloud.
+
+    Args:
+        name:
+            The name of the model to load. Format: `<username>/<model_name>`.
+        version:
+            The version of the model to be loaded. If not provided, defaults to `latest`.
+        load_weights:
+            Loads only the weights if this is set to `True`. Needs `model` to be passed in order
+            to load the weights.
+        load_checkpoint:
+            Loads the checkpoint if this is set to `True`. Only a `LightningModule` model is supported
+            for this feature.
+        model:
+            The model class instance to be used.
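+
+    Example (a minimal sketch; assumes the model was downloaded first)::
+
+        from lightning.store import load_model
+
+        # returns the model object that was saved via `upload_to_cloud`
+        model = load_model("<username>/my_boring_model")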
+    """
+    if load_weights and load_checkpoint:
+        raise ValueError(
+            f"You passed load_weights={load_weights} and load_checkpoint={load_checkpoint};"
+            " only one of them may be requested in a single call."
+        )
+
+    if os.path.exists(_LIGHTNING_STORAGE_FILE):
+        version = version or "latest"
+        model_data = _get_model_data(name, version)
+        output_dir = model_data["output_dir"]
+        linked_output_dir = model_data["linked_output_dir"]
+        meta_data = model_data["metadata"]
+        stored = {"code": {}}
+
+        for key, val in meta_data.items():
+            if key.startswith("stored_"):
+                if key.startswith("stored_code_"):
+                    stored["code"][key.split("_code_")[1]] = val
+                else:
+                    stored[key.split("_")[1]] = val
+
+        _validate_output_dir(output_dir)
+        if linked_output_dir:
+            _validate_output_dir(linked_output_dir)
+
+        if load_weights:
+            # This first loads the model - and then the weights
+            if not model:
+                raise ValueError(
+                    "Expected model=... to be passed for loading weights, please pass"
+                    f" your model object to load_model({name}, {version}, model=model_obj)"
+                )
+            return _load_weights(model, stored, linked_output_dir or output_dir, *args, **kwargs)
+        elif load_checkpoint:
+            if not model:
+                raise ValueError(
+                    "You need to pass the LightningModule object (model) to be able to"
+                    f" load the checkpoint. `load_model({name}, {version},"
+                    " load_checkpoint=True, model=...)`"
+                )
+            if not isinstance(model, (PL.LightningModule, L.LightningModule)):
+                raise TypeError(
+                    "For loading checkpoints, the model is required to be a LightningModule"
+                    f" or a subclass of LightningModule, got type {type(model)}."
+                )
+
+            return _load_checkpoint(model, stored, linked_output_dir or output_dir, *args, **kwargs)
+        else:
+            return _load_model(stored, linked_output_dir or output_dir, *args, **kwargs)
+    else:
+        raise ValueError(
+            f"Could not find the model (for {name}:{version}) on the local system."
+            " Did you make sure to download the model using `download_from_cloud(...)`"
+            " before calling `load_model(...)`?"
+        )
diff --git a/src/lightning/store/save.py b/src/lightning/store/save.py
new file mode 100644
index 0000000000000..e179cfc05ca04
--- /dev/null
+++ b/src/lightning/store/save.py
@@ -0,0 +1,301 @@
+# Copyright The Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
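+
+"""Helpers for packing model data into an archive, uploading it and downloading it back."""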
+
+import inspect
+import json
+import logging
+import os
+import shutil
+import tarfile
+from pathlib import PurePath
+
+import requests
+import torch
+from requests.auth import HTTPBasicAuth
+from tqdm import tqdm
+from tqdm.utils import CallbackIOWrapper
+
+from lightning.app.core.constants import LIGHTNING_MODELS_PUBLIC_REGISTRY
+
+logging.basicConfig(level=logging.INFO)
+
+_LIGHTNING_DIR = os.path.join(os.path.expanduser("~"), ".lightning")
+_LIGHTNING_STORAGE_FILE = os.path.join(_LIGHTNING_DIR, ".model_storage")
+_LIGHTNING_STORAGE_DIR = os.path.join(_LIGHTNING_DIR, "model_store")
+
+
+def _check_id(id: str) -> str:
+    if id[-1] != "/":
+        id += "/"
+    if id.count("/") != 2:
+        raise ValueError("The format for the ID should be: `<username>/<model_name>`.")
+    return id
+
+
+def _save_checkpoint(name, checkpoint, tmpdir, stored: dict) -> dict:
+    checkpoint_file_path = f"{tmpdir}/checkpoint.ckpt"
+    torch.save(checkpoint, checkpoint_file_path)
+    stored["checkpoint"] = "checkpoint.ckpt"
+    return stored
+
+
+def _save_checkpoint_from_path(name, path, tmpdir, stored: dict) -> dict:
+    checkpoint_file_path = path
+    shutil.copy(checkpoint_file_path, f"{tmpdir}/checkpoint.ckpt")
+    stored["checkpoint"] = "checkpoint.ckpt"
+    return stored
+
+
+def _save_model_weights(name, model_state_dict, tmpdir, stored, *args, **kwargs) -> dict:
+    # For now we assume that it's always going to be public
+    weights_file_path = f"{tmpdir}/weights.pt"
+    torch.save(model_state_dict, weights_file_path, *args, **kwargs)
+    stored["weights"] = "weights.pt"
+    return stored
+
+
+def _save_model(name, model, tmpdir, stored, *args, **kwargs) -> dict:
+    # For now we assume that it's always going to be public
+    model_file_path = f"{tmpdir}/model"
+    torch.save(model, model_file_path, *args, **kwargs)
+    stored["model"] = "model"
+    return stored
+
+
+def _save_model_code(name, model_cls, source_code_path, tmpdir, stored) -> dict:
+    if source_code_path:
+        source_code_path = os.path.abspath(source_code_path)
+        if not os.path.exists(source_code_path):
+            raise FileNotFoundError(f"The given path {source_code_path} does not exist.")
+
+        # Copy contents to the tmpdir folder
+        if os.path.isdir(source_code_path):
+            logging.warning(
+                f"NOTE: Folder: {source_code_path} is being uploaded to the cloud so that the user"
+                " can make the necessary imports after downloading your model."
+            )
+            dir_name = os.path.basename(source_code_path)
+            shutil.copytree(source_code_path, f"{tmpdir}/{dir_name}/")
+            stored["code"] = {"type": "folder", "path": f"{dir_name}"}
+        else:
+            if os.path.splitext(source_code_path)[-1] != ".py":
+                raise ValueError(
+                    "Expected a Python file or a directory to be uploaded for the model definition,"
+                    f" but found {source_code_path}. If your file is not a Python file and you still"
+                    " want to save it, please consider saving it in a folder and passing the folder"
+                    " path instead."
+                )
+
+            logging.warning(
+                f"NOTE: File: {source_code_path} is being uploaded to the cloud so that the"
+                " user can make the necessary imports after downloading your model."
+            )
+
+            file_name = os.path.basename(source_code_path)
+            shutil.copy(source_code_path, f"{tmpdir}/{file_name}")
+            stored["code"] = {"type": "file", "path": f"{file_name}"}
+    else:
+        # TODO: Raise a warning if the file has any statements/expressions outside of
+        # __name__ == "__main__" in the script
+        # As those will be executed on import
+        model_class_path = inspect.getsourcefile(model_cls)
+        if model_class_path:
+            if os.path.splitext(model_class_path)[-1] != ".py":
+                raise ValueError(
+                    f"The model definition was found in a non-Python file ({model_class_path}),"
+                    " which is not currently supported (for safety reasons). If your file is not a"
+                    " Python file and you still want to save it, please consider saving it in a"
+                    " folder and passing the folder path instead."
+                )
+
+            file_name = os.path.basename(model_class_path)
+            logging.warning(
+                f"NOTE: File: {model_class_path} is being uploaded to the cloud so that the"
+                " user can make the necessary imports after downloading your model. The file"
+                f" will be saved as {file_name} on download."
+            )
+            shutil.copyfile(model_class_path, f"{tmpdir}/{file_name}")
+            stored["code"] = {"type": "file", "path": file_name}
+    return stored
+
+
+def _write_and_save_requirements(name, requirements, stored, tmpdir):
+    if isinstance(requirements, str):
+        requirements = [requirements]
+
+    requirements_file_path = f"{tmpdir}/requirements.txt"
+
+    with open(requirements_file_path, "w+") as req_file:
+        for req in requirements:
+            req_file.write(req + "\n")
+
+    stored["requirements"] = "requirements.txt"
+    return stored
+
+
+def _save_requirements_file(name, requirements_file_path, stored, tmpdir) -> dict:
+    shutil.copyfile(os.path.abspath(requirements_file_path), f"{tmpdir}/requirements.txt")
+    stored["requirements"] = "requirements.txt"
+    return stored
+
+
+def _upload_metadata(
+    meta_data: dict,
+    name: str,
+    version: str,
+    username: str,
+    api_key: str,
+    project_id: str,
+):
+    def _get_url(response_content):
+        content = json.loads(response_content)
+        return content["uploadUrl"]
+
+    json_field = {
+        "name": f"{username}/{name}",
+        "version": version,
+        "metadata": meta_data,
+    }
+    if project_id:
+        json_field["project_id"] = project_id
+    response = requests.post(
+        LIGHTNING_MODELS_PUBLIC_REGISTRY,
+        auth=HTTPBasicAuth(username, api_key),
+        json=json_field,
+    )
+    if response.status_code != 200:
+        raise ConnectionRefusedError(
+            f"Unable to upload content.\n Error: {response.content}\n for the payload: {json_field}"
+        )
+    return _get_url(response.content)
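+
+
+# What the flattened metadata looks like (assuming a model plus a code file were stored):
+#   {"cls": "LitAutoEncoder", "stored_model": "model", "stored_weights": "weights.pt",
+#    "stored_code_type": "file", "stored_code_path": "model.py"}
+# `load_model` later reverses this flattening by parsing the `stored_*` keys back into a dict.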
+def _save_meta_data(name, stored, version, model, username, api_key, project_id):
+    def _process_stored(stored: dict):
+        processed_dict = {}
+        for key, val in stored.items():
+            if "code" in key:
+                for code_key, code_val in stored[key].items():
+                    processed_dict[f"stored_code_{code_key}"] = code_val
+            else:
+                processed_dict[f"stored_{key}"] = val
+        return processed_dict
+
+    meta_data = {"cls": model.__class__.__name__}
+    meta_data.update(_process_stored(stored))
+
+    return _upload_metadata(
+        meta_data,
+        name=name,
+        version=version,
+        username=username,
+        api_key=api_key,
+        project_id=project_id,
+    )
+
+
+def _submit_data_to_url(url: str, tmpdir: str, progress_bar: bool) -> None:
+    def _make_tar(tmpdir, archive_output_path):
+        with tarfile.open(archive_output_path, "w:gz") as tar:
+            # the archive is created inside `tmpdir`, so skip the (partial) archive file itself
+            tar.add(tmpdir, filter=lambda tarinfo: None if tarinfo.name.endswith("data.tar.gz") else tarinfo)
+
+    def upload_from_file(src, dst):
+        file_size = os.path.getsize(src)
+        with open(src, "rb") as fd:
+            with tqdm(
+                desc="Uploading",
+                total=file_size,
+                unit="B",
+                unit_scale=True,
+                unit_divisor=1024,
+            ) as t:
+                reader_wrapper = CallbackIOWrapper(t.update, fd, "read")
+                response = requests.put(dst, data=reader_wrapper)
+                response.raise_for_status()
+
+    archive_path = f"{tmpdir}/data.tar.gz"
+    _make_tar(tmpdir, archive_path)
+    if progress_bar:
+        upload_from_file(archive_path, url)
+    else:
+        with open(archive_path, "rb") as fd:
+            requests.put(url, data=fd)
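+
+
+# Download side: the archive is fetched to `<output_dir>/data.tar.gz`, extracted under
+# `<output_dir>/extracted/`, its contents are promoted into `output_dir` itself, and both
+# temporary artifacts are then removed by `_common_clean_up`.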
+def _download_tarfile(download_url: str, output_dir: str, progress_bar: bool) -> None:
+    with requests.get(download_url, stream=True) as req_stream:
+        total_size_in_bytes = int(req_stream.headers.get("content-length", 0))
+        block_size = 1024  # 1 Kibibyte
+
+        download_progress_bar = None
+        if progress_bar:
+            download_progress_bar = tqdm(total=total_size_in_bytes, unit="iB", unit_scale=True)
+        with open(f"{output_dir}/data.tar.gz", "wb") as f:
+            for chunk in req_stream.iter_content(chunk_size=block_size):
+                if download_progress_bar:
+                    download_progress_bar.update(len(chunk))
+                f.write(chunk)
+        if download_progress_bar:
+            download_progress_bar.close()
+
+
+def _common_clean_up(output_dir: str) -> None:
+    data_file_path = f"{output_dir}/data.tar.gz"
+    dir_file_path = f"{output_dir}/extracted"
+    if os.path.exists(data_file_path):
+        os.remove(data_file_path)
+    shutil.rmtree(dir_file_path, ignore_errors=True)
+
+
+def _download_and_extract_data_to(output_dir: str, download_url: str, progress_bar: bool) -> None:
+    try:
+        _download_tarfile(download_url, output_dir, progress_bar)
+
+        tar = tarfile.open(f"{output_dir}/data.tar.gz", "r:gz")
+        tmpdir_name = tar.getnames()[0]
+        tar.extractall(path=f"{output_dir}/extracted")
+        tar.close()
+
+        root = output_dir
+        for filename in os.listdir(os.path.join(root, "extracted", tmpdir_name)):
+            abs_file_name = os.path.join(root, "extracted", tmpdir_name, filename)
+            func = shutil.copytree if os.path.isdir(abs_file_name) else shutil.copy
+
+            dst_file_name = os.path.join(root, filename)
+            if os.path.exists(dst_file_name):
+                if os.path.isdir(dst_file_name):
+                    shutil.rmtree(dst_file_name)
+                else:
+                    os.remove(dst_file_name)
+
+            func(abs_file_name, os.path.join(root, filename))
+
+        if not os.path.isdir(output_dir):
+            raise NotADirectoryError(
+                f"Downloading the data to the output directory {output_dir} failed."
+                " Maybe try again or contact the model owner?"
+            )
+    except Exception as ex:
+        _common_clean_up(output_dir)
+        raise ex
+    else:
+        _common_clean_up(output_dir)
+
+
+def _get_linked_output_dir(src_dir: str):
+    # The last sub-folder will be our version
+    version_folder_name = PurePath(src_dir).parts[-1]
+
+    if version_folder_name == "latest":
+        return str(PurePath(src_dir).parent.joinpath("version_latest"))
+    else:
+        replaced_ver = version_folder_name.replace(".", "_")
+        return str(PurePath(src_dir).parent.joinpath(f"version_{replaced_ver}"))
diff --git a/src/lightning/store/utils.py b/src/lightning/store/utils.py
new file mode 100644
index 0000000000000..df7c877ef8914
--- /dev/null
+++ b/src/lightning/store/utils.py
@@ -0,0 +1,79 @@
+# Copyright The Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from enum import Enum
+from typing import Tuple
+
+from lightning.store.save import _LIGHTNING_STORAGE_FILE
+
+
+class stage(Enum):
+    UPLOAD = 0
+    LOAD = 1
+    DOWNLOAD = 2
+
+
+def _check_version(version: str) -> bool:
+    allowed_chars = "0123456789."
+    if version == "latest":
+        return True
+    for version_char in version:
+        if version_char not in allowed_chars:
+            return False
+    return True
+
+
+def _split_name(name: str, version: str, l_stage: stage) -> Tuple[str, str, str]:
+    if l_stage == stage.UPLOAD:
+        username = ""
+        model_name = name
+    else:
+        username, model_name = name.split("/")
+
+    return username, model_name, version
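+
+
+# `_LIGHTNING_STORAGE_FILE` holds a JSON mapping written by `download_from_cloud`, shaped as:
+#   {username: {model_name: {version: {"output_dir": ..., "linked_output_dir": ..., "metadata": ...}}}}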
+def _get_model_data(name: str, version: str):
+    username, model_name, version = _split_name(name, version, stage.LOAD)
+
+    if not os.path.exists(_LIGHTNING_STORAGE_FILE):
+        raise FileNotFoundError(
+            f"Could not find {_LIGHTNING_STORAGE_FILE} (it is generated by `download_from_cloud(...)`)."
+        )
+
+    with open(_LIGHTNING_STORAGE_FILE) as storage_file:
+        storage_data = json.load(storage_file)
+
+    if username not in storage_data:
+        raise KeyError(
+            f"No data found for the given username {username}. Make sure to call"
+            " `download_from_cloud` before loading."
+        )
+    user_data = storage_data[username]
+
+    if model_name not in user_data:
+        raise KeyError(
+            f"No data found for the given model name: {model_name} for the given"
+            f" username: {username}. Make sure to call `download_from_cloud` before loading."
+        )
+    model_data = user_data[model_name]
+
+    version = version or "latest"
+    if version not in model_data:
+        raise KeyError(f"No data found for the given version: {version}, did you download the model successfully?")
+    model_version_data = model_data[version]
+
+    return model_version_data
diff --git a/src/lightning_app/core/constants.py b/src/lightning_app/core/constants.py
index 052d489e83a7b..35b0b44a073c3 100644
--- a/src/lightning_app/core/constants.py
+++ b/src/lightning_app/core/constants.py
@@ -65,6 +65,7 @@ def get_lightning_cloud_url() -> str:
 DOT_IGNORE_FILENAME = ".lightningignore"
 LIGHTNING_COMPONENT_PUBLIC_REGISTRY = "https://lightning.ai/v1/components"
 LIGHTNING_APPS_PUBLIC_REGISTRY = "https://lightning.ai/v1/apps"
+LIGHTNING_MODELS_PUBLIC_REGISTRY = "https://lightning.ai/v1/models"
 
 # EXPERIMENTAL: ENV VARIABLES TO ENABLE MULTIPLE WORKS IN THE SAME MACHINE
 DEFAULT_NUMBER_OF_EXPOSED_PORTS = int(os.getenv("DEFAULT_NUMBER_OF_EXPOSED_PORTS", "50"))
diff --git a/tests/tests_cloud/__init__.py b/tests/tests_cloud/__init__.py
new file mode 100644
index 0000000000000..70245585b8152
--- /dev/null
+++ b/tests/tests_cloud/__init__.py
@@ -0,0 +1,13 @@
+import os
+
+_TEST_ROOT = os.path.dirname(__file__)
+_PROJECT_ROOT = os.path.dirname(os.path.dirname(_TEST_ROOT))
+_LIGHTNING_DIR = f"{os.path.expanduser('~')}/.lightning"
+
+
+_USERNAME = os.getenv("API_USERNAME", "")
+assert _USERNAME, "No API_USERNAME env variable; make sure to add it before testing"
+_API_KEY = os.getenv("API_KEY", "")
+assert _API_KEY, "No API_KEY env variable; make sure to add it before testing"
+_PROJECT_ID = os.getenv("PROJECT_ID", "")
+assert _PROJECT_ID, "No PROJECT_ID env variable; make sure to add it before testing"
diff --git a/tests/tests_cloud/helpers.py b/tests/tests_cloud/helpers.py
new file mode 100644
index 0000000000000..282cb095dfb77
--- /dev/null
+++ b/tests/tests_cloud/helpers.py
@@ -0,0 +1,12 @@
+import os
+import shutil
+
+from lightning.store.save import _LIGHTNING_STORAGE_DIR
+
+
+# TODO: make this a fixture
+def cleanup():
+    # TODO: `LIGHTNING_MODEL_STORE_TESTING` is not working as intended,
+    # so the fixture should create a temp folder and map it as home...
+    if os.getenv("LIGHTNING_MODEL_STORE_TESTING") and os.path.isdir(_LIGHTNING_STORAGE_DIR):
+        shutil.rmtree(_LIGHTNING_STORAGE_DIR)
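+
+# A possible fixture shape for the TODO above (a sketch, not wired in yet; the fixture name
+# and the env-variable handling are assumptions):
+#
+# @pytest.fixture
+# def clean_model_store(monkeypatch):
+#     monkeypatch.setenv("LIGHTNING_MODEL_STORE_TESTING", "1")
+#     yield
+#     if os.path.isdir(_LIGHTNING_STORAGE_DIR):
+#         shutil.rmtree(_LIGHTNING_STORAGE_DIR)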
diff --git a/tests/tests_cloud/test_model.py b/tests/tests_cloud/test_model.py
new file mode 100644
index 0000000000000..6eae880b3758e
--- /dev/null
+++ b/tests/tests_cloud/test_model.py
@@ -0,0 +1,65 @@
+import os
+
+from tests_cloud import _API_KEY, _PROJECT_ID, _USERNAME
+from tests_cloud.helpers import cleanup
+
+import pytorch_lightning as pl
+from lightning.store import download_from_cloud, load_model, upload_to_cloud
+from lightning.store.save import _LIGHTNING_STORAGE_DIR
+from pytorch_lightning.demos.boring_classes import BoringModel
+
+
+def test_model(model_name: str = "boring_model", version: str = "latest"):
+    cleanup()
+
+    upload_to_cloud(model_name, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+    assert os.path.isdir(os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version))
+
+    model = load_model(f"{_USERNAME}/{model_name}")
+    assert model is not None
+
+
+def test_model_without_progress_bar(model_name: str = "boring_model", version: str = "latest"):
+    cleanup()
+
+    upload_to_cloud(model_name, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID, progress_bar=False)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}", progress_bar=False)
+    assert os.path.isdir(os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version))
+
+    model = load_model(f"{_USERNAME}/{model_name}")
+    assert model is not None
+
+
+def test_only_weights(model_name: str = "boring_model_only_weights", version: str = "latest"):
+    cleanup()
+
+    model = BoringModel()
+    trainer = pl.Trainer(fast_dev_run=True)
+    trainer.fit(model)
+    upload_to_cloud(model_name, model=model, weights_only=True, api_key=_API_KEY, project_id=_PROJECT_ID)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+    assert os.path.isdir(os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version))
+
+    model_with_weights = load_model(f"{_USERNAME}/{model_name}", load_weights=True, model=model)
+    assert model_with_weights is not None
+    assert model_with_weights.state_dict() is not None
+
+
+def test_checkpoint_path(model_name: str = "boring_model_only_checkpoint_path", version: str = "latest"):
+    cleanup()
+
+    model = BoringModel()
+    trainer = pl.Trainer(fast_dev_run=True)
+    trainer.fit(model)
+    trainer.save_checkpoint("tmp.ckpt")
+    upload_to_cloud(model_name, checkpoint_path="tmp.ckpt", api_key=_API_KEY, project_id=_PROJECT_ID)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+    assert os.path.isdir(os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version))
+
+    ckpt = load_model(f"{_USERNAME}/{model_name}", load_checkpoint=True, model=model)
+    assert ckpt is not None
diff --git a/tests/tests_cloud/test_requirements.py b/tests/tests_cloud/test_requirements.py
new file mode 100644
index 0000000000000..fc8d2685ed1c6
--- /dev/null
+++ b/tests/tests_cloud/test_requirements.py
@@ -0,0 +1,56 @@
+import os
+
+from tests_cloud import _API_KEY, _PROJECT_ID, _PROJECT_ROOT, _USERNAME
+from tests_cloud.helpers import cleanup
+
+from lightning.store import download_from_cloud, upload_to_cloud
+from lightning.store.save import _LIGHTNING_STORAGE_DIR
+from pytorch_lightning.demos.boring_classes import BoringModel
+
+
+def test_requirements_as_a_file(version: str = "latest", model_name: str = "boring_model"):
+    cleanup()
+
+    requirements_file_path = os.path.join(_PROJECT_ROOT, "requirements", "app", "base.txt")
+
+    upload_to_cloud(
+        model_name,
+        version=version,
+        model=BoringModel(),
+        requirements_file_path=requirements_file_path,
+        api_key=_API_KEY,
+        project_id=_PROJECT_ID,
+    )
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+
+    req_folder_path = os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version)
+    assert os.path.isdir(req_folder_path), "missing: %s" % req_folder_path
+    assert "requirements.txt" in os.listdir(req_folder_path), "among files: %r" % os.listdir(req_folder_path)
+
+
+def test_requirements_as_a_list(version: str = "1.0.0", model_name: str = "boring_model"):
+    cleanup()
+
+    requirements_list = ["pytorch_lightning==1.7.7", "lightning"]
+
+    upload_to_cloud(
+        model_name,
+        version=version,
+        model=BoringModel(),
+        requirements=requirements_list,
+        api_key=_API_KEY,
+        project_id=_PROJECT_ID,
+    )
+
+    download_from_cloud(f"{_USERNAME}/{model_name}", version=version)
+
+    req_folder_path = os.path.join(_LIGHTNING_STORAGE_DIR, _USERNAME, model_name, version)
+    assert os.path.isdir(req_folder_path), "missing: %s" % req_folder_path
+    assert "requirements.txt" in os.listdir(req_folder_path), "among files: %r" % os.listdir(req_folder_path)
+
+    with open(f"{req_folder_path}/requirements.txt") as req_file:
+        reqs = req_file.readlines()
+        reqs = [req.strip("\n") for req in reqs]
+
+    assert requirements_list == reqs
diff --git a/tests/tests_cloud/test_source_code.py b/tests/tests_cloud/test_source_code.py
new file mode 100644
index 0000000000000..11ea89775e484
--- /dev/null
+++ b/tests/tests_cloud/test_source_code.py
@@ -0,0 +1,100 @@
+import inspect
+import os
+import tempfile
+
+from tests_cloud import _API_KEY, _PROJECT_ID, _PROJECT_ROOT, _TEST_ROOT, _USERNAME
+from tests_cloud.helpers import cleanup
+
+from lightning.store import download_from_cloud, upload_to_cloud
+from lightning.store.save import _LIGHTNING_STORAGE_DIR
+from pytorch_lightning.demos.boring_classes import BoringModel
+
+
+def test_source_code_implicit(model_name: str = "model_test_source_code_implicit"):
+    cleanup()
+
+    upload_to_cloud(model_name, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+    assert os.path.isfile(
+        os.path.join(
+            _LIGHTNING_STORAGE_DIR,
+            _USERNAME,
+            model_name,
+            "latest",
+            str(os.path.basename(inspect.getsourcefile(BoringModel))),
+        )
+    )
+
+
+def test_source_code_saving_disabled(model_name: str = "model_test_source_code_dont_save"):
+    cleanup()
+
+    upload_to_cloud(model_name, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID, save_code=False)
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+    assert not os.path.isfile(
+        os.path.join(
+            _LIGHTNING_STORAGE_DIR,
+            _USERNAME,
+            model_name,
+            "latest",
+            str(os.path.basename(inspect.getsourcefile(BoringModel))),
+        )
+    )
+
+
+def test_source_code_explicit_relative_folder(model_name: str = "model_test_source_code_explicit_relative"):
+    cleanup()
+
+    dir_upload_path = _TEST_ROOT
+    upload_to_cloud(
+        model_name, model=BoringModel(), source_code_path=dir_upload_path, api_key=_API_KEY, project_id=_PROJECT_ID
+    )
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+
+    assert os.path.isdir(
+        os.path.join(
+            _LIGHTNING_STORAGE_DIR, _USERNAME, model_name, "latest", os.path.basename(os.path.abspath(dir_upload_path))
+        )
+    )
+
+
+def test_source_code_explicit_absolute_folder(model_name: str = "model_test_source_code_explicit_absolute_path"):
+    cleanup()
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        dir_upload_path = os.path.abspath(tmpdir)
+        upload_to_cloud(
+            model_name, model=BoringModel(), source_code_path=dir_upload_path, api_key=_API_KEY, project_id=_PROJECT_ID
+        )
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+
+    assert os.path.isdir(
+        os.path.join(
+            _LIGHTNING_STORAGE_DIR, _USERNAME, model_name, "latest", os.path.basename(os.path.abspath(dir_upload_path))
+        )
+    )
+
+
+def test_source_code_explicit_file(model_name: str = "model_test_source_code_explicit_file"):
+    cleanup()
+
+    file_name = os.path.join(_PROJECT_ROOT, "setup.py")
+    upload_to_cloud(
+        model_name, model=BoringModel(), source_code_path=file_name, api_key=_API_KEY, project_id=_PROJECT_ID
+    )
+
+    download_from_cloud(f"{_USERNAME}/{model_name}")
+
+    assert os.path.isfile(
+        os.path.join(
+            _LIGHTNING_STORAGE_DIR,
+            _USERNAME,
+            model_name,
+            "latest",
+            os.path.basename(file_name),
+        )
+    )
diff --git a/tests/tests_cloud/test_versioning.py b/tests/tests_cloud/test_versioning.py
new file mode 100644
index 0000000000000..373d0a7500504
--- /dev/null
+++ b/tests/tests_cloud/test_versioning.py
@@ -0,0 +1,61 @@
+import os
+import platform
+
+import pytest
+from tests_cloud import _API_KEY, _PROJECT_ID, _USERNAME
+from tests_cloud.helpers import cleanup
+
+from lightning.store.cloud_api import download_from_cloud, upload_to_cloud
+from lightning.store.save import _LIGHTNING_STORAGE_DIR
+from pytorch_lightning.demos.boring_classes import BoringModel
+
+
+def assert_download_successful(username, model_name, version):
+    folder_name = os.path.join(_LIGHTNING_STORAGE_DIR, username, model_name, version)
+    assert os.path.isdir(folder_name), f"Folder name: {folder_name} doesn't exist."
+    assert len(os.listdir(folder_name)) != 0
+
+
+@pytest.mark.parametrize(
+    ("case", "expected_case"),
+    (
+        [
+            ("1.0.0", "version_1_0_0"),
+            ("0.0.1", "version_0_0_1"),
+            ("latest", "version_latest"),
+            ("1.0", "version_1_0"),
+            ("1", "version_1"),
+            ("0.1", "version_0_1"),
+            ("", "version_latest"),
+        ]
+    ),
+)
+def test_versioning_valid_case(case, expected_case, model_name: str = "boring_model_versioning"):
+    cleanup()
+
+    upload_to_cloud(model_name, version=case, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID)
+    download_from_cloud(f"{_USERNAME}/{model_name}", version=case)
+    assert_download_successful(_USERNAME, model_name, expected_case)
+
+
+@pytest.mark.parametrize(
+    "case",
+    (
+        [
+            " version with spaces ",
+            "*",
+            # "#", <-- TODO: Add it back later
+            "¡",
+            "©",
+        ]
+    ),
+)
+def test_versioning_invalid_case(case, model_name: str = "boring_model_versioning"):
+    cleanup()
+
+    with pytest.raises(ConnectionRefusedError):
+        upload_to_cloud(model_name, version=case, model=BoringModel(), api_key=_API_KEY, project_id=_PROJECT_ID)
+
+    error = OSError if case == "*" and platform.system() == "Windows" else ConnectionRefusedError
+    with pytest.raises(error):
+        download_from_cloud(f"{_USERNAME}/{model_name}", version=case)