from typing import ClassVar import datetime from dataclasses import dataclass, field import re import pandas as pd from serenitas.utils.remote import SftpClient from serenitas.utils.db import dawn_engine from sqlalchemy.engine import Engine from serenitas.analytics.dates import next_business_day, prev_business_day from serenitas.analytics.exceptions import MissingDataError _suffix = {"ISOSEL": "IM", "CRSE": "IMCG"} @dataclass class CitcoReport: _engine: ClassVar[Engine] = dawn_engine _sftp: ClassVar[SftpClient] = SftpClient.from_creds("citco", folder="outgoing") table: ClassVar knowledge_date: datetime.date fund: str file_pattern: str = field(init=False) date_cols: ClassVar = [] pattern: ClassVar[str] _registry: ClassVar = {} def __init_subclass__(cls, table, date_cols, pattern): cls.table = table cls.date_cols = date_cols cls.pattern = pattern cls._registry[table] = cls def __post_init__(self): self.file_pattern = self.pattern.format( fund=self.fund, suffix=_suffix[self.fund] ) def __class_getitem__(cls, table): return cls._registry[table] def get_report(self): reports = [ f for f in self._sftp.client.listdir() if f.startswith(self.file_pattern) and self.get_ts(f).date() == self.knowledge_date ] return max( reports, key=self.get_ts, default=None, ) @classmethod def get_df(cls, report_name): with cls._sftp.client.open(report_name) as fh: ts = cls.get_ts(report_name) df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True) df["row"] = df.index df.columns = df.columns.str.lower() df.columns = df.columns.str.replace(" ", "_") df["period_end_date"] = ts.date() df["knowledge_date"] = next_business_day(ts) if "strategy" in df.columns: df["strategy"] = df["strategy"].str.replace( "/M_|/SER_", "/", regex=True ) return df def to_db(self): if report_name := self.get_report(): df = self.get_df(report_name) df["fund"] = self.fund with self._engine.connect() as conn: conn.execute( f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s", ( self.knowledge_date, self.fund, ), ) df.to_sql(self.table, self._engine, if_exists="append", index=False) else: raise MissingDataError( f"{self.fund}: {self.table} report not ready for {self.knowledge_date}" ) class AccruedReport( CitcoReport, table="isosel_accrued", date_cols=[ "Init Date", "Init Settle Date", "Liqd Date", "Liqd Settle Date", "Bond Maturity", "Orig Date", "Start Date", "End Date", ], pattern="100502500_INNOCAP_{fund}_", ): @staticmethod def get_ts(s): m = re.search(r"\d{12}", s) return prev_business_day(datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S")) class AllReport( CitcoReport, table="citco_reports", date_cols=["Maturity Date"], pattern="SPOS4X_INNOCAP_{fund}_D_{suffix}", ): @staticmethod def get_ts(s): m = re.search(r"\d{8}\.\d{6}", s) return datetime.datetime.strptime(m[0], "%Y%m%d.%H%M%S")