|
| 1 | +from functools import partial |
| 2 | +from pathlib import Path |
| 3 | +from typing import Union, Type |
| 4 | + |
| 5 | +import pandas as pd |
| 6 | +from sqlalchemy.future import Engine |
| 7 | +from sqlalchemy.exc import NoResultFound |
| 8 | +from sqlmodel import SQLModel, Session, create_engine, select |
| 9 | + |
| 10 | +from .models import Clone, Referring, Traffic, Paths |
| 11 | +from .logger import app_log as log |
| 12 | + |
| 13 | +SQLITE_FILE_NAME = "data/sqlite3.db" |
| 14 | + |
| 15 | + |
def configure(test: bool = False, echo: bool = False) -> Engine:
    """Build an engine for the SQLite database file.

    Args:
        test: When true, target ``test_data/sqlite3.db`` instead of the
            path held in ``SQLITE_FILE_NAME``.
        echo: Forwarded unchanged to ``create_engine``.

    Returns:
        The engine created for the selected SQLite URL.
    """
    if test:
        db_path = "test_data/sqlite3.db"
    else:
        db_path = SQLITE_FILE_NAME
    return create_engine(f"sqlite:///{db_path}", echo=echo)
| 20 | + |
| 21 | + |
def create_db_and_tables(test: bool = False, echo: bool = True) -> Engine:
    """Create every table registered on ``SQLModel.metadata``.

    Args:
        test: Forwarded to ``configure`` to choose the test database file.
        echo: Forwarded to ``configure``. Defaults to ``True`` to keep the
            previous behaviour; pass ``False`` to silence SQL echo.

    Returns:
        The engine the tables were created on, so callers can reuse it.
    """
    engine = configure(test=test, echo=echo)
    SQLModel.metadata.create_all(engine)
    return engine
| 26 | + |
| 27 | + |
def migrate_csv(
    filename: Union[str, Path],
    model: Type[SQLModel],
    engine: Engine,
    skip_rows: Union[int, None] = None,
):
    """Append the rows of a header-less CSV that are not yet stored.

    The CSV columns are named after the model's required fields, in the
    order ``model.__fields__`` yields them. Every row is looked up through
    ``query`` by its ``repository_name`` and ``date``; only rows without a
    match are written to the table named after the lower-cased model class.

    Args:
        filename: CSV file to load.
        model: SQLModel class describing the target table.
        engine: Engine used for both the lookups and the insert.
        skip_rows: Passed to ``pandas.read_csv`` as ``skiprows``.
    """
    column_names = [
        field.name for field in model.__fields__.values() if field.required
    ]
    log.info(f"[yellow]Loading: {filename}")
    frame = pd.read_csv(
        filename, header=None, skiprows=skip_rows, names=column_names
    )

    lookup = partial(query, engine=engine, model=model)
    repo_column = frame["repository_name"]
    date_column = frame["date"]
    # Positions of the rows for which the lookup found nothing.
    missing_positions = [
        position
        for position, (repo, day) in enumerate(zip(repo_column, date_column))
        if not lookup(repo, day)
    ]
    fresh: pd.DataFrame = frame.iloc[missing_positions]

    if fresh.empty:
        log.info("No new records!")
    else:
        log.info(f"New records found: {len(fresh)}")
        log.info("[bold yellow]Adding data")
        table_name = model.__name__.lower()
        fresh.to_sql(table_name, engine, if_exists="append", index=False)

    # Fewer fresh rows than loaded rows means some were already stored.
    if len(fresh) < len(frame):
        log.info("[orange]Some records exists in db")
| 61 | + |
| 62 | + |
def query(
    repository_name: str,
    date: str,
    engine: Engine,
    model: Type[Union[SQLModel, Clone, Referring, Paths, Traffic]],
) -> Union[SQLModel, Clone, Referring, Paths, Traffic, None]:
    """Fetch a stored row for the given repository and date, if any.

    Args:
        repository_name: Value matched against ``model.repository_name``.
        date: Value matched against ``model.date``.
        engine: Engine the lookup session is opened on.
        model: Model class (not an instance) whose table is searched.

    Returns:
        The first matching row, or ``None`` when nothing matches.
    """
    statement = select(model).where(
        model.repository_name == repository_name, model.date == date
    )
    with Session(engine) as session:
        # first() rather than one(): callers use this as an existence check,
        # and one() would raise MultipleResultsFound if the table already
        # holds duplicate rows for the same repository and date.
        return session.exec(statement).first()
0 commit comments