from serenitas.analytics.dates import prev_business_day import datetime import pandas as pd from serenitas.utils.db import dbconn from typing import ClassVar from .misc import get_dir, dt_from_fname from .custodians import get_custodian_download_fun from functools import partial from dataclasses import dataclass from serenitas.utils.env import DAILY_DIR @dataclass class CashReport: custodian: ClassVar[str] date: datetime.date dtkey: ClassVar _conn: ClassVar[dbconn] = dbconn("dawndb") _staging_queue: ClassVar[set] = set() _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" _registry = {} def __init_subclass__(cls, custodian, dtkey): cls.custodian = custodian cls.dtkey = dtkey cls._registry[custodian] = cls def __class_getitem__(cls, key): return cls._registry[key] def get_cash_report(self, report_prefix): report_dir = get_dir(self.date) report_dir.mkdir(exist_ok=True, parents=True) p = max( [ f for f in get_dir(self.date).iterdir() if f.name.startswith(report_prefix) ], key=partial(dt_from_fname, dt_format=self.dtkey), default=None, ) if not p: raise ValueError( f"No reports found for fund: {self.fund} date: {self.date}" ) return p @classmethod def commit(cls): with cls._conn.cursor() as c: c.executemany(cls._insert_sql, cls._staging_queue) cls._conn.commit() @classmethod def clear(cls): cls._staging_queue.clear() def stage_from_row(self, row, fund): (account, currency), amount = row self._staging_queue.add( ( prev_business_day(self.date), self.fund, f"{self.custodian} Custody Account {self.fund}", account, currency, amount, ) ) @classmethod def to_db(self, fund): for row in self.yield_rows(): self.stage_from_row(row, fund) self.commit() self.clear() class NTCashReport(CashReport, custodian="NT", dtkey="%Y%m%d%H%M"): def yield_rows(self, fund): p = self.get_cash_report(f"NT_CASH_{fund}") df = pd.read_csv(p, on_bad_lines="warn") df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() df = df.set_index(["consolidation", "currencycode"]) yield from df.itertuples() class UMBCashReport(CashReport, custodian="UMB", dtkey="%Y%m%d%H%M"): def yield_rows(self, fund): p = self.get_cash_report("UMB_CASH_{fund}") df = pd.read_excel(p, skiprows=3) yield from df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[ "Current Balance" ].items() class BNYCashReport(CashReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): def yield_rows(self): p = self.get_cash_report("BNY_CASH_{fund}") df = pd.read_csv(p) df["Beginning Balance Local"] = df["Beginning Balance Local"].apply( lambda s: "-" + s[1:-1] if s.startswith("(") else s ) df["Beginning Balance Local"] = pd.to_numeric( df["Beginning Balance Local"].str.replace(",", "") ) yield from df.groupby(["Account Number", "Local Currency Code"]).sum( numeric_only=True )["Beginning Balance Local"].items() class ScotiaCashReport(CashReport, custodian="SCOTIA", dtkey="%Y%m%d%H%M%S"): def yield_rows(self): p = self.get_cash_report() df = pd.read_excel(p, skipfooter=1) if df.empty: # No wires, so we can't skip the footer df = pd.read_excel(p) yield ( (int(df.loc[0]["Account"]), df.loc[0]["Curr."]), df.loc[0]["Closing Bal."], ) def get_cash_report(self, prefix=None): self.download_reports(self.date) REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" try: return next( REPORT_DIR.glob( f"IsoSelene_{prev_business_day(self.date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) ) except StopIteration as e: # File doesn't exist, let's get it" raise ValueError(f"No file available for Scotia on {self.date}") from e