"""Download the latest Citco valuation report for a business date via SFTP
and (re)load it into the ``citco_reports`` table in dawndb."""

import argparse
import datetime
import re

import pandas as pd

from serenitas.analytics.dates import prev_business_day, next_business_day
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient


def get_ped(s):
    """Extract the period-end (date, time) embedded in a Citco filename.

    Returns None if the filename does not carry a parsable timestamp.
    """
    if m := re.search(r"IM.([\d.]+)\.", s):
        dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
        return dt.date(), dt.time()


def load_citco_report(fdir):
    """Read a Citco valuation report CSV and normalise it for the database."""
    df = pd.read_csv(fdir)
    df["row"] = df.index
    # Normalise column names: lower-case, underscores instead of spaces.
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    # Both date columns arrive as YYYYMMDD strings.
    df["period_end_date"] = pd.to_datetime(df["period_end_date"], format="%Y%m%d")
    df["knowledge_date"] = pd.to_datetime(df["knowledge_date"], format="%Y%m%d")
    return df


def download_reports(cob):
    """Fetch the latest Citco valuation report for ``cob`` from the SFTP site."""
    reports_dir = DAILY_DIR / str(next_business_day(cob)) / "Reports"
    reports_dir.mkdir(exist_ok=True, parents=True)
    sftp = SftpClient.from_creds("citco")
    # Keep only INNOCAP position files whose period-end date matches cob;
    # files without a parsable timestamp are skipped.
    citco_files = [
        filename
        for filename in sftp.client.listdir("/outgoing")
        if "SPOS4X_INNOCAP" in filename
        if (ped := get_ped(filename)) and ped[0] == cob
    ]
    if not citco_files:
        return
    # If several files match, take the one with the latest timestamp.
    f = max(citco_files, key=get_ped)
    sftp.client.get(
        f"/outgoing/{f}", localpath=reports_dir / "Valuation_Report_ISOSEL.csv"
    )


def load_reports(cob):
    """Load the downloaded report into the ``citco_reports`` table."""
    reports_dir = DAILY_DIR / str(next_business_day(cob)) / "Reports"
    dest = reports_dir / "Valuation_Report_ISOSEL.csv"
    if not dest.exists():
        # Early exit: file not there.
        return
    df = load_citco_report(dest)
    # Delete any rows already loaded for this period so the reload is idempotent.
    conn = dbconn("dawndb")
    with conn.cursor() as c:
        c.execute(
            "DELETE FROM citco_reports WHERE period_end_date = %s",
            (cob,),
        )
    conn.commit()
    df.to_sql("citco_reports", dawn_engine, if_exists="append", index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "cob",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=prev_business_day(datetime.date.today()),
        help="close of business",
    )
    args = parser.parse_args()
    download_reports(args.cob)
    load_reports(args.cob)