"""Fetch Citco report files over SFTP and load them into database tables."""
import datetime
import re
from dataclasses import dataclass
from typing import ClassVar

import pandas as pd

from serenitas.analytics.dates import next_business_day, prev_business_day
from serenitas.analytics.exceptions import MissingDataError
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.utils.remote import SftpClient

# Fund name -> value substituted for ``{suffix}`` in a report's filename prefix.
_suffix = {"ISOSEL": "IM", "CRSE": "IMCG"}


@dataclass
class CitcoReport:
    """Base class for a CSV report pulled from the SFTP folder into a table.

    Subclasses are declared with the class keyword arguments ``table``,
    ``date_cols`` and ``prefix`` and are registered under ``table``, so
    ``CitcoReport["some_table"]`` returns the matching subclass.

    Subclasses must provide a static ``get_ts(filename)`` that extracts the
    report timestamp from a file name; the methods here call it via ``cls``.
    """

    report_name: ClassVar
    # Destination table name; set per subclass by ``__init_subclass__``.
    table: ClassVar
    knowledge_date: datetime.date = None
    # Default connection object is created once, when the class body executes.
    _conn: dbconn = dbconn("dawndb")
    # SFTP client shared by every report class; created at class definition.
    _sftp: ClassVar = SftpClient.from_creds("citco", folder="outgoing")
    # CSV column names handed to ``pd.read_csv(parse_dates=...)``.
    date_cols: ClassVar = []
    # Filename template; may contain ``{fund}`` and ``{suffix}`` placeholders.
    prefix: ClassVar
    # table name -> report subclass (shared by the whole hierarchy).
    _registry = {}

    def __init_subclass__(cls, table, date_cols, prefix):
        """Store the per-report configuration and register ``cls`` by table."""
        super().__init_subclass__()
        cls.table = table
        cls.date_cols = date_cols
        cls.prefix = prefix
        cls._registry[table] = cls

    def __class_getitem__(cls, table):
        """Return the report class registered for ``table``.

        Raises:
            KeyError: if no subclass was registered under ``table``.
        """
        return cls._registry[table]

    @staticmethod
    def get_report_date(cob):
        """Return ``cob`` unchanged."""
        return cob

    @classmethod
    def get_report(cls, date, fund):
        """Return the name of the latest report file for ``fund`` on ``date``.

        A file qualifies when its name contains ``cls.prefix`` formatted with
        the fund and its suffix, and the date of ``cls.get_ts(name)`` equals
        ``date``. Among the qualifying files the one with the greatest
        timestamp wins.

        Returns:
            The file name, or ``None`` when no file qualifies.
        """
        candidates = [
            fname
            for fname in cls._sftp.client.listdir()
            if cls.prefix.format(fund=fund, suffix=_suffix[fund]) in fname
            and cls.get_ts(fname).date() == date
        ]
        return max(candidates, key=cls.get_ts, default=None)

    @classmethod
    def get_df(cls, report_name):
        """Read ``report_name`` from SFTP into a normalized DataFrame.

        The frame gets a ``row`` column copied from the index, lower-cased
        column names with spaces replaced by underscores, and the
        ``period_end_date`` / ``knowledge_date`` columns derived from the
        timestamp embedded in the file name.
        """
        with cls._sftp.client.open(report_name) as fh:
            ts = cls.get_ts(report_name)
            # NOTE(review): ``infer_datetime_format`` is deprecated in
            # pandas 2.x; kept so parsing is unchanged on older versions.
            df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
            df["row"] = df.index
            df.columns = df.columns.str.lower()
            df.columns = df.columns.str.replace(" ", "_")
            df["period_end_date"] = ts.date()
            df["knowledge_date"] = next_business_day(ts)
            if "strategy" in df.columns:
                # Drop the "M_" / "SER_" marker that follows a "/" separator.
                df["strategy"] = df["strategy"].str.replace(
                    "/M_|/SER_", "/", regex=True
                )
            return df

    @classmethod
    def to_db(cls, date, fund):
        """Replace the rows stored for ``(date, fund)`` with the latest report.

        Existing rows with that ``period_end_date`` and ``fund`` are deleted
        and committed first; the new rows are then appended through
        ``dawn_engine``.

        Raises:
            MissingDataError: if no report file is available for ``date``.
        """
        report_name = cls.get_report(date, fund)
        if not report_name:
            raise MissingDataError(f"{fund}: {cls.table} report not ready for {date}")

        conn = cls._conn
        df = cls.get_df(report_name)
        df["fund"] = fund
        with conn.cursor() as c:
            c.execute(
                f"DELETE FROM {cls.table} WHERE period_end_date=%s AND fund=%s",
                (
                    date,
                    fund,
                ),
            )
        conn.commit()
        df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    prefix="100502500_INNOCAP_{fund}_",
):
    """Accrued report, stored in the ``isosel_accrued`` table."""

    @staticmethod
    def get_ts(s):
        """Return ``prev_business_day`` of the timestamp embedded in ``s``.

        The timestamp is the first run of 12 consecutive digits in ``s``,
        read as ``YYYYMMDDHHMM``.
        """
        m = re.search(r"\d{8}\d{4}", s)
        # The regex captures 12 digits, so parse to the minute. Parsing those
        # 12 digits with a trailing "%S" made strptime backtrack and split the
        # minutes into a one-digit minute and a one-digit second.
        return prev_business_day(datetime.datetime.strptime(m.group(), "%Y%m%d%H%M"))


class AllReport(
    CitcoReport,
    table="citco_reports",
    date_cols=["Maturity Date"],
    prefix="SPOS4X_INNOCAP_{fund}_D_{suffix}",
):
    """Report stored in the ``citco_reports`` table."""

    @staticmethod
    def get_ts(s):
        """Return the ``YYYYMMDD.HHMMSS`` timestamp embedded in ``s``."""
        m = re.search(r"\d{8}\.\d{6}", s)
        return datetime.datetime.strptime(m.group(), "%Y%m%d.%H%M%S")