"""Download the Citco STAX_INNOCAP valuation report and load it into citco_stax.

Run as a script with an optional ISO-formatted close-of-business date; the
default is the business day preceding today.
"""

import datetime
import re

import pandas as pd

from serenitas.analytics.dates import prev_business_day, next_business_day
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient

# Local name under which the downloaded report is stored (and later read).
_REPORT_FILENAME = "Valuation_Report_bytrade_ISOSEL.csv"

# Columns of the report that hold dates encoded as YYYYMMDD.
_DATE_COLUMNS = (
    "open_trade_date",
    "open_settle_date",
    "close_trade_date",
    "close_settle_date",
    "maturity_date",
    "orig_date",
)


def _reports_dir(cob):
    """Return the local Reports directory used for close-of-business *cob*."""
    return DAILY_DIR / str(next_business_day(cob)) / "Reports"


def get_ped(s):
    """Extract the timestamp embedded in a report file name.

    Looks for ``ISOSEL.<YYYYMMDD>.<HHMMSS>.`` in *s*.

    Returns:
        A ``(date, datetime)`` tuple built from that timestamp, or ``None``
        when *s* does not contain the pattern.

    Raises:
        ValueError: if the captured text is not in ``%Y%m%d.%H%M%S`` format.
    """
    # Raw string: "\d" and "\." are invalid escapes in a normal literal.
    if m := re.search(r"ISOSEL.([\d.]+)\.", s):
        dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
        return dt.date(), dt
    return None


def load_citco_report(fdir, ped, kd):
    """Read the report CSV at *fdir* into a DataFrame ready for insertion.

    Adds a ``row`` column holding the original row index, normalises column
    names to lower case with underscores instead of spaces, stamps every row
    with ``period_end_date`` (*ped*) and ``knowledge_date`` (*kd*), and parses
    the YYYYMMDD date columns.
    """
    df = pd.read_csv(fdir)
    df["row"] = df.index
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    df["period_end_date"] = ped
    df["knowledge_date"] = kd
    for column in _DATE_COLUMNS:
        df[column] = pd.to_datetime(df[column], format="%Y%m%d")
    return df


def download_reports(cob):
    """Fetch the latest STAX_INNOCAP report for *cob* from the Citco SFTP.

    Among the files in ``/outgoing`` whose name contains ``STAX_INNOCAP`` and
    whose embedded timestamp falls on *cob*, the one with the latest timestamp
    is saved into the local Reports directory.

    Returns:
        The ``(date, datetime)`` stamp of the downloaded file, or ``None``
        when no matching file is available.
    """
    reports_dir = _reports_dir(cob)
    reports_dir.mkdir(exist_ok=True, parents=True)
    # NOTE(review): the SFTP client is never explicitly closed here — confirm
    # whether SftpClient needs/offers a close() or context manager.
    sftp = SftpClient.from_creds("citco")
    # Parse each name once; skip names without a parseable ISOSEL timestamp
    # instead of failing on ``None[0]``.
    candidates = [
        (stamp, filename)
        for filename in sftp.client.listdir("/outgoing")
        if "STAX_INNOCAP" in filename
        if (stamp := get_ped(filename)) is not None and stamp[0] == cob
    ]
    if not candidates:
        return None
    stamp, latest = max(candidates, key=lambda pair: pair[0])
    sftp.client.get(
        f"/outgoing/{latest}",
        localpath=reports_dir / _REPORT_FILENAME,
    )
    return stamp


def load_reports(cob, ped, kd):
    """Replace the citco_stax rows for *cob* with the downloaded report.

    Does nothing when the report file for *cob* is not present locally.
    Otherwise deletes existing rows whose ``period_end_date`` equals *cob*
    and appends the freshly parsed report, stamped with *ped* and *kd*.
    """
    dest = _reports_dir(cob) / _REPORT_FILENAME
    if not dest.exists():  # Early exit, file not there
        return
    df = load_citco_report(dest, ped, kd)
    # NOTE(review): the delete is committed before the insert runs, so a
    # failure in to_sql leaves the period empty; the connection is also not
    # explicitly closed — confirm both are acceptable.
    conn = dbconn("dawndb")
    with conn.cursor() as c:
        c.execute(
            "DELETE from citco_stax where period_end_date =%s",
            (cob,),
        )
    conn.commit()
    df.to_sql("citco_stax", dawn_engine, if_exists="append", index=False)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "cob",
        nargs="?",
        type=datetime.date.fromisoformat,
        default=prev_business_day(datetime.date.today()),
        help="close of business",
    )
    args = parser.parse_args()
    result = download_reports(args.cob)
    if result is None:
        # Previously this surfaced as an opaque tuple-unpacking TypeError;
        # still exit non-zero so schedulers notice the missing report.
        raise SystemExit(f"No Citco STAX_INNOCAP report found for {args.cob}")
    ped, kd = result
    load_reports(args.cob, ped, kd)