from functools import partial from typing import ClassVar import datetime from dataclasses import dataclass import pandas as pd from serenitas.utils.remote import SftpClient from serenitas.ops.trade_dataclasses import Fund from serenitas.utils.db import dawn_engine, dbconn from serenitas.analytics.dates import next_business_day from .misc import dt_from_citco @dataclass class CitcoReport: date: datetime.date fund: Fund report_name: ClassVar table: ClassVar knowledge_date: datetime.date = None fund: Fund = None _conn: dbconn = dbconn("dawndb") date_cols: ClassVar = [] dtkey: ClassVar _registry = {} def __init_subclass__(cls, table, date_cols, dtkey): cls.table = table cls.date_cols = date_cols cls.dtkey = dtkey cls._registry[table] = cls def __class_getitem__(cls, table): return cls._registry[table] def __post_init__(self): self._sftp = SftpClient.from_creds("citco", folder="outgoing") if not self.report_file_name: raise ValueError(f"No reports for {self.table} on {self.date}") self.knowledge_date = datetime.datetime.combine( next_business_day(self.date), self.dtkey_fun(self.report_file_name).time() ) @property def dtkey_fun(self): return partial(dt_from_citco, file_tag=self.file_prefix, dt_format=self.dtkey) @property def sftp_report_date(self): return self.date @property def report_file_name(self): return max( [ f for f in self._sftp.client.listdir() if (self.file_prefix in f) and (self.dtkey_fun(f).date() == self.sftp_report_date) ], key=self.dtkey_fun, default=None, ) def to_df(self): with self._sftp.client.open(self.report_file_name) as fh: df = pd.read_csv(fh, parse_dates=self.date_cols, infer_datetime_format=True) df["row"] = df.index df.columns = df.columns.str.lower() df.columns = df.columns.str.replace(" ", "_") df["period_end_date"] = self.date df["knowledge_date"] = self.knowledge_date df["fund"] = self.fund return df def to_db(self): df = self.to_df() with self._conn.cursor() as c: c.execute( f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s", ( self.date, self.fund, ), ) self._conn.commit() if "strategy" in df.columns: df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True) df.to_sql(self.table, dawn_engine, if_exists="append", index=False) class AccruedReport( CitcoReport, table="isosel_accrued", date_cols=[ "Init Date", "Init Settle Date", "Liqd Date", "Liqd Settle Date", "Bond Maturity", "Orig Date", "Start Date", "End Date", ], dtkey="%Y%m%d%H%M%S", ): @property def sftp_report_date(self): return next_business_day(self.date) @property def file_prefix(self): return f"100502500_INNOCAP_{self.fund}_" class AllReport( CitcoReport, table="citco_reports", date_cols=["Maturity Date"], dtkey="%Y%m%d.%H%M%S", ): @property def file_prefix(self): return ( f"SPOS4X_INNOCAP_{self.fund}_D_{'IM' if self.fund =='ISOSEL' else 'IMCG'}." )