"""Load Citco report files from SFTP into database tables."""
import datetime
import re
from dataclasses import field, dataclass
from typing import Callable, Optional

import pandas as pd

from serenitas.analytics.dates import prev_business_day, next_business_day
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.utils.remote import SftpClient

# Columns that, when present in a report, are converted to plain dates.
_DATE_COLUMNS = (
    "init_date",
    "init_settle_date",
    "liqd_date",
    "liqd_settle_date",
    "maturity_date",
    "orig_date",
    "start_date",
    "end_date",
)


def citco_accrued(s):
    """Derive the period end date from an ISOSEL accrued-report filename.

    The filename is expected to embed a ``%Y%m%d%H%M%S`` timestamp after
    the ``100502500_INNOCAP_ISOSEL`` prefix. The parsed timestamp is passed
    through ``prev_business_day`` and that result is returned.

    Returns ``None`` when ``s`` does not match the expected pattern.
    """
    # Raw string: "\d" and "\." are regex escapes, not string escapes.
    m = re.search(r"100502500_INNOCAP_ISOSEL.([\d]+)\.", s)
    if m is None:
        return None
    dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
    return prev_business_day(dt)


def citco_all(s: str) -> Optional[datetime.datetime]:
    """Parse the timestamp embedded in an ISOSEL "all positions" filename.

    The filename is expected to embed a ``%Y%m%d.%H%M%S`` timestamp after
    the ``SPOS4X_INNOCAP_ISOSEL_D_IM`` prefix.

    Returns ``None`` when ``s`` does not match the expected pattern.
    """
    m = re.search(r"SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s)
    if m is None:
        return None
    return datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")


def load_citco_report(df, kd):
    """Normalise a raw Citco report DataFrame for insertion into the database.

    ``df`` is modified in place and also returned:

    * a ``row`` column is added holding the original index,
    * column names are lower-cased and spaces replaced by underscores,
    * ``period_end_date`` is set to ``kd.date()``,
    * ``knowledge_date`` is set to ``next_business_day(kd)``,
    * every column listed in ``_DATE_COLUMNS`` that is present is parsed
      and reduced to ``datetime.date`` values.

    ``kd`` must provide a ``.date()`` method (e.g. a ``datetime.datetime``).
    """
    df["row"] = df.index
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(" ", "_")
    df["period_end_date"] = kd.date()
    df["knowledge_date"] = next_business_day(kd)
    for column in _DATE_COLUMNS:
        if column in df.columns:
            # NOTE(review): infer_datetime_format is deprecated as of
            # pandas 2.0; kept so behaviour on older pandas is unchanged.
            df[column] = pd.to_datetime(
                df[column], infer_datetime_format=True
            ).dt.date
    return df


@dataclass
class Report:
    """A Citco report identified by filename prefix, date and target table."""

    table: str  # database table the report is written to
    ped_func: Callable[[str], datetime.datetime]  # filename -> period end
    fname: str  # substring a filename must contain to be considered
    _sftp: SftpClient
    date: datetime.date  # period end date the report must match
    # NOTE(review): this default is evaluated once, when the class body is
    # executed, so dbconn("dawndb") is called at import time and the result
    # is shared by every Report that does not pass its own _conn.
    _conn: dbconn = dbconn("dawndb")

    @classmethod
    def set_report(cls, report, date):
        """Build the ``Report`` for a ``"<fund>_<report>"`` name.

        Recognised names are ``"isosel_accrued"`` and ``"isosel_all"``.
        Returns ``None`` for any other ``<fund>_<report>`` pair. Raises
        ``ValueError`` (from tuple unpacking) when ``report`` does not
        contain exactly one underscore.
        """
        fund, report = report.split("_")
        if (fund, report) == ("isosel", "accrued"):
            return cls(
                "isosel_accrued",
                citco_accrued,
                "100502500_INNOCAP_ISOSEL",
                SftpClient.from_creds("citco", folder="outgoing"),
                date,
            )
        elif (fund, report) == ("isosel", "all"):
            return cls(
                "citco_reports",
                citco_all,
                "SPOS4X_INNOCAP_ISOSEL_D_IM",
                SftpClient.from_creds("citco", folder="outgoing"),
                date,
            )
        return None

    @property
    def most_recent_report(self):
        """Return the name of the latest remote file matching this report.

        A file is a candidate when its name contains ``self.fname`` and
        ``self.ped_func`` maps it to ``self.date``. Among the candidates
        the one with the greatest ``ped_func`` value is returned.

        Raises ``ValueError`` when there is no candidate.
        """
        candidates = []
        for filename in self._sftp.client.listdir():
            if self.fname not in filename:
                continue
            period_end = self.ped_func(filename)
            # ped_func returns None for names that contain the prefix but
            # do not carry a parsable timestamp; ignore those files.
            if period_end is None:
                continue
            if period_end.date() == self.date:
                candidates.append((period_end, filename))
        if not candidates:
            raise ValueError(
                f"no {self.fname} report found for {self.date}"
            )
        return max(candidates, key=lambda item: item[0])[1]

    def to_df(self):
        """Download the most recent report and return it as a DataFrame.

        The file is read with ``pandas.read_csv`` and post-processed by
        ``load_citco_report``.
        """
        # Resolve the filename once: each access of the property lists the
        # remote directory again.
        filename = self.most_recent_report
        with self._sftp.client.open(filename) as fh:
            df = pd.read_csv(fh)
        return load_citco_report(df, self.ped_func(filename))

    def to_db(self):
        """Replace the rows for ``self.date`` in ``self.table`` with the report.

        Existing rows whose ``period_end_date`` equals ``self.date`` are
        deleted, then the freshly loaded report is appended.
        """
        # Load first so that a missing or unreadable report raises before
        # any existing rows are deleted.
        df = self.to_df()
        with self._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        self._conn.commit()
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)