import datetime
from dataclasses import dataclass
from functools import partial
from typing import ClassVar

import pandas as pd

from serenitas.analytics.dates import prev_business_day
from serenitas.utils.db import dbconn
from serenitas.utils.env import DAILY_DIR

from .custodians import NT, UMB, BNY, SCOTIA
from .misc import get_dir, dt_from_fname


@dataclass
class CashReport:
    """Base class for loading custodian cash reports into ``cash_balances``.

    Concrete reports subclass this together with a custodian mixin and pass
    ``fund``, ``custodian`` and ``dtkey`` as class keyword arguments; each
    subclass is recorded in a registry keyed by ``(fund, custodian)`` and can
    be looked up with ``CashReport[fund, custodian]``.

    The database connection and the staging queue are class-level attributes
    defined here, so they are shared by every subclass and instance.
    """

    fund: ClassVar[str]
    custodian: ClassVar[str]
    date: datetime.date
    # strftime-style format forwarded to dt_from_fname as ``dt_format``.
    dtkey: ClassVar
    _conn: ClassVar[dbconn] = dbconn("dawndb")
    _staging_queue: ClassVar[set] = set()
    _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    _registry = {}

    def __init_subclass__(cls, fund, custodian, dtkey):
        """Store the class keyword arguments and register the subclass."""
        cls.fund = fund
        cls.custodian = custodian
        cls.dtkey = dtkey
        cls._registry[
            (
                fund,
                custodian,
            )
        ] = cls

    def __class_getitem__(cls, key):
        """Return the report class registered under ``(fund, custodian)``.

        Raises:
            KeyError: if no subclass was registered under ``key``.
        """
        return cls._registry[key]

    def get_cash_report(self, report_prefix):
        """Download the reports for ``self.date`` and pick one to load.

        Among the files in ``get_dir(self.date)`` whose name starts with
        ``report_prefix``, the one with the greatest
        ``dt_from_fname(f, dt_format=self.dtkey)`` is returned (presumably the
        most recent timestamp embedded in the file name).

        Raises:
            ValueError: if no file matches ``report_prefix``.
        """
        # download_reports is not defined in this class; it is expected to
        # come from the custodian mixin of the concrete subclass.
        self.download_reports(self.date)
        report_dir = get_dir(self.date)
        report_dir.mkdir(exist_ok=True, parents=True)
        p = max(
            (f for f in report_dir.iterdir() if f.name.startswith(report_prefix)),
            key=partial(dt_from_fname, dt_format=self.dtkey),
            default=None,
        )
        if p is None:
            raise ValueError(
                f"No reports found for fund: {self.fund} date: {self.date}"
            )
        return p

    @classmethod
    def commit(cls):
        """Insert every staged row and commit the transaction."""
        # NOTE(review): nothing here rolls back if executemany fails; confirm
        # whether the shared connection needs an explicit rollback.
        with cls._conn.cursor() as c:
            c.executemany(cls._insert_sql, cls._staging_queue)
        cls._conn.commit()

    def stage_from_row(self, row):
        """Queue one balance for insertion.

        Args:
            row: a pair ``((account, currency), amount)``.
        """
        (account, currency), amount = row
        self._staging_queue.add(
            (
                prev_business_day(self.date),
                self.fund,
                f"{self.custodian} Custody Account {self.fund}",
                account,
                currency,
                amount,
            )
        )

    def _stage_and_commit(self, rows):
        """Stage every row in ``rows``, commit, and always empty the queue.

        The queue is shared by all report classes, so it is cleared even when
        staging or committing raises; otherwise leftover rows would be
        inserted by the next report that commits.
        """
        try:
            for row in rows:
                self.stage_from_row(row)
            self.commit()
        finally:
            self._staging_queue.clear()


class SeleneNTCashReport(
    CashReport, NT, fund="ISOSEL", custodian="NT", dtkey="%Y%m%d%H%M"
):
    """Cash report for fund ISOSEL at custodian NT (CSV, ``cash_`` prefix)."""

    def to_db(self):
        """Load the closing balances of the report into the database."""
        p = self.get_cash_report("cash_")
        df = pd.read_csv(p, on_bad_lines="warn")
        df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"]
        df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]]
        # Drop spaces, dashes and underscores so the columns become
        # consolidation / currencycode / atranamt.
        df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower()
        df = df.set_index(["consolidation", "currencycode"])
        # With a two-level index each tuple is ((consolidation, currency), amount),
        # which is the shape stage_from_row unpacks.
        self._stage_and_commit(df.itertuples())


class SerenitasUMBCashReport(
    CashReport, UMB, fund="SERCGMAST", custodian="UMB", dtkey="%Y%m%d%H%M"
):
    """Cash report for fund SERCGMAST at custodian UMB (Excel, ``umb_`` prefix)."""

    def to_db(self):
        """Sum ``Current Balance`` per portfolio and currency and store it."""
        p = self.get_cash_report("umb_")
        df = pd.read_excel(p, skiprows=3)
        balances = df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[
            "Current Balance"
        ]
        self._stage_and_commit(balances.items())


class BowdstBNYCashReport(
    CashReport, BNY, fund="BOWDST", custodian="BNY", dtkey="%Y%m%d%H%M%S"
):
    """Cash report for fund BOWDST at custodian BNY (CSV, ``Live-cash`` prefix)."""

    def to_db(self):
        """Sum ``Beginning Balance Local`` per account and currency and store it."""
        p = self.get_cash_report("Live-cash")
        df = pd.read_csv(p)
        column = "Beginning Balance Local"
        # Cast to str first: when the file holds only plain numbers pandas
        # parses the column as numeric, and the string handling below would
        # fail on non-string values.
        raw = df[column].astype(str)
        # A leading "(" marks a negative amount written as "(1,234.00)".
        raw = raw.apply(lambda s: "-" + s[1:-1] if s.startswith("(") else s)
        df[column] = pd.to_numeric(raw.str.replace(",", ""))
        balances = df.groupby(["Account Number", "Local Currency Code"]).sum(
            numeric_only=True
        )[column]
        self._stage_and_commit(balances.items())


class SeleneScotiaCashReport(
    CashReport, SCOTIA, fund="ISOSEL", custodian="SCOTIA", dtkey="%Y%m%d%H%M%S"
):
    """Cash report for fund ISOSEL at custodian SCOTIA (Excel)."""

    def to_db(self):
        """Store the closing balance found on the first row of the report."""
        p = self.get_cash_report()
        df = pd.read_excel(p, skipfooter=1)
        if df.empty:
            # No wires, so we can't skip the footer
            df = pd.read_excel(p)
        first = df.loc[0]
        self._stage_and_commit(
            [((int(first["Account"]), first["Curr."]), first["Closing Bal."])]
        )

    def get_cash_report(self, prefix=None):
        """Download the reports and return the Scotia file for the previous business day.

        ``prefix`` is accepted for signature compatibility with the base class
        and is not used.

        Raises:
            ValueError: if no matching file exists after the download.
        """
        self.download_reports(self.date)
        report_dir = DAILY_DIR / "Selene" / "Scotia_reports"
        try:
            return next(
                report_dir.glob(
                    f"IsoSelene_{prev_business_day(self.date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1"
                )
            )
        except StopIteration as e:
            # Still no matching file after the download: report it to the caller.
            raise ValueError(f"No file available for Scotia on {self.date}") from e