From b9557008aa1ed0da609b43c3b95d7ae9d1e1ce1f Mon Sep 17 00:00:00 2001 From: Raghu Ganapathi Date: Wed, 24 May 2023 16:58:26 -0700 Subject: [PATCH 1/3] Add loguru for logging --- datastore/providers/milvus_datastore.py | 59 ++++++++----------- datastore/providers/pgvector_datastore.py | 3 +- datastore/providers/pinecone_datastore.py | 43 +++++++------- datastore/providers/redis_datastore.py | 34 +++++------ datastore/providers/zilliz_datastore.py | 5 +- .../authentication-methods/no-auth/main.py | 11 ++-- examples/memory/main.py | 13 ++-- local_server/main.py | 9 +-- poetry.lock | 2 +- pyproject.toml | 1 + scripts/process_json/process_json.py | 19 +++--- scripts/process_jsonl/process_jsonl.py | 15 ++--- scripts/process_zip/process_zip.py | 17 +++--- server/main.py | 11 ++-- services/date.py | 3 +- services/extract_metadata.py | 3 +- services/file.py | 11 ++-- services/openai.py | 7 ++- 18 files changed, 137 insertions(+), 129 deletions(-) diff --git a/datastore/providers/milvus_datastore.py b/datastore/providers/milvus_datastore.py index 202e86d55..827a37e3e 100644 --- a/datastore/providers/milvus_datastore.py +++ b/datastore/providers/milvus_datastore.py @@ -2,6 +2,7 @@ import os import asyncio +from loguru import logger from typing import Dict, List, Optional from pymilvus import ( Collection, @@ -124,14 +125,6 @@ def __init__( self._create_collection(MILVUS_COLLECTION, create_new) # type: ignore self._create_index() - def _print_info(self, msg): - # TODO: logger - print(msg) - - def _print_err(self, msg): - # TODO: logger - print(msg) - def _get_schema(self): return SCHEMA_V1 if self._schema_ver == "V1" else SCHEMA_V2 @@ -143,7 +136,7 @@ def _create_connection(self): addr = connections.get_connection_addr(x[0]) if x[1] and ('address' in addr) and (addr['address'] == "{}:{}".format(MILVUS_HOST, MILVUS_PORT)): self.alias = x[0] - self._print_info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'" + logger.info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'" .format(MILVUS_HOST, MILVUS_PORT, self.alias)) break @@ -158,10 +151,10 @@ def _create_connection(self): password=MILVUS_PASSWORD, # type: ignore secure=MILVUS_USE_SECURITY, ) - self._print_info("Create connection to Milvus server '{}:{}' with alias '{:s}'" + logger.info("Create connection to Milvus server '{}:{}' with alias '{:s}'" .format(MILVUS_HOST, MILVUS_PORT, self.alias)) except Exception as e: - self._print_err("Failed to create connection to Milvus server '{}:{}', error: {}" + logger.exception("Failed to create connection to Milvus server '{}:{}', error: {}" .format(MILVUS_HOST, MILVUS_PORT, e)) def _create_collection(self, collection_name, create_new: bool) -> None: @@ -189,7 +182,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None: consistency_level=self._consistency_level, ) self._schema_ver = "V2" - self._print_info("Create Milvus collection '{}' with schema {} and consistency level {}" + logger.info("Create Milvus collection '{}' with schema {} and consistency level {}" .format(collection_name, self._schema_ver, self._consistency_level)) else: # If the collection exists, point to it @@ -201,10 +194,10 @@ def _create_collection(self, collection_name, create_new: bool) -> None: if field.name == "id" and field.is_primary: self._schema_ver = "V2" break - self._print_info("Milvus collection '{}' already exists with schema {}" + logger.info("Milvus collection '{}' already exists with schema {}" .format(collection_name, self._schema_ver)) except Exception as e: - 
self._print_err("Failed to create collection '{}', error: {}".format(collection_name, e)) + logger.exception("Failed to create collection '{}', error: {}".format(collection_name, e)) def _create_index(self): # TODO: verify index/search params passed by os.environ @@ -216,7 +209,7 @@ def _create_index(self): if self.index_params is not None: # Convert the string format to JSON format parameters passed by MILVUS_INDEX_PARAMS self.index_params = json.loads(self.index_params) - self._print_info("Create Milvus index: {}".format(self.index_params)) + logger.info("Create Milvus index: {}".format(self.index_params)) # Create an index on the 'embedding' field with the index params found in init self.col.create_index(EMBEDDING_FIELD, index_params=self.index_params) else: @@ -227,24 +220,24 @@ def _create_index(self): "index_type": "HNSW", "params": {"M": 8, "efConstruction": 64}, } - self._print_info("Attempting creation of Milvus '{}' index".format(i_p["index_type"])) + logger.info("Attempting creation of Milvus '{}' index".format(i_p["index_type"])) self.col.create_index(EMBEDDING_FIELD, index_params=i_p) self.index_params = i_p - self._print_info("Creation of Milvus '{}' index successful".format(i_p["index_type"])) + logger.info("Creation of Milvus '{}' index successful".format(i_p["index_type"])) # If create fails, most likely due to being Zilliz Cloud instance, try to create an AutoIndex except MilvusException: - self._print_info("Attempting creation of Milvus default index") + logger.info("Attempting creation of Milvus default index") i_p = {"metric_type": "IP", "index_type": "AUTOINDEX", "params": {}} self.col.create_index(EMBEDDING_FIELD, index_params=i_p) self.index_params = i_p - self._print_info("Creation of Milvus default index successful") + logger.info("Creation of Milvus default index successful") # If an index already exists, grab its params else: # How about if the first index is not vector index? for index in self.col.indexes: idx = index.to_dict() if idx["field"] == EMBEDDING_FIELD: - self._print_info("Index already exists: {}".format(idx)) + logger.info("Index already exists: {}".format(idx)) self.index_params = idx['index_param'] break @@ -272,9 +265,9 @@ def _create_index(self): } # Set the search params self.search_params = default_search_params[self.index_params["index_type"]] - self._print_info("Milvus search parameters: {}".format(self.search_params)) + logger.info("Milvus search parameters: {}".format(self.search_params)) except Exception as e: - self._print_err("Failed to create index, error: {}".format(e)) + logger.exception("Failed to create index, error: {}".format(e)) async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: """Upsert chunks into the datastore. @@ -319,18 +312,18 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: for batch in batches: if len(batch[0]) != 0: try: - self._print_info(f"Upserting batch of size {len(batch[0])}") + logger.info(f"Upserting batch of size {len(batch[0])}") self.col.insert(batch) - self._print_info(f"Upserted batch successfully") + logger.info(f"Upserted batch successfully") except Exception as e: - self._print_err(f"Failed to insert batch records, error: {e}") + logger.exception(f"Failed to insert batch records, error: {e}") raise e # This setting perfoms flushes after insert. 
Small insert == bad to use # self.col.flush() return doc_ids except Exception as e: - self._print_err("Failed to insert records, error: {}".format(e)) + logger.exception("Failed to insert records, error: {}".format(e)) return [] @@ -365,7 +358,7 @@ def _get_values(self, chunk: DocumentChunk) -> List[any] | None: # type: ignore x = values.get(key) or default # If one of our required fields is missing, ignore the entire entry if x is Required: - self._print_info("Chunk " + values["id"] + " missing " + key + " skipping") + logger.info("Chunk " + values["id"] + " missing " + key + " skipping") return None # Add the corresponding value if it passes the tests ret.append(x) @@ -436,7 +429,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult: return QueryResult(query=query.query, results=results) except Exception as e: - self._print_err("Failed to query, error: {}".format(e)) + logger.exception("Failed to query, error: {}".format(e)) return QueryResult(query=query.query, results=[]) results: List[QueryResult] = await asyncio.gather( @@ -460,7 +453,7 @@ async def delete( # If deleting all, drop and create the new collection if delete_all: coll_name = self.col.name - self._print_info("Delete the entire collection {} and create new one".format(coll_name)) + logger.info("Delete the entire collection {} and create new one".format(coll_name)) # Release the collection from memory self.col.release() # Drop the collection @@ -490,7 +483,7 @@ async def delete( pks = ['"' + pk + '"' for pk in pks] # Delete by ids batch by batch(avoid too long expression) - self._print_info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver)) + logger.info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver)) while len(pks) > 0: batch_pks = pks[:batch_size] pks = pks[batch_size:] @@ -499,7 +492,7 @@ async def delete( # Increment our deleted count delete_count += int(res.delete_count) # type: ignore except Exception as e: - self._print_err("Failed to delete by ids, error: {}".format(e)) + logger.exception("Failed to delete by ids, error: {}".format(e)) try: # Check if empty filter @@ -524,9 +517,9 @@ async def delete( # Increment our delete count delete_count += int(res.delete_count) # type: ignore except Exception as e: - self._print_err("Failed to delete by filter, error: {}".format(e)) + logger.exception("Failed to delete by filter, error: {}".format(e)) - self._print_info("{:d} records deleted".format(delete_count)) + logger.info("{:d} records deleted".format(delete_count)) # This setting performs flushes after delete. 
Small delete == bad to use # self.col.flush() diff --git a/datastore/providers/pgvector_datastore.py b/datastore/providers/pgvector_datastore.py index 14c2aea1d..be4c93455 100644 --- a/datastore/providers/pgvector_datastore.py +++ b/datastore/providers/pgvector_datastore.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional from datetime import datetime +from loguru import logger from services.date import to_unix_timestamp from datastore.datastore import DataStore @@ -147,7 +148,7 @@ async def _query(self, queries: List[QueryWithEmbedding]) -> List[QueryResult]: results.append(document_chunk) query_results.append(QueryResult(query=query.query, results=results)) except Exception as e: - print("error:", e) + logger.exception("error:", e) query_results.append(QueryResult(query=query.query, results=[])) return query_results diff --git a/datastore/providers/pinecone_datastore.py b/datastore/providers/pinecone_datastore.py index 2896cf66b..870f02ec7 100644 --- a/datastore/providers/pinecone_datastore.py +++ b/datastore/providers/pinecone_datastore.py @@ -3,6 +3,7 @@ import pinecone from tenacity import retry, wait_random_exponential, stop_after_attempt import asyncio +from loguru import logger from datastore.datastore import DataStore from models.models import ( @@ -41,7 +42,7 @@ def __init__(self): # Create a new index with the specified name, dimension, and metadata configuration try: - print( + logger.info( f"Creating index {PINECONE_INDEX} with metadata config {fields_to_index}" ) pinecone.create_index( @@ -50,18 +51,18 @@ def __init__(self): metadata_config={"indexed": fields_to_index}, ) self.index = pinecone.Index(PINECONE_INDEX) - print(f"Index {PINECONE_INDEX} created successfully") + logger.info(f"Index {PINECONE_INDEX} created successfully") except Exception as e: - print(f"Error creating index {PINECONE_INDEX}: {e}") + logger.exception(f"Error creating index {PINECONE_INDEX}: {e}") raise e elif PINECONE_INDEX and PINECONE_INDEX in pinecone.list_indexes(): # Connect to an existing index with the specified name try: - print(f"Connecting to existing index {PINECONE_INDEX}") + logger.info(f"Connecting to existing index {PINECONE_INDEX}") self.index = pinecone.Index(PINECONE_INDEX) - print(f"Connected to index {PINECONE_INDEX} successfully") + logger.info(f"Connected to index {PINECONE_INDEX} successfully") except Exception as e: - print(f"Error connecting to index {PINECONE_INDEX}: {e}") + logger.exception(f"Error connecting to index {PINECONE_INDEX}: {e}") raise e @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) @@ -78,7 +79,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: for doc_id, chunk_list in chunks.items(): # Append the id to the ids list doc_ids.append(doc_id) - print(f"Upserting document_id: {doc_id}") + logger.info(f"Upserting document_id: {doc_id}") for chunk in chunk_list: # Create a vector tuple of (id, embedding, metadata) # Convert the metadata object to a dict with unix timestamps for dates @@ -97,11 +98,11 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: # Upsert each batch to Pinecone for batch in batches: try: - print(f"Upserting batch of size {len(batch)}") + logger.info(f"Upserting batch of size {len(batch)}") self.index.upsert(vectors=batch) - print(f"Upserted batch successfully") + logger.info(f"Upserted batch successfully") except Exception as e: - print(f"Error upserting batch: {e}") + logger.exception(f"Error upserting 
batch: {e}") raise e return doc_ids @@ -117,7 +118,7 @@ async def _query( # Define a helper coroutine that performs a single query and returns a QueryResult async def _single_query(query: QueryWithEmbedding) -> QueryResult: - print(f"Query: {query.query}") + logger.debug(f"Query: {query.query}") # Convert the metadata filter object to a dict with pinecone filter expressions pinecone_filter = self._get_pinecone_filter(query.filter) @@ -132,7 +133,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult: include_metadata=True, ) except Exception as e: - print(f"Error querying index: {e}") + logger.exception(f"Error querying index: {e}") raise e query_results: List[DocumentChunkWithScore] = [] @@ -184,12 +185,12 @@ async def delete( # Delete all vectors from the index if delete_all is True if delete_all: try: - print(f"Deleting all vectors from index") + logger.info(f"Deleting all vectors from index") self.index.delete(delete_all=True) - print(f"Deleted all vectors successfully") + logger.info(f"Deleted all vectors successfully") return True except Exception as e: - print(f"Error deleting all vectors: {e}") + logger.exception(f"Error deleting all vectors: {e}") raise e # Convert the metadata filter object to a dict with pinecone filter expressions @@ -197,22 +198,22 @@ async def delete( # Delete vectors that match the filter from the index if the filter is not empty if pinecone_filter != {}: try: - print(f"Deleting vectors with filter {pinecone_filter}") + logger.info(f"Deleting vectors with filter {pinecone_filter}") self.index.delete(filter=pinecone_filter) - print(f"Deleted vectors with filter successfully") + logger.info(f"Deleted vectors with filter successfully") except Exception as e: - print(f"Error deleting vectors with filter: {e}") + logger.exception(f"Error deleting vectors with filter: {e}") raise e # Delete vectors that match the document ids from the index if the ids list is not empty if ids is not None and len(ids) > 0: try: - print(f"Deleting vectors with ids {ids}") + logger.info(f"Deleting vectors with ids {ids}") pinecone_filter = {"document_id": {"$in": ids}} self.index.delete(filter=pinecone_filter) # type: ignore - print(f"Deleted vectors with ids successfully") + logger.info(f"Deleted vectors with ids successfully") except Exception as e: - print(f"Error deleting vectors with ids: {e}") + logger.exception(f"Error deleting vectors with ids: {e}") raise e return True diff --git a/datastore/providers/redis_datastore.py b/datastore/providers/redis_datastore.py index 669f3fb83..9b82b052c 100644 --- a/datastore/providers/redis_datastore.py +++ b/datastore/providers/redis_datastore.py @@ -1,5 +1,4 @@ import asyncio -import logging import os import re import json @@ -14,6 +13,7 @@ NumericField, VectorField, ) +from loguru import logger from typing import Dict, List, Optional from datastore.datastore import DataStore from models.models import ( @@ -62,7 +62,7 @@ async def _check_redis_module_exist(client: redis.Redis, modules: List[dict]): if module["name"] not in installed_modules or int(installed_modules[module["name"]]["ver"]) < int(module["ver"]): error_message = "You must add the RediSearch (>= 2.6) and ReJSON (>= 2.4) modules from Redis Stack. 
" \ "Please refer to Redis Stack docs: https://redis.io/docs/stack/" - logging.error(error_message) + logger.exception(error_message) raise AttributeError(error_message) @@ -84,12 +84,12 @@ async def init(cls, **kwargs): """ try: # Connect to the Redis Client - logging.info("Connecting to Redis") + logger.info("Connecting to Redis") client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD ) except Exception as e: - logging.error(f"Error setting up Redis: {e}") + logger.exception(f"Error setting up Redis: {e}") raise e await _check_redis_module_exist(client, modules=REDIS_REQUIRED_MODULES) @@ -117,15 +117,15 @@ async def init(cls, **kwargs): try: # Check for existence of RediSearch Index await client.ft(REDIS_INDEX_NAME).info() - logging.info(f"RediSearch index {REDIS_INDEX_NAME} already exists") + logger.info(f"RediSearch index {REDIS_INDEX_NAME} already exists") except: # Create the RediSearch Index - logging.info(f"Creating new RediSearch index {REDIS_INDEX_NAME}") + logger.info(f"Creating new RediSearch index {REDIS_INDEX_NAME}") definition = IndexDefinition( prefix=[REDIS_DOC_PREFIX], index_type=IndexType.JSON ) fields = list(unpack_schema(redisearch_schema)) - logging.info(f"Creating index with fields: {fields}") + logger.info(f"Creating index with fields: {fields}") await client.ft(REDIS_INDEX_NAME).create_index( fields=fields, definition=definition ) @@ -299,10 +299,10 @@ async def _query( results: List[QueryResult] = [] # Gather query results in a pipeline - logging.info(f"Gathering {len(queries)} query results") + logger.info(f"Gathering {len(queries)} query results") for query in queries: - logging.info(f"Query: {query.query}") + logger.debug(f"Query: {query.query}") query_results: List[DocumentChunkWithScore] = [] # Extract Redis query @@ -348,12 +348,12 @@ async def delete( # Delete all vectors from the index if delete_all is True if delete_all: try: - logging.info(f"Deleting all documents from index") + logger.info(f"Deleting all documents from index") await self.client.ft(REDIS_INDEX_NAME).dropindex(True) - logging.info(f"Deleted all documents successfully") + logger.info(f"Deleted all documents successfully") return True except Exception as e: - logging.info(f"Error deleting all documents: {e}") + logger.exception(f"Error deleting all documents: {e}") raise e # Delete by filter @@ -365,15 +365,15 @@ async def delete( f"{REDIS_DOC_PREFIX}:{filter.document_id}:*" ) await self._redis_delete(keys) - logging.info(f"Deleted document {filter.document_id} successfully") + logger.info(f"Deleted document {filter.document_id} successfully") except Exception as e: - logging.info(f"Error deleting document {filter.document_id}: {e}") + logger.exception(f"Error deleting document {filter.document_id}: {e}") raise e # Delete by explicit ids (Redis keys) if ids: try: - logging.info(f"Deleting document ids {ids}") + logger.info(f"Deleting document ids {ids}") keys = [] # find all keys associated with the document ids for document_id in ids: @@ -382,10 +382,10 @@ async def delete( ) keys.extend(doc_keys) # delete all keys - logging.info(f"Deleting {len(keys)} keys from Redis") + logger.info(f"Deleting {len(keys)} keys from Redis") await self._redis_delete(keys) except Exception as e: - logging.info(f"Error deleting ids: {e}") + logger.exception(f"Error deleting ids: {e}") raise e return True diff --git a/datastore/providers/zilliz_datastore.py b/datastore/providers/zilliz_datastore.py index 1db641f63..ef7359951 100644 --- a/datastore/providers/zilliz_datastore.py +++ 
b/datastore/providers/zilliz_datastore.py @@ -1,5 +1,6 @@ import os +from loguru import logger from typing import Optional from pymilvus import ( connections, @@ -47,7 +48,7 @@ def _create_connection(self): # Connect to the Zilliz instance using the passed in Environment variables self.alias = uuid4().hex connections.connect(alias=self.alias, uri=ZILLIZ_URI, user=ZILLIZ_USER, password=ZILLIZ_PASSWORD, secure=ZILLIZ_USE_SECURITY) # type: ignore - self._print_info("Connect to zilliz cloud server") + logger.info("Connect to zilliz cloud server") def _create_index(self): try: @@ -59,6 +60,6 @@ def _create_index(self): self.col.load() self.search_params = {"metric_type": "IP", "params": {}} except Exception as e: - self._print_err("Failed to create index, error: {}".format(e)) + logger.exception("Failed to create index, error: {}".format(e)) diff --git a/examples/authentication-methods/no-auth/main.py b/examples/authentication-methods/no-auth/main.py index 1fd5458b4..bfd02f959 100644 --- a/examples/authentication-methods/no-auth/main.py +++ b/examples/authentication-methods/no-auth/main.py @@ -4,6 +4,7 @@ import uvicorn from fastapi import FastAPI, File, Form, HTTPException, Body, UploadFile from fastapi.staticfiles import StaticFiles +from loguru import logger from models.api import ( DeleteRequest, @@ -55,7 +56,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -70,7 +71,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -87,7 +88,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -105,7 +106,7 @@ async def query( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -129,7 +130,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/examples/memory/main.py b/examples/memory/main.py index 5c96e4289..66c5f773c 100644 --- a/examples/memory/main.py +++ b/examples/memory/main.py @@ -8,6 +8,7 @@ from fastapi import FastAPI, File, Form, HTTPException, Depends, Body, UploadFile from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.staticfiles import StaticFiles +from loguru import logger from models.api import ( DeleteRequest, @@ -71,7 +72,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -87,7 +88,7 @@ async def upsert_main( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -105,7 +106,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return 
UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -123,7 +124,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -143,7 +144,7 @@ async def query( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -168,7 +169,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/local_server/main.py b/local_server/main.py index 7df685127..286fa09cf 100644 --- a/local_server/main.py +++ b/local_server/main.py @@ -3,6 +3,7 @@ from typing import Optional import uvicorn from fastapi import FastAPI, File, Form, HTTPException, Body, UploadFile +from loguru import logger from models.api import ( DeleteRequest, @@ -82,7 +83,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -97,7 +98,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -109,7 +110,7 @@ async def query_main(request: QueryRequest = Body(...)): ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -133,7 +134,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/poetry.lock b/poetry.lock index 966a401e4..1228277b4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4732,4 +4732,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "89915adf15b0d7bfb147c5e8dc64abaf2c6bb7db16e9fc2ae13eac2dc4f58267" +content-hash = "6a3390a541b5d6846294881d208d8f710e409af05e71bbf67252b741de70c56a" diff --git a/pyproject.toml b/pyproject.toml index 68cacdf8b..e06b73b7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ llama-index = "0.5.4" azure-identity = "^1.12.0" azure-search-documents = {version = "11.4.0a20230509004", source = "azure-sdk-dev"} pgvector = "^0.1.7" +loguru = "^0.7.0" [tool.poetry.scripts] start = "server.main:start" diff --git a/scripts/process_json/process_json.py b/scripts/process_json/process_json.py index 0dc2f8439..5f29269cb 100644 --- a/scripts/process_json/process_json.py +++ b/scripts/process_json/process_json.py @@ -3,6 +3,7 @@ import argparse import asyncio +from loguru import logger from models.models import Document, DocumentMetadata from datastore.datastore import DataStore from datastore.factory import get_datastore @@ -28,7 +29,7 @@ async def process_json_dump( # iterate over the data and create document objects for item in data: if len(documents) % 20 == 0: - print(f"Processed {len(documents)} documents") + 
logger.info(f"Processed {len(documents)} documents") try: # get the id, text, source, source_id, url, created_at and author from the item @@ -42,7 +43,7 @@ async def process_json_dump( author = item.get("author", None) if not text: - print("No document text, skipping...") + logger.info("No document text, skipping...") continue # create a metadata object with the source, source_id, url, created_at and author @@ -53,7 +54,7 @@ async def process_json_dump( created_at=created_at, author=author, ) - print("metadata: ", str(metadata)) + logger.info("metadata: ", str(metadata)) # update metadata with custom values for key, value in custom_metadata.items(): @@ -65,7 +66,7 @@ async def process_json_dump( pii_detected = screen_text_for_pii(text) # if pii detected, print a warning and skip the document if pii_detected: - print("PII detected in document, skipping") + logger.info("PII detected in document, skipping") skipped_items.append(item) # add the skipped item to the list continue @@ -87,7 +88,7 @@ async def process_json_dump( documents.append(document) except Exception as e: # log the error and continue with the next item - print(f"Error processing {item}: {e}") + logger.exception(f"Error processing {item}: {e}") skipped_items.append(item) # add the skipped item to the list # do this in batches, the upsert method already batches documents but this allows @@ -95,14 +96,14 @@ async def process_json_dump( for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE): # Get the text of the chunks in the current batch batch_documents = documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE] - print(f"Upserting batch of {len(batch_documents)} documents, batch {i}") - print("documents: ", documents) + logger.info(f"Upserting batch of {len(batch_documents)} documents, batch {i}") + logger.info("documents: ", documents) await datastore.upsert(batch_documents) # print the skipped items - print(f"Skipped {len(skipped_items)} items due to errors or PII detection") + logger.info(f"Skipped {len(skipped_items)} items due to errors or PII detection") for item in skipped_items: - print(item) + logger.info(item) async def main(): diff --git a/scripts/process_jsonl/process_jsonl.py b/scripts/process_jsonl/process_jsonl.py index 8795553cd..f3db7869d 100644 --- a/scripts/process_jsonl/process_jsonl.py +++ b/scripts/process_jsonl/process_jsonl.py @@ -3,6 +3,7 @@ import argparse import asyncio +from loguru import logger from models.models import Document, DocumentMetadata from datastore.datastore import DataStore from datastore.factory import get_datastore @@ -28,7 +29,7 @@ async def process_jsonl_dump( # iterate over the data and create document objects for item in data: if len(documents) % 20 == 0: - print(f"Processed {len(documents)} documents") + logger.info(f"Processed {len(documents)} documents") try: # get the id, text, source, source_id, url, created_at and author from the item @@ -42,7 +43,7 @@ async def process_jsonl_dump( author = item.get("author", None) if not text: - print("No document text, skipping...") + logger.info("No document text, skipping...") continue # create a metadata object with the source, source_id, url, created_at and author @@ -64,7 +65,7 @@ async def process_jsonl_dump( pii_detected = screen_text_for_pii(text) # if pii detected, print a warning and skip the document if pii_detected: - print("PII detected in document, skipping") + logger.info("PII detected in document, skipping") skipped_items.append(item) # add the skipped item to the list continue @@ -86,7 +87,7 @@ async def process_jsonl_dump( 
documents.append(document) except Exception as e: # log the error and continue with the next item - print(f"Error processing {item}: {e}") + logger.exception(f"Error processing {item}: {e}") skipped_items.append(item) # add the skipped item to the list # do this in batches, the upsert method already batches documents but this allows @@ -94,13 +95,13 @@ async def process_jsonl_dump( for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE): # Get the text of the chunks in the current batch batch_documents = documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE] - print(f"Upserting batch of {len(batch_documents)} documents, batch {i}") + logger.info(f"Upserting batch of {len(batch_documents)} documents, batch {i}") await datastore.upsert(batch_documents) # print the skipped items - print(f"Skipped {len(skipped_items)} items due to errors or PII detection") + logger.info(f"Skipped {len(skipped_items)} items due to errors or PII detection") for item in skipped_items: - print(item) + logger.info(item) async def main(): diff --git a/scripts/process_zip/process_zip.py b/scripts/process_zip/process_zip.py index cffca2df7..5b4be1ee8 100644 --- a/scripts/process_zip/process_zip.py +++ b/scripts/process_zip/process_zip.py @@ -5,6 +5,7 @@ import argparse import asyncio +from loguru import logger from models.models import Document, DocumentMetadata, Source from datastore.datastore import DataStore from datastore.factory import get_datastore @@ -32,13 +33,13 @@ async def process_file_dump( for root, dirs, files in os.walk("dump"): for filename in files: if len(documents) % 20 == 0: - print(f"Processed {len(documents)} documents") + logger.info(f"Processed {len(documents)} documents") filepath = os.path.join(root, filename) try: extracted_text = extract_text_from_filepath(filepath) - print(f"extracted_text from {filepath}") + logger.info(f"extracted_text from {filepath}") # create a metadata object with the source and source_id fields metadata = DocumentMetadata( @@ -56,7 +57,7 @@ async def process_file_dump( pii_detected = screen_text_for_pii(extracted_text) # if pii detected, print a warning and skip the document if pii_detected: - print("PII detected in document, skipping") + logger.info("PII detected in document, skipping") skipped_files.append( filepath ) # add the skipped file to the list @@ -80,7 +81,7 @@ async def process_file_dump( documents.append(document) except Exception as e: # log the error and continue with the next file - print(f"Error processing {filepath}: {e}") + logger.exception(f"Error processing {filepath}: {e}") skipped_files.append(filepath) # add the skipped file to the list # do this in batches, the upsert method already batches documents but this allows @@ -88,8 +89,8 @@ async def process_file_dump( for i in range(0, len(documents), DOCUMENT_UPSERT_BATCH_SIZE): # Get the text of the chunks in the current batch batch_documents = [doc for doc in documents[i : i + DOCUMENT_UPSERT_BATCH_SIZE]] - print(f"Upserting batch of {len(batch_documents)} documents, batch {i}") - print("documents: ", documents) + logger.info(f"Upserting batch of {len(batch_documents)} documents, batch {i}") + logger.info("documents: ", documents) await datastore.upsert(batch_documents) # delete all files in the dump directory @@ -105,9 +106,9 @@ async def process_file_dump( os.rmdir("dump") # print the skipped files - print(f"Skipped {len(skipped_files)} files due to errors or PII detection") + logger.info(f"Skipped {len(skipped_files)} files due to errors or PII detection") for file in skipped_files: - 
print(file) + logger.info(file) async def main(): diff --git a/server/main.py b/server/main.py index 3d44ced4f..d8e9120a7 100644 --- a/server/main.py +++ b/server/main.py @@ -4,6 +4,7 @@ from fastapi import FastAPI, File, Form, HTTPException, Depends, Body, UploadFile from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.staticfiles import StaticFiles +from loguru import logger from models.api import ( DeleteRequest, @@ -66,7 +67,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -81,7 +82,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -98,7 +99,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -117,7 +118,7 @@ async def query( ) return QueryResponse(results=results) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -141,7 +142,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - print("Error:", e) + logger.exception("Error:", e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/services/date.py b/services/date.py index 57b2ee5eb..476c7aedb 100644 --- a/services/date.py +++ b/services/date.py @@ -1,4 +1,5 @@ import arrow +from loguru import logger def to_unix_timestamp(date_str: str) -> int: @@ -19,5 +20,5 @@ def to_unix_timestamp(date_str: str) -> int: return int(date_obj.timestamp()) except arrow.parser.ParserError: # If the parsing fails, return the current unix timestamp and print a warning - print(f"Invalid date format: {date_str}") + logger.info(f"Invalid date format: {date_str}") return int(arrow.now().timestamp()) diff --git a/services/extract_metadata.py b/services/extract_metadata.py index deecb677b..8c8d4ae7e 100644 --- a/services/extract_metadata.py +++ b/services/extract_metadata.py @@ -3,6 +3,7 @@ import json from typing import Dict import os +from loguru import logger def extract_metadata_from_document(text: str) -> Dict[str, str]: sources = Source.__members__.keys() @@ -32,7 +33,7 @@ def extract_metadata_from_document(text: str) -> Dict[str, str]: os.environ.get("OPENAI_METADATA_EXTRACTIONMODEL_DEPLOYMENTID") ) # TODO: change to your preferred model name - print(f"completion: {completion}") + logger.info(f"completion: {completion}") try: metadata = json.loads(completion) diff --git a/services/file.py b/services/file.py index 90e0e5ea0..f0218d07f 100644 --- a/services/file.py +++ b/services/file.py @@ -7,6 +7,7 @@ import docx2txt import csv import pptx +from loguru import logger from models.models import Document, DocumentMetadata @@ -38,7 +39,7 @@ def extract_text_from_filepath(filepath: str, mimetype: Optional[str] = None) -> with open(filepath, "rb") as file: extracted_text = extract_text_from_file(file, mimetype) except Exception as e: - print(f"Error: {e}") + logger.exception(f"Error: {e}") raise e return extracted_text @@ -91,9 +92,9 @@ async def extract_text_from_form_file(file: UploadFile): """Return the text content of a 
file.""" # get the file body from the upload file object mimetype = file.content_type - print(f"mimetype: {mimetype}") - print(f"file.file: {file.file}") - print("file: ", file) + logger.info(f"mimetype: {mimetype}") + logger.info(f"file.file: {file.file}") + logger.info("file: ", file) file_stream = await file.read() @@ -106,7 +107,7 @@ async def extract_text_from_form_file(file: UploadFile): try: extracted_text = extract_text_from_filepath(temp_file_path, mimetype) except Exception as e: - print(f"Error: {e}") + logger.exception(f"Error: {e}") os.remove(temp_file_path) raise e diff --git a/services/openai.py b/services/openai.py index 426ad3511..ddc2855ee 100644 --- a/services/openai.py +++ b/services/openai.py @@ -1,6 +1,7 @@ from typing import List import openai import os +from loguru import logger from tenacity import retry, wait_random_exponential, stop_after_attempt @@ -28,7 +29,7 @@ def get_embeddings(texts: List[str]) -> List[List[float]]: response = openai.Embedding.create(input=texts, model="text-embedding-ada-002") else: response = openai.Embedding.create(input=texts, deployment_id=deployment) - + # Extract the embedding data from the response data = response["data"] # type: ignore @@ -68,9 +69,9 @@ def get_chat_completion( deployment_id = deployment_id, messages=messages, ) - + choices = response["choices"] # type: ignore completion = choices[0].message.content.strip() - print(f"Completion: {completion}") + logger.info(f"Completion: {completion}") return completion From 3b43cfdb2deaaaeb5b173acb067a05cc3f8b84a9 Mon Sep 17 00:00:00 2001 From: Raghu Ganapathi Date: Tue, 30 May 2023 15:05:30 -0700 Subject: [PATCH 2/3] Add logger --- datastore/providers/analyticdb_datastore.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datastore/providers/analyticdb_datastore.py b/datastore/providers/analyticdb_datastore.py index 4cd4b1ae5..b3e0c2a95 100644 --- a/datastore/providers/analyticdb_datastore.py +++ b/datastore/providers/analyticdb_datastore.py @@ -2,6 +2,7 @@ import asyncio from typing import Dict, List, Optional, Tuple, Any from datetime import datetime +from loguru import logger from psycopg2cffi import compat @@ -252,7 +253,7 @@ def create_results(data): QueryResult(query=query.query, results=results) ) except Exception as e: - print("error:", e) + logger.exception("error:", e) query_results.append(QueryResult(query=query.query, results=[])) return query_results finally: @@ -275,7 +276,7 @@ async def execute_delete(query: str, params: Optional[List] = None) -> bool: self.conn.commit() return True except Exception as e: - print(f"Error: {e}") + logger.exception(f"Error: {e}") return False finally: self.connection_pool.putconn(conn) From d74002dce7f995db79cf2d80dd33aa9e8397d8bb Mon Sep 17 00:00:00 2001 From: Raghu Ganapathi Date: Wed, 31 May 2023 12:41:49 -0700 Subject: [PATCH 3/3] Replace exception with error --- datastore/providers/analyticdb_datastore.py | 4 ++-- datastore/providers/milvus_datastore.py | 16 ++++++++-------- datastore/providers/pgvector_datastore.py | 2 +- datastore/providers/pinecone_datastore.py | 14 +++++++------- datastore/providers/redis_datastore.py | 10 +++++----- datastore/providers/weaviate_datastore.py | 2 +- datastore/providers/zilliz_datastore.py | 2 +- examples/authentication-methods/no-auth/main.py | 10 +++++----- examples/memory/main.py | 12 ++++++------ local_server/main.py | 8 ++++---- scripts/process_json/process_json.py | 2 +- scripts/process_jsonl/process_jsonl.py | 2 +- scripts/process_zip/process_zip.py | 2 +- 
server/main.py | 10 +++++----- services/file.py | 4 ++-- 15 files changed, 50 insertions(+), 50 deletions(-) diff --git a/datastore/providers/analyticdb_datastore.py b/datastore/providers/analyticdb_datastore.py index b3e0c2a95..ba206f2e1 100644 --- a/datastore/providers/analyticdb_datastore.py +++ b/datastore/providers/analyticdb_datastore.py @@ -253,7 +253,7 @@ def create_results(data): QueryResult(query=query.query, results=results) ) except Exception as e: - logger.exception("error:", e) + logger.error(e) query_results.append(QueryResult(query=query.query, results=[])) return query_results finally: @@ -276,7 +276,7 @@ async def execute_delete(query: str, params: Optional[List] = None) -> bool: self.conn.commit() return True except Exception as e: - logger.exception(f"Error: {e}") + logger.error(e) return False finally: self.connection_pool.putconn(conn) diff --git a/datastore/providers/milvus_datastore.py b/datastore/providers/milvus_datastore.py index 827a37e3e..d105cc4e9 100644 --- a/datastore/providers/milvus_datastore.py +++ b/datastore/providers/milvus_datastore.py @@ -154,7 +154,7 @@ def _create_connection(self): logger.info("Create connection to Milvus server '{}:{}' with alias '{:s}'" .format(MILVUS_HOST, MILVUS_PORT, self.alias)) except Exception as e: - logger.exception("Failed to create connection to Milvus server '{}:{}', error: {}" + logger.error("Failed to create connection to Milvus server '{}:{}', error: {}" .format(MILVUS_HOST, MILVUS_PORT, e)) def _create_collection(self, collection_name, create_new: bool) -> None: @@ -197,7 +197,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None: logger.info("Milvus collection '{}' already exists with schema {}" .format(collection_name, self._schema_ver)) except Exception as e: - logger.exception("Failed to create collection '{}', error: {}".format(collection_name, e)) + logger.error("Failed to create collection '{}', error: {}".format(collection_name, e)) def _create_index(self): # TODO: verify index/search params passed by os.environ @@ -267,7 +267,7 @@ def _create_index(self): self.search_params = default_search_params[self.index_params["index_type"]] logger.info("Milvus search parameters: {}".format(self.search_params)) except Exception as e: - logger.exception("Failed to create index, error: {}".format(e)) + logger.error("Failed to create index, error: {}".format(e)) async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: """Upsert chunks into the datastore. @@ -316,14 +316,14 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: self.col.insert(batch) logger.info(f"Upserted batch successfully") except Exception as e: - logger.exception(f"Failed to insert batch records, error: {e}") + logger.error(f"Failed to insert batch records, error: {e}") raise e # This setting perfoms flushes after insert. 
Small insert == bad to use # self.col.flush() return doc_ids except Exception as e: - logger.exception("Failed to insert records, error: {}".format(e)) + logger.error("Failed to insert records, error: {}".format(e)) return [] @@ -429,7 +429,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult: return QueryResult(query=query.query, results=results) except Exception as e: - logger.exception("Failed to query, error: {}".format(e)) + logger.error("Failed to query, error: {}".format(e)) return QueryResult(query=query.query, results=[]) results: List[QueryResult] = await asyncio.gather( @@ -492,7 +492,7 @@ async def delete( # Increment our deleted count delete_count += int(res.delete_count) # type: ignore except Exception as e: - logger.exception("Failed to delete by ids, error: {}".format(e)) + logger.error("Failed to delete by ids, error: {}".format(e)) try: # Check if empty filter @@ -517,7 +517,7 @@ async def delete( # Increment our delete count delete_count += int(res.delete_count) # type: ignore except Exception as e: - logger.exception("Failed to delete by filter, error: {}".format(e)) + logger.error("Failed to delete by filter, error: {}".format(e)) logger.info("{:d} records deleted".format(delete_count)) diff --git a/datastore/providers/pgvector_datastore.py b/datastore/providers/pgvector_datastore.py index be4c93455..cd7026b23 100644 --- a/datastore/providers/pgvector_datastore.py +++ b/datastore/providers/pgvector_datastore.py @@ -148,7 +148,7 @@ async def _query(self, queries: List[QueryWithEmbedding]) -> List[QueryResult]: results.append(document_chunk) query_results.append(QueryResult(query=query.query, results=results)) except Exception as e: - logger.exception("error:", e) + logger.error(e) query_results.append(QueryResult(query=query.query, results=[])) return query_results diff --git a/datastore/providers/pinecone_datastore.py b/datastore/providers/pinecone_datastore.py index 870f02ec7..c10ee2bea 100644 --- a/datastore/providers/pinecone_datastore.py +++ b/datastore/providers/pinecone_datastore.py @@ -53,7 +53,7 @@ def __init__(self): self.index = pinecone.Index(PINECONE_INDEX) logger.info(f"Index {PINECONE_INDEX} created successfully") except Exception as e: - logger.exception(f"Error creating index {PINECONE_INDEX}: {e}") + logger.error(f"Error creating index {PINECONE_INDEX}: {e}") raise e elif PINECONE_INDEX and PINECONE_INDEX in pinecone.list_indexes(): # Connect to an existing index with the specified name @@ -62,7 +62,7 @@ def __init__(self): self.index = pinecone.Index(PINECONE_INDEX) logger.info(f"Connected to index {PINECONE_INDEX} successfully") except Exception as e: - logger.exception(f"Error connecting to index {PINECONE_INDEX}: {e}") + logger.error(f"Error connecting to index {PINECONE_INDEX}: {e}") raise e @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) @@ -102,7 +102,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: self.index.upsert(vectors=batch) logger.info(f"Upserted batch successfully") except Exception as e: - logger.exception(f"Error upserting batch: {e}") + logger.error(f"Error upserting batch: {e}") raise e return doc_ids @@ -133,7 +133,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult: include_metadata=True, ) except Exception as e: - logger.exception(f"Error querying index: {e}") + logger.error(f"Error querying index: {e}") raise e query_results: List[DocumentChunkWithScore] = [] @@ -190,7 +190,7 @@ async def delete( logger.info(f"Deleted all 
vectors successfully") return True except Exception as e: - logger.exception(f"Error deleting all vectors: {e}") + logger.error(f"Error deleting all vectors: {e}") raise e # Convert the metadata filter object to a dict with pinecone filter expressions @@ -202,7 +202,7 @@ async def delete( self.index.delete(filter=pinecone_filter) logger.info(f"Deleted vectors with filter successfully") except Exception as e: - logger.exception(f"Error deleting vectors with filter: {e}") + logger.error(f"Error deleting vectors with filter: {e}") raise e # Delete vectors that match the document ids from the index if the ids list is not empty @@ -213,7 +213,7 @@ async def delete( self.index.delete(filter=pinecone_filter) # type: ignore logger.info(f"Deleted vectors with ids successfully") except Exception as e: - logger.exception(f"Error deleting vectors with ids: {e}") + logger.error(f"Error deleting vectors with ids: {e}") raise e return True diff --git a/datastore/providers/redis_datastore.py b/datastore/providers/redis_datastore.py index 9b82b052c..da13348f7 100644 --- a/datastore/providers/redis_datastore.py +++ b/datastore/providers/redis_datastore.py @@ -62,7 +62,7 @@ async def _check_redis_module_exist(client: redis.Redis, modules: List[dict]): if module["name"] not in installed_modules or int(installed_modules[module["name"]]["ver"]) < int(module["ver"]): error_message = "You must add the RediSearch (>= 2.6) and ReJSON (>= 2.4) modules from Redis Stack. " \ "Please refer to Redis Stack docs: https://redis.io/docs/stack/" - logger.exception(error_message) + logger.error(error_message) raise AttributeError(error_message) @@ -89,7 +89,7 @@ async def init(cls, **kwargs): host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD ) except Exception as e: - logger.exception(f"Error setting up Redis: {e}") + logger.error(f"Error setting up Redis: {e}") raise e await _check_redis_module_exist(client, modules=REDIS_REQUIRED_MODULES) @@ -353,7 +353,7 @@ async def delete( logger.info(f"Deleted all documents successfully") return True except Exception as e: - logger.exception(f"Error deleting all documents: {e}") + logger.error(f"Error deleting all documents: {e}") raise e # Delete by filter @@ -367,7 +367,7 @@ async def delete( await self._redis_delete(keys) logger.info(f"Deleted document {filter.document_id} successfully") except Exception as e: - logger.exception(f"Error deleting document {filter.document_id}: {e}") + logger.error(f"Error deleting document {filter.document_id}: {e}") raise e # Delete by explicit ids (Redis keys) @@ -385,7 +385,7 @@ async def delete( logger.info(f"Deleting {len(keys)} keys from Redis") await self._redis_delete(keys) except Exception as e: - logger.exception(f"Error deleting ids: {e}") + logger.error(f"Error deleting ids: {e}") raise e return True diff --git a/datastore/providers/weaviate_datastore.py b/datastore/providers/weaviate_datastore.py index 4baae392a..fe3ae3b56 100644 --- a/datastore/providers/weaviate_datastore.py +++ b/datastore/providers/weaviate_datastore.py @@ -97,7 +97,7 @@ def handle_errors(self, results: Optional[List[dict]]) -> List[str]: continue for message in result["result"]["errors"]["error"]: error_messages.append(message["message"]) - logger.exception(message["message"]) + logger.error(message["message"]) return error_messages diff --git a/datastore/providers/zilliz_datastore.py b/datastore/providers/zilliz_datastore.py index ef7359951..81f151c43 100644 --- a/datastore/providers/zilliz_datastore.py +++ b/datastore/providers/zilliz_datastore.py @@ -60,6 
+60,6 @@ def _create_index(self): self.col.load() self.search_params = {"metric_type": "IP", "params": {}} except Exception as e: - logger.exception("Failed to create index, error: {}".format(e)) + logger.error("Failed to create index, error: {}".format(e)) diff --git a/examples/authentication-methods/no-auth/main.py b/examples/authentication-methods/no-auth/main.py index bfd02f959..961c725c1 100644 --- a/examples/authentication-methods/no-auth/main.py +++ b/examples/authentication-methods/no-auth/main.py @@ -56,7 +56,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -71,7 +71,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -88,7 +88,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -106,7 +106,7 @@ async def query( ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -130,7 +130,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/examples/memory/main.py b/examples/memory/main.py index 66c5f773c..c94d3f94d 100644 --- a/examples/memory/main.py +++ b/examples/memory/main.py @@ -72,7 +72,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -88,7 +88,7 @@ async def upsert_main( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -106,7 +106,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -124,7 +124,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -144,7 +144,7 @@ async def query( ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -169,7 +169,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/local_server/main.py b/local_server/main.py index 286fa09cf..81506fd2b 100644 --- a/local_server/main.py +++ b/local_server/main.py @@ -83,7 +83,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - 
logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -98,7 +98,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -110,7 +110,7 @@ async def query_main(request: QueryRequest = Body(...)): ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -134,7 +134,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/scripts/process_json/process_json.py b/scripts/process_json/process_json.py index 5f29269cb..8b9624cbb 100644 --- a/scripts/process_json/process_json.py +++ b/scripts/process_json/process_json.py @@ -88,7 +88,7 @@ async def process_json_dump( documents.append(document) except Exception as e: # log the error and continue with the next item - logger.exception(f"Error processing {item}: {e}") + logger.error(f"Error processing {item}: {e}") skipped_items.append(item) # add the skipped item to the list # do this in batches, the upsert method already batches documents but this allows diff --git a/scripts/process_jsonl/process_jsonl.py b/scripts/process_jsonl/process_jsonl.py index f3db7869d..463871b96 100644 --- a/scripts/process_jsonl/process_jsonl.py +++ b/scripts/process_jsonl/process_jsonl.py @@ -87,7 +87,7 @@ async def process_jsonl_dump( documents.append(document) except Exception as e: # log the error and continue with the next item - logger.exception(f"Error processing {item}: {e}") + logger.error(f"Error processing {item}: {e}") skipped_items.append(item) # add the skipped item to the list # do this in batches, the upsert method already batches documents but this allows diff --git a/scripts/process_zip/process_zip.py b/scripts/process_zip/process_zip.py index 5b4be1ee8..7865c85b5 100644 --- a/scripts/process_zip/process_zip.py +++ b/scripts/process_zip/process_zip.py @@ -81,7 +81,7 @@ async def process_file_dump( documents.append(document) except Exception as e: # log the error and continue with the next file - logger.exception(f"Error processing {filepath}: {e}") + logger.error(f"Error processing {filepath}: {e}") skipped_files.append(filepath) # add the skipped file to the list # do this in batches, the upsert method already batches documents but this allows diff --git a/server/main.py b/server/main.py index d8e9120a7..dc3377a1b 100644 --- a/server/main.py +++ b/server/main.py @@ -67,7 +67,7 @@ async def upsert_file( ids = await datastore.upsert([document]) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail=f"str({e})") @@ -82,7 +82,7 @@ async def upsert( ids = await datastore.upsert(request.documents) return UpsertResponse(ids=ids) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -99,7 +99,7 @@ async def query_main( ) return QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -118,7 +118,7 @@ async def query( ) return 
QueryResponse(results=results) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") @@ -142,7 +142,7 @@ async def delete( ) return DeleteResponse(success=success) except Exception as e: - logger.exception("Error:", e) + logger.error(e) raise HTTPException(status_code=500, detail="Internal Service Error") diff --git a/services/file.py b/services/file.py index f0218d07f..136fc17c5 100644 --- a/services/file.py +++ b/services/file.py @@ -39,7 +39,7 @@ def extract_text_from_filepath(filepath: str, mimetype: Optional[str] = None) -> with open(filepath, "rb") as file: extracted_text = extract_text_from_file(file, mimetype) except Exception as e: - logger.exception(f"Error: {e}") + logger.error(e) raise e return extracted_text @@ -107,7 +107,7 @@ async def extract_text_from_form_file(file: UploadFile): try: extracted_text = extract_text_from_filepath(temp_file_path, mimetype) except Exception as e: - logger.exception(f"Error: {e}") + logger.error(e) os.remove(temp_file_path) raise e
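
Note for reviewers unfamiliar with loguru: the sketch below is not part of the patch series. It assumes only that loguru is installed, and the upsert() helper is a hypothetical stand-in for the datastore calls touched above, not the project's actual API. It illustrates the conventions these three commits converge on: logger.exception() inside except blocks where a traceback is wanted (patch 1), logger.error(e) where only the message is needed (patch 3), and f-string interpolation rather than print-style comma arguments, which loguru treats as str.format() parameters and therefore drops when the message has no "{}" placeholder.

import sys
from loguru import logger

# Optional: replace the default stderr sink to control the minimum level.
logger.remove()
logger.add(sys.stderr, level="INFO")


def upsert(batch: list) -> None:
    # Hypothetical stand-in for the datastore upsert calls in this series.
    if not batch:
        raise ValueError("empty batch")
    logger.info(f"Upserting batch of size {len(batch)}")


def main() -> None:
    try:
        upsert([])
    except Exception as e:
        # Patch 1 style: logs at ERROR level and appends the traceback,
        # so it only makes sense inside an except block.
        logger.exception(f"Failed to insert batch records, error: {e}")

        # Patch 3 style: logs the message without a traceback, keeping
        # server logs terse on expected failure paths.
        logger.error(e)

        # Anti-pattern carried over from print(): loguru formats extra
        # positional arguments with str.format(), so with no "{}"
        # placeholder the exception object never reaches the output.
        logger.error("Error:", e)


if __name__ == "__main__":
    main()

The same behaviour explains the few remaining print-style calls left in the scripts after patch 3 (for example logger.info("metadata: ", str(metadata)) and logger.info("file: ", file)): they run without raising, but the second argument is silently dropped from the log line, so converting them to f-strings in a follow-up would be consistent with the rest of the series.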