-
Notifications
You must be signed in to change notification settings - Fork 182
upload file to sandbox #355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
7dcbf2f
1e2502a
e12bd29
1cef23c
5bd3591
c8a9472
5fd25f6
a4d3d36
6efadd4
3e82be7
393a926
9602c6c
985cb26
9a4c0a3
2efc727
249edf5
22cd958
30e408b
94b338a
6bb7a30
bbf321f
852e6ec
5ae6b57
c64e2ba
36cdb1e
e0921fe
a3c1c55
bae12e6
7d9dee2
3b91e7b
954113e
624aea7
788fab0
0f56092
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,11 +1,14 @@ | ||||||||||||||||||||||||||||
import base64 | ||||||||||||||||||||||||||||
import mimetypes | ||||||||||||||||||||||||||||
import os | ||||||||||||||||||||||||||||
import re | ||||||||||||||||||||||||||||
import uuid | ||||||||||||||||||||||||||||
from io import BytesIO | ||||||||||||||||||||||||||||
from pathlib import Path | ||||||||||||||||||||||||||||
from typing import List, Optional, Tuple | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
from app.engine.index import IndexConfig, get_index | ||||||||||||||||||||||||||||
from app.engine.utils.file_helper import FileMetadata, save_file | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about moving save_file to file.py (this service) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also think the file helper and file service are a bit duplicated, but the idea is to separate them out to be reused in both the engine code and API code. |
||||||||||||||||||||||||||||
from llama_index.core import VectorStoreIndex | ||||||||||||||||||||||||||||
from llama_index.core.ingestion import IngestionPipeline | ||||||||||||||||||||||||||||
from llama_index.core.readers.file.base import ( | ||||||||||||||||||||||||||||
|
@@ -31,94 +34,141 @@ def get_llamaparse_parser(): | |||||||||||||||||||||||||||
def default_file_loaders_map(): | ||||||||||||||||||||||||||||
default_loaders = get_file_loaders_map() | ||||||||||||||||||||||||||||
default_loaders[".txt"] = FlatReader | ||||||||||||||||||||||||||||
default_loaders[".csv"] = FlatReader | ||||||||||||||||||||||||||||
leehuwuj marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
return default_loaders | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
class PrivateFileService: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
To store the files uploaded by the user and add them to the index. | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
PRIVATE_STORE_PATH = "output/uploaded" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def preprocess_base64_file(base64_content: str) -> Tuple[bytes, str | None]: | ||||||||||||||||||||||||||||
def _preprocess_base64_file(base64_content: str) -> Tuple[bytes, str | None]: | ||||||||||||||||||||||||||||
header, data = base64_content.split(",", 1) | ||||||||||||||||||||||||||||
mime_type = header.split(";")[0].split(":", 1)[1] | ||||||||||||||||||||||||||||
extension = mimetypes.guess_extension(mime_type) | ||||||||||||||||||||||||||||
# File data as bytes | ||||||||||||||||||||||||||||
return base64.b64decode(data), extension | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def store_and_parse_file(file_name, file_data, extension) -> List[Document]: | ||||||||||||||||||||||||||||
def _store_file(file_name, file_data) -> FileMetadata: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Store the file to the private directory and return the file metadata | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
# Store file to the private directory | ||||||||||||||||||||||||||||
os.makedirs(PrivateFileService.PRIVATE_STORE_PATH, exist_ok=True) | ||||||||||||||||||||||||||||
file_path = Path(os.path.join(PrivateFileService.PRIVATE_STORE_PATH, file_name)) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# write file | ||||||||||||||||||||||||||||
with open(file_path, "wb") as f: | ||||||||||||||||||||||||||||
f.write(file_data) | ||||||||||||||||||||||||||||
return save_file(file_data, file_path=str(file_path)) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def _load_file_to_documents(file_metadata: FileMetadata) -> List[Document]: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Load the file from the private directory and return the documents | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
extension = file_metadata.name.split(".")[-1] | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve file extension extraction using The current method of extracting the file extension may not handle filenames without extensions correctly. Using Apply this diff to fix the issue: -extension = file_metadata.name.split(".")[-1]
+_, extension = os.path.splitext(file_metadata.name)
+extension = extension.lstrip(".") 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Load file to documents | ||||||||||||||||||||||||||||
# If LlamaParse is enabled, use it to parse the file | ||||||||||||||||||||||||||||
# Otherwise, use the default file loaders | ||||||||||||||||||||||||||||
reader = get_llamaparse_parser() | ||||||||||||||||||||||||||||
if reader is None: | ||||||||||||||||||||||||||||
reader_cls = default_file_loaders_map().get(extension) | ||||||||||||||||||||||||||||
reader_cls = default_file_loaders_map().get(f".{extension}") | ||||||||||||||||||||||||||||
if reader_cls is None: | ||||||||||||||||||||||||||||
raise ValueError(f"File extension {extension} is not supported") | ||||||||||||||||||||||||||||
reader = reader_cls() | ||||||||||||||||||||||||||||
documents = reader.load_data(file_path) | ||||||||||||||||||||||||||||
documents = reader.load_data(Path(file_metadata.path)) | ||||||||||||||||||||||||||||
# Add custom metadata | ||||||||||||||||||||||||||||
for doc in documents: | ||||||||||||||||||||||||||||
doc.metadata["file_name"] = file_name | ||||||||||||||||||||||||||||
doc.metadata["file_name"] = file_metadata.name | ||||||||||||||||||||||||||||
doc.metadata["private"] = "true" | ||||||||||||||||||||||||||||
leehuwuj marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
return documents | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def _add_documents_to_vector_store_index( | ||||||||||||||||||||||||||||
documents: List[Document], index: VectorStoreIndex | ||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Add the documents to the vector store index | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
pipeline = IngestionPipeline() | ||||||||||||||||||||||||||||
nodes = pipeline.run(documents=documents) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Add the nodes to the index and persist it | ||||||||||||||||||||||||||||
if index is None: | ||||||||||||||||||||||||||||
index = VectorStoreIndex(nodes=nodes) | ||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
index.insert_nodes(nodes=nodes) | ||||||||||||||||||||||||||||
index.storage_context.persist( | ||||||||||||||||||||||||||||
persist_dir=os.environ.get("STORAGE_DIR", "storage") | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def _add_file_to_llama_cloud_index( | ||||||||||||||||||||||||||||
index: LlamaCloudIndex, | ||||||||||||||||||||||||||||
file_name: str, | ||||||||||||||||||||||||||||
file_data: bytes, | ||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
Add the file to the LlamaCloud index. | ||||||||||||||||||||||||||||
LlamaCloudIndex is a managed index so we can directly use the files. | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||
from app.engine.service import LLamaCloudFileService | ||||||||||||||||||||||||||||
except ImportError: | ||||||||||||||||||||||||||||
raise ValueError("LlamaCloudFileService is not found") | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
leehuwuj marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
project_id = index._get_project_id() | ||||||||||||||||||||||||||||
pipeline_id = index._get_pipeline_id() | ||||||||||||||||||||||||||||
# LlamaCloudIndex is a managed index so we can directly use the files | ||||||||||||||||||||||||||||
upload_file = (file_name, BytesIO(file_data)) | ||||||||||||||||||||||||||||
return [ | ||||||||||||||||||||||||||||
LLamaCloudFileService.add_file_to_pipeline( | ||||||||||||||||||||||||||||
project_id, | ||||||||||||||||||||||||||||
pipeline_id, | ||||||||||||||||||||||||||||
upload_file, | ||||||||||||||||||||||||||||
custom_metadata={}, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent return type in The method Consider updating the method to return -return [
- LLamaCloudFileService.add_file_to_pipeline(
- project_id,
- pipeline_id,
- upload_file,
- custom_metadata={},
- )
-]
+LLamaCloudFileService.add_file_to_pipeline(
+ project_id,
+ pipeline_id,
+ upload_file,
+ custom_metadata={},
+) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||
def _sanitize_file_name(file_name: str) -> str: | ||||||||||||||||||||||||||||
file_name, extension = os.path.splitext(file_name) | ||||||||||||||||||||||||||||
return re.sub(r"[^a-zA-Z0-9]", "_", file_name) + extension | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@classmethod | ||||||||||||||||||||||||||||
def process_file( | ||||||||||||||||||||||||||||
file_name: str, base64_content: str, params: Optional[dict] = None | ||||||||||||||||||||||||||||
) -> List[str]: | ||||||||||||||||||||||||||||
cls, | ||||||||||||||||||||||||||||
file_name: str, | ||||||||||||||||||||||||||||
base64_content: str, | ||||||||||||||||||||||||||||
params: Optional[dict] = None, | ||||||||||||||||||||||||||||
) -> FileMetadata: | ||||||||||||||||||||||||||||
if params is None: | ||||||||||||||||||||||||||||
params = {} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
file_data, extension = PrivateFileService.preprocess_base64_file(base64_content) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Add the nodes to the index and persist it | ||||||||||||||||||||||||||||
index_config = IndexConfig(**params) | ||||||||||||||||||||||||||||
current_index = get_index(index_config) | ||||||||||||||||||||||||||||
index = get_index(index_config) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Insert the documents into the index | ||||||||||||||||||||||||||||
if isinstance(current_index, LlamaCloudIndex): | ||||||||||||||||||||||||||||
from app.engine.service import LLamaCloudFileService | ||||||||||||||||||||||||||||
# Generate a new file name if the same file is uploaded multiple times | ||||||||||||||||||||||||||||
file_id = str(uuid.uuid4()) | ||||||||||||||||||||||||||||
new_file_name = f"{file_id}_{cls._sanitize_file_name(file_name)}" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Preprocess and store the file | ||||||||||||||||||||||||||||
file_data, extension = cls._preprocess_base64_file(base64_content) | ||||||||||||||||||||||||||||
file_metadata = cls._store_file(new_file_name, file_data) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
project_id = current_index._get_project_id() | ||||||||||||||||||||||||||||
pipeline_id = current_index._get_pipeline_id() | ||||||||||||||||||||||||||||
# LlamaCloudIndex is a managed index so we can directly use the files | ||||||||||||||||||||||||||||
upload_file = (file_name, BytesIO(file_data)) | ||||||||||||||||||||||||||||
return [ | ||||||||||||||||||||||||||||
LLamaCloudFileService.add_file_to_pipeline( | ||||||||||||||||||||||||||||
project_id, | ||||||||||||||||||||||||||||
pipeline_id, | ||||||||||||||||||||||||||||
upload_file, | ||||||||||||||||||||||||||||
custom_metadata={ | ||||||||||||||||||||||||||||
# Set private=true to mark the document as private user docs (required for filtering) | ||||||||||||||||||||||||||||
"private": "true", | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||
# Insert the file into the index | ||||||||||||||||||||||||||||
if isinstance(index, LlamaCloudIndex): | ||||||||||||||||||||||||||||
_ = cls._add_file_to_llama_cloud_index(index, new_file_name, file_data) | ||||||||||||||||||||||||||||
leehuwuj marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
# First process documents into nodes | ||||||||||||||||||||||||||||
documents = PrivateFileService.store_and_parse_file( | ||||||||||||||||||||||||||||
file_name, file_data, extension | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
pipeline = IngestionPipeline() | ||||||||||||||||||||||||||||
nodes = pipeline.run(documents=documents) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Add the nodes to the index and persist it | ||||||||||||||||||||||||||||
if current_index is None: | ||||||||||||||||||||||||||||
current_index = VectorStoreIndex(nodes=nodes) | ||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
current_index.insert_nodes(nodes=nodes) | ||||||||||||||||||||||||||||
current_index.storage_context.persist( | ||||||||||||||||||||||||||||
persist_dir=os.environ.get("STORAGE_DIR", "storage") | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
documents = cls._load_file_to_documents(file_metadata) | ||||||||||||||||||||||||||||
cls._add_documents_to_vector_store_index(documents, index) | ||||||||||||||||||||||||||||
# Add document ids to the file metadata | ||||||||||||||||||||||||||||
file_metadata.document_ids = [doc.doc_id for doc in documents] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# Return the document ids | ||||||||||||||||||||||||||||
return [doc.doc_id for doc in documents] | ||||||||||||||||||||||||||||
# Return the file metadata | ||||||||||||||||||||||||||||
return file_metadata | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure In the If |
Uh oh!
There was an error while loading. Please reload this page.