Add loguru for logging #282

Merged
merged 4 commits on May 31, 2023
Changes from 3 commits
5 changes: 3 additions & 2 deletions datastore/providers/analyticdb_datastore.py
@@ -2,6 +2,7 @@
import asyncio
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
from loguru import logger

from psycopg2cffi import compat

@@ -252,7 +253,7 @@ def create_results(data):
QueryResult(query=query.query, results=results)
)
except Exception as e:
print("error:", e)
logger.exception("error:", e)
Collaborator

Doesn't logger.exception already log the stack trace along with the error message?
So logger.exception("<descriptive message>") should be sufficient; this applies to all cases in the PR.
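For context, a minimal standalone sketch (not part of this PR) of the behaviour described above: when logger.exception is called inside an except block, loguru attaches the full traceback automatically, so a descriptive message alone is enough. The parse_port helper is hypothetical, used only to trigger an exception.

```python
from loguru import logger


def parse_port(raw: str) -> int:
    # Hypothetical helper, used only to demonstrate logging behaviour.
    try:
        return int(raw)
    except ValueError:
        # loguru appends the current traceback automatically here;
        # passing the exception object as an extra argument is redundant.
        logger.exception("Failed to parse port value")
        return -1


parse_port("not-a-number")
```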

Contributor Author

Pushed new changes and replaced exception with error. exception emits a verbose stack trace, and it was logging PII. Yes, this is tested.
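A minimal sketch of the pattern described here, with hypothetical names (execute, query): logger.error records only the message, so the verbose traceback, and any sensitive data it might carry, is not written to the logs.

```python
from loguru import logger


def run_query(execute, query: str):
    # 'execute' and 'query' are placeholder names for this sketch.
    try:
        return execute(query)
    except Exception as e:
        # error() logs the message without a stack trace, unlike exception().
        logger.error("Query failed: {}", e)
        return None
```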

query_results.append(QueryResult(query=query.query, results=[]))
return query_results
finally:
@@ -275,7 +276,7 @@ async def execute_delete(query: str, params: Optional[List] = None) -> bool:
self.conn.commit()
return True
except Exception as e:
print(f"Error: {e}")
logger.exception(f"Error: {e}")
return False
finally:
self.connection_pool.putconn(conn)
59 changes: 26 additions & 33 deletions datastore/providers/milvus_datastore.py
@@ -2,6 +2,7 @@
import os
import asyncio

from loguru import logger
from typing import Dict, List, Optional
from pymilvus import (
Collection,
@@ -124,14 +125,6 @@ def __init__(
self._create_collection(MILVUS_COLLECTION, create_new) # type: ignore
self._create_index()

def _print_info(self, msg):
# TODO: logger
print(msg)

def _print_err(self, msg):
# TODO: logger
print(msg)

def _get_schema(self):
return SCHEMA_V1 if self._schema_ver == "V1" else SCHEMA_V2

@@ -143,7 +136,7 @@ def _create_connection(self):
addr = connections.get_connection_addr(x[0])
if x[1] and ('address' in addr) and (addr['address'] == "{}:{}".format(MILVUS_HOST, MILVUS_PORT)):
self.alias = x[0]
self._print_info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'"
logger.info("Reuse connection to Milvus server '{}:{}' with alias '{:s}'"
.format(MILVUS_HOST, MILVUS_PORT, self.alias))
break

@@ -158,10 +151,10 @@ def _create_connection(self):
password=MILVUS_PASSWORD, # type: ignore
secure=MILVUS_USE_SECURITY,
)
self._print_info("Create connection to Milvus server '{}:{}' with alias '{:s}'"
logger.info("Create connection to Milvus server '{}:{}' with alias '{:s}'"
.format(MILVUS_HOST, MILVUS_PORT, self.alias))
except Exception as e:
self._print_err("Failed to create connection to Milvus server '{}:{}', error: {}"
logger.exception("Failed to create connection to Milvus server '{}:{}', error: {}"
.format(MILVUS_HOST, MILVUS_PORT, e))

def _create_collection(self, collection_name, create_new: bool) -> None:
Expand Down Expand Up @@ -189,7 +182,7 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
consistency_level=self._consistency_level,
)
self._schema_ver = "V2"
self._print_info("Create Milvus collection '{}' with schema {} and consistency level {}"
logger.info("Create Milvus collection '{}' with schema {} and consistency level {}"
.format(collection_name, self._schema_ver, self._consistency_level))
else:
# If the collection exists, point to it
Expand All @@ -201,10 +194,10 @@ def _create_collection(self, collection_name, create_new: bool) -> None:
if field.name == "id" and field.is_primary:
self._schema_ver = "V2"
break
self._print_info("Milvus collection '{}' already exists with schema {}"
logger.info("Milvus collection '{}' already exists with schema {}"
.format(collection_name, self._schema_ver))
except Exception as e:
self._print_err("Failed to create collection '{}', error: {}".format(collection_name, e))
logger.exception("Failed to create collection '{}', error: {}".format(collection_name, e))

def _create_index(self):
# TODO: verify index/search params passed by os.environ
@@ -216,7 +209,7 @@
if self.index_params is not None:
# Convert the string format to JSON format parameters passed by MILVUS_INDEX_PARAMS
self.index_params = json.loads(self.index_params)
self._print_info("Create Milvus index: {}".format(self.index_params))
logger.info("Create Milvus index: {}".format(self.index_params))
# Create an index on the 'embedding' field with the index params found in init
self.col.create_index(EMBEDDING_FIELD, index_params=self.index_params)
else:
@@ -227,24 +220,24 @@
"index_type": "HNSW",
"params": {"M": 8, "efConstruction": 64},
}
self._print_info("Attempting creation of Milvus '{}' index".format(i_p["index_type"]))
logger.info("Attempting creation of Milvus '{}' index".format(i_p["index_type"]))
self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
self.index_params = i_p
self._print_info("Creation of Milvus '{}' index successful".format(i_p["index_type"]))
logger.info("Creation of Milvus '{}' index successful".format(i_p["index_type"]))
# If create fails, most likely due to being Zilliz Cloud instance, try to create an AutoIndex
except MilvusException:
self._print_info("Attempting creation of Milvus default index")
logger.info("Attempting creation of Milvus default index")
i_p = {"metric_type": "IP", "index_type": "AUTOINDEX", "params": {}}
self.col.create_index(EMBEDDING_FIELD, index_params=i_p)
self.index_params = i_p
self._print_info("Creation of Milvus default index successful")
logger.info("Creation of Milvus default index successful")
# If an index already exists, grab its params
else:
# How about if the first index is not vector index?
for index in self.col.indexes:
idx = index.to_dict()
if idx["field"] == EMBEDDING_FIELD:
self._print_info("Index already exists: {}".format(idx))
logger.info("Index already exists: {}".format(idx))
self.index_params = idx['index_param']
break

@@ -272,9 +265,9 @@ def _create_index(self):
}
# Set the search params
self.search_params = default_search_params[self.index_params["index_type"]]
self._print_info("Milvus search parameters: {}".format(self.search_params))
logger.info("Milvus search parameters: {}".format(self.search_params))
except Exception as e:
self._print_err("Failed to create index, error: {}".format(e))
logger.exception("Failed to create index, error: {}".format(e))

async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
"""Upsert chunks into the datastore.
@@ -319,18 +312,18 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
for batch in batches:
if len(batch[0]) != 0:
try:
self._print_info(f"Upserting batch of size {len(batch[0])}")
logger.info(f"Upserting batch of size {len(batch[0])}")
self.col.insert(batch)
self._print_info(f"Upserted batch successfully")
logger.info(f"Upserted batch successfully")
except Exception as e:
self._print_err(f"Failed to insert batch records, error: {e}")
logger.exception(f"Failed to insert batch records, error: {e}")
raise e

# This setting performs flushes after insert. Small insert == bad to use
# self.col.flush()
return doc_ids
except Exception as e:
self._print_err("Failed to insert records, error: {}".format(e))
logger.exception("Failed to insert records, error: {}".format(e))
return []


@@ -365,7 +358,7 @@ def _get_values(self, chunk: DocumentChunk) -> List[any] | None:  # type: ignore
x = values.get(key) or default
# If one of our required fields is missing, ignore the entire entry
if x is Required:
self._print_info("Chunk " + values["id"] + " missing " + key + " skipping")
logger.info("Chunk " + values["id"] + " missing " + key + " skipping")
return None
# Add the corresponding value if it passes the tests
ret.append(x)
@@ -436,7 +429,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:

return QueryResult(query=query.query, results=results)
except Exception as e:
self._print_err("Failed to query, error: {}".format(e))
logger.exception("Failed to query, error: {}".format(e))
return QueryResult(query=query.query, results=[])

results: List[QueryResult] = await asyncio.gather(
@@ -460,7 +453,7 @@ async def delete(
# If deleting all, drop and create the new collection
if delete_all:
coll_name = self.col.name
self._print_info("Delete the entire collection {} and create new one".format(coll_name))
logger.info("Delete the entire collection {} and create new one".format(coll_name))
# Release the collection from memory
self.col.release()
# Drop the collection
@@ -490,7 +483,7 @@ async def delete(
pks = ['"' + pk + '"' for pk in pks]

# Delete by ids batch by batch(avoid too long expression)
self._print_info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver))
logger.info("Apply {:d} deletions to schema {:s}".format(len(pks), self._schema_ver))
while len(pks) > 0:
batch_pks = pks[:batch_size]
pks = pks[batch_size:]
@@ -499,7 +492,7 @@
# Increment our deleted count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
self._print_err("Failed to delete by ids, error: {}".format(e))
logger.exception("Failed to delete by ids, error: {}".format(e))

try:
# Check if empty filter
@@ -524,9 +517,9 @@
# Increment our delete count
delete_count += int(res.delete_count) # type: ignore
except Exception as e:
self._print_err("Failed to delete by filter, error: {}".format(e))
logger.exception("Failed to delete by filter, error: {}".format(e))

self._print_info("{:d} records deleted".format(delete_count))
logger.info("{:d} records deleted".format(delete_count))

# This setting performs flushes after delete. Small delete == bad to use
# self.col.flush()
3 changes: 2 additions & 1 deletion datastore/providers/pgvector_datastore.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from datetime import datetime
from loguru import logger

from services.date import to_unix_timestamp
from datastore.datastore import DataStore
@@ -147,7 +148,7 @@ async def _query(self, queries: List[QueryWithEmbedding]) -> List[QueryResult]:
results.append(document_chunk)
query_results.append(QueryResult(query=query.query, results=results))
except Exception as e:
print("error:", e)
logger.exception("error:", e)
query_results.append(QueryResult(query=query.query, results=[]))
return query_results

43 changes: 22 additions & 21 deletions datastore/providers/pinecone_datastore.py
@@ -3,6 +3,7 @@
import pinecone
from tenacity import retry, wait_random_exponential, stop_after_attempt
import asyncio
from loguru import logger

from datastore.datastore import DataStore
from models.models import (
@@ -41,7 +42,7 @@ def __init__(self):

# Create a new index with the specified name, dimension, and metadata configuration
try:
print(
logger.info(
f"Creating index {PINECONE_INDEX} with metadata config {fields_to_index}"
)
pinecone.create_index(
@@ -50,18 +51,18 @@
metadata_config={"indexed": fields_to_index},
)
self.index = pinecone.Index(PINECONE_INDEX)
print(f"Index {PINECONE_INDEX} created successfully")
logger.info(f"Index {PINECONE_INDEX} created successfully")
except Exception as e:
print(f"Error creating index {PINECONE_INDEX}: {e}")
logger.exception(f"Error creating index {PINECONE_INDEX}: {e}")
raise e
elif PINECONE_INDEX and PINECONE_INDEX in pinecone.list_indexes():
# Connect to an existing index with the specified name
try:
print(f"Connecting to existing index {PINECONE_INDEX}")
logger.info(f"Connecting to existing index {PINECONE_INDEX}")
self.index = pinecone.Index(PINECONE_INDEX)
print(f"Connected to index {PINECONE_INDEX} successfully")
logger.info(f"Connected to index {PINECONE_INDEX} successfully")
except Exception as e:
print(f"Error connecting to index {PINECONE_INDEX}: {e}")
logger.exception(f"Error connecting to index {PINECONE_INDEX}: {e}")
raise e

@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
@@ -78,7 +79,7 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
for doc_id, chunk_list in chunks.items():
# Append the id to the ids list
doc_ids.append(doc_id)
print(f"Upserting document_id: {doc_id}")
logger.info(f"Upserting document_id: {doc_id}")
for chunk in chunk_list:
# Create a vector tuple of (id, embedding, metadata)
# Convert the metadata object to a dict with unix timestamps for dates
@@ -97,11 +98,11 @@ async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
# Upsert each batch to Pinecone
for batch in batches:
try:
print(f"Upserting batch of size {len(batch)}")
logger.info(f"Upserting batch of size {len(batch)}")
self.index.upsert(vectors=batch)
print(f"Upserted batch successfully")
logger.info(f"Upserted batch successfully")
except Exception as e:
print(f"Error upserting batch: {e}")
logger.exception(f"Error upserting batch: {e}")
raise e

return doc_ids
@@ -117,7 +118,7 @@ async def _query(

# Define a helper coroutine that performs a single query and returns a QueryResult
async def _single_query(query: QueryWithEmbedding) -> QueryResult:
print(f"Query: {query.query}")
logger.debug(f"Query: {query.query}")

# Convert the metadata filter object to a dict with pinecone filter expressions
pinecone_filter = self._get_pinecone_filter(query.filter)
@@ -132,7 +133,7 @@ async def _single_query(query: QueryWithEmbedding) -> QueryResult:
include_metadata=True,
)
except Exception as e:
print(f"Error querying index: {e}")
logger.exception(f"Error querying index: {e}")
raise e

query_results: List[DocumentChunkWithScore] = []
@@ -184,35 +185,35 @@ async def delete(
# Delete all vectors from the index if delete_all is True
if delete_all:
try:
print(f"Deleting all vectors from index")
logger.info(f"Deleting all vectors from index")
self.index.delete(delete_all=True)
print(f"Deleted all vectors successfully")
logger.info(f"Deleted all vectors successfully")
return True
except Exception as e:
print(f"Error deleting all vectors: {e}")
logger.exception(f"Error deleting all vectors: {e}")
raise e

# Convert the metadata filter object to a dict with pinecone filter expressions
pinecone_filter = self._get_pinecone_filter(filter)
# Delete vectors that match the filter from the index if the filter is not empty
if pinecone_filter != {}:
try:
print(f"Deleting vectors with filter {pinecone_filter}")
logger.info(f"Deleting vectors with filter {pinecone_filter}")
self.index.delete(filter=pinecone_filter)
print(f"Deleted vectors with filter successfully")
logger.info(f"Deleted vectors with filter successfully")
except Exception as e:
print(f"Error deleting vectors with filter: {e}")
logger.exception(f"Error deleting vectors with filter: {e}")
raise e

# Delete vectors that match the document ids from the index if the ids list is not empty
if ids is not None and len(ids) > 0:
try:
print(f"Deleting vectors with ids {ids}")
logger.info(f"Deleting vectors with ids {ids}")
pinecone_filter = {"document_id": {"$in": ids}}
self.index.delete(filter=pinecone_filter) # type: ignore
print(f"Deleted vectors with ids successfully")
logger.info(f"Deleted vectors with ids successfully")
except Exception as e:
print(f"Error deleting vectors with ids: {e}")
logger.exception(f"Error deleting vectors with ids: {e}")
raise e

return True