from functools import partial
from typing import ClassVar
import datetime
from dataclasses import dataclass

import pandas as pd
import dateutil.parser as dparser

from serenitas.utils.remote import SftpClient
from serenitas.ops.trade_dataclasses import Fund
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from serenitas.analytics.exceptions import MissingDataError

from .misc import dt_from_citco


@dataclass
class CitcoReport:
    """Base class for Citco report files fetched over SFTP and loaded into a table.

    Concrete report types subclass this class and pass ``table``, ``date_cols``
    and ``dtkey`` as class keyword arguments; they must also provide a
    ``get_prefix(fund)`` static method returning the file-name prefix of their
    reports.  Each subclass is recorded in a registry keyed by table name and
    can be retrieved with ``CitcoReport[table]``.
    """

    report_name: ClassVar
    # Destination table name; set per subclass by ``__init_subclass__``.
    table: ClassVar
    knowledge_date: datetime.date = None
    # Connection created once, when the class body is evaluated.
    _conn: dbconn = dbconn("dawndb")
    # SFTP client created once, when the class body is evaluated.
    _sftp: ClassVar = SftpClient.from_creds("citco", folder="outgoing")
    # CSV columns handed to ``pd.read_csv(parse_dates=...)``.
    date_cols: ClassVar = []
    # ``strptime`` pattern of the timestamp embedded in report file names.
    dtkey: ClassVar
    # Maps table name -> subclass; shared by every subclass.
    _registry = {}

    def __init_subclass__(cls, table, date_cols, dtkey):
        """Store the subclass configuration and register it under ``table``."""
        super().__init_subclass__()
        cls.table = table
        cls.date_cols = date_cols
        cls.dtkey = dtkey
        cls._registry[table] = cls

    def __class_getitem__(cls, table):
        """Return the subclass registered for ``table`` (``KeyError`` if none)."""
        return cls._registry[table]

    def __post_init__(self):
        # NOTE(review): this reads ``self.report_file_name``, ``self.fund``,
        # ``self.date`` and ``self.dtkey_fun``, none of which are defined in
        # this file, so instantiating the class raises AttributeError unless
        # they are supplied elsewhere.  The class-method API (``get_report`` /
        # ``get_df`` / ``to_db``) does not depend on instances.  Confirm the
        # intended instance workflow before relying on this.
        if not self.report_file_name:
            raise MissingDataError(
                f"No reports for {self.fund}:{self.table} on {self.date}"
            )
        self.knowledge_date = datetime.datetime.combine(
            next_business_day(self.date), self.dtkey_fun(self.report_file_name).time()
        )

    @staticmethod
    def get_report_date(cob):
        """Return the file-timestamp date expected for close-of-business ``cob``.

        The base implementation returns ``cob`` unchanged; subclasses may
        override it.
        """
        return cob

    @classmethod
    def get_report(cls, date, fund):
        """Return the name of the latest matching report file on the SFTP server.

        A file matches when its name contains ``cls.get_prefix(fund)`` and the
        date part of its embedded timestamp equals
        ``cls.get_report_date(date)``.  Among matches, the one with the
        greatest timestamp is returned.

        Raises:
            ValueError: if no file matches (``max`` of an empty sequence), or
                if a file containing the prefix has a timestamp that does not
                follow ``cls.dtkey``.
        """
        prefix = cls.get_prefix(fund)
        dtkey_fun = cls.get_dt_fun(fund)
        report_date = cls.get_report_date(date)
        candidates = [
            f
            for f in cls._sftp.client.listdir()
            # The prefix test comes first so unrelated files are never parsed.
            if prefix in f and dtkey_fun(f).date() == report_date
        ]
        return max(candidates, key=dtkey_fun)

    @classmethod
    def get_df(cls, report_name, date=None, fund=None, knowledge_date=None):
        """Read ``report_name`` from SFTP into a DataFrame and tag its rows.

        Column names are lower-cased with spaces replaced by underscores, a
        ``row`` column holding the original row index is added, and the
        ``period_end_date``, ``knowledge_date`` and ``fund`` columns are filled
        with the given values.

        Args:
            report_name: file name on the SFTP server.
            date: value for ``period_end_date``.
            fund: value for ``fund``.
            knowledge_date: value for ``knowledge_date``.

        Each of the three optional arguments falls back to the class attribute
        of the same name when omitted; for ``date`` and ``fund`` that raises
        AttributeError if the class has no such attribute.
        """
        if date is None:
            date = cls.date
        if fund is None:
            fund = cls.fund
        if knowledge_date is None:
            knowledge_date = cls.knowledge_date
        with cls._sftp.client.open(report_name) as fh:
            # NOTE(review): ``infer_datetime_format`` is deprecated in
            # pandas >= 2.0; kept because the pandas version in use is not
            # visible from this file.
            df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
        df["row"] = df.index
        df.columns = df.columns.str.lower()
        df.columns = df.columns.str.replace(" ", "_")
        df["period_end_date"] = date
        df["knowledge_date"] = knowledge_date
        df["fund"] = fund
        return df

    @classmethod
    def to_db(cls, date, fund):
        """Replace the rows of ``cls.table`` for ``(date, fund)`` with the latest report.

        Existing rows for the period and fund are deleted and committed, then
        the freshly downloaded report is appended.  The delete and the insert
        go through different handles (``_conn`` and ``dawn_engine``), so they
        are not a single transaction.
        """
        report_name = cls.get_report(date, fund)
        # Same formula as ``__post_init__``: next business day after ``date``,
        # at the time of day embedded in the report file name.
        knowledge_date = datetime.datetime.combine(
            next_business_day(date), cls.get_dt_fun(fund)(report_name).time()
        )
        df = cls.get_df(
            report_name, date=date, fund=fund, knowledge_date=knowledge_date
        )
        with cls._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {cls.table} WHERE period_end_date=%s AND fund=%s",
                (
                    date,
                    fund,
                ),
            )
        cls._conn.commit()
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)

    @classmethod
    def get_dt_fun(cls, fund):
        """Return a function mapping a report file name to its embedded datetime.

        The returned function strips ``cls.get_prefix(fund)`` and the ``.csv``
        suffix from the name and parses the remainder with the ``cls.dtkey``
        ``strptime`` pattern.  Its second parameter ``date`` is unused and
        optional, so the function can be used directly as a one-argument
        ``key=`` callable.
        """
        prefix = cls.get_prefix(fund)

        def dt_fun(fname, date=None):
            ts = fname.removeprefix(prefix).removesuffix(".csv")
            return datetime.datetime.strptime(ts, cls.dtkey)

        return dt_fun


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    dtkey="%Y%m%d%H%M%S",
):
    """Report loaded into the ``isosel_accrued`` table."""

    @staticmethod
    def get_report_date(cob):
        """Return the next business day after ``cob``."""
        return next_business_day(cob)

    @staticmethod
    def get_prefix(fund):
        """Return the file-name prefix of this report for ``fund``."""
        return f"100502500_INNOCAP_{fund}_"


class AllReport(
    CitcoReport,
    table="citco_reports",
    date_cols=["Maturity Date"],
    dtkey="%Y%m%d.%H%M%S",
):
    """Report loaded into the ``citco_reports`` table."""

    @staticmethod
    def get_prefix(fund):
        """Return the file-name prefix of this report for ``fund``.

        The trailing code is ``IM`` for fund ``ISOSEL`` and ``IMCG`` otherwise.
        """
        code = "IM" if fund == "ISOSEL" else "IMCG"
        return f"SPOS4X_INNOCAP_{fund}_D_{code}."