import datetime from typing import ClassVar from functools import partial from dataclasses import dataclass import pandas as pd from serenitas.utils.env import DAILY_DIR from serenitas.utils.db2 import dbconn from serenitas.analytics.dates import prev_business_day from .misc import get_dir, dt_from_fname, Custodian @dataclass class CashReport: custodian: ClassVar[Custodian] date: datetime.date dtkey: ClassVar _conn: ClassVar = dbconn("dawndb") _staging_queue: ClassVar[set] = set() _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" _registry = {} def __init_subclass__(cls, custodian, dtkey): cls.custodian = custodian cls.dtkey = dtkey cls._registry[custodian] = cls def __class_getitem__(cls, key): return cls._registry[key] @classmethod def get_report(cls, date, fund, prefix=None): report_dir = get_dir(date) report_dir.mkdir(exist_ok=True, parents=True) prefix = prefix if prefix else f"{cls.custodian}_CASH_{fund}" p = max( [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)], key=partial(dt_from_fname, dt_format=cls.dtkey), default=None, ) if not p: raise ValueError( f"No reports found for fund: {prefix.split('_')[-1]} date: {date}" ) return p @classmethod def commit(cls): with cls._conn.cursor() as c: c.executemany(cls._insert_sql, cls._staging_queue) cls._conn.commit() @classmethod def clear(cls): cls._staging_queue.clear() @classmethod def stage_from_row(cls, row, date, fund): (account, currency), amount = row cls._staging_queue.add( ( prev_business_day(date), fund, f"{cls.custodian} Custody Account {fund}", account, currency, amount, ) ) @classmethod def to_db(cls, date, fund): for row in cls.yield_rows(date, fund): cls.stage_from_row(row, date, fund) cls.commit() cls.clear() class NTCashReport(CashReport, custodian="NT", dtkey="%Y%m%d%H%M"): @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) df = pd.read_csv(p, on_bad_lines="warn") df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() df = df.set_index(["consolidation", "currencycode"]) yield from df.itertuples() class UMBCashReport(CashReport, custodian="UMB", dtkey="%Y%m%d%H%M"): @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) df = pd.read_excel(p, skiprows=3) yield from df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[ "Current Balance" ].items() class BNYCashReport(CashReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) df = pd.read_csv(p) df["Beginning Balance Local"] = df["Beginning Balance Local"].apply( lambda s: "-" + s[1:-1] if s.startswith("(") else s ) df["Beginning Balance Local"] = pd.to_numeric( df["Beginning Balance Local"].str.replace(",", "") ) yield from df.groupby(["Account Number", "Local Currency Code"]).sum( numeric_only=True )["Beginning Balance Local"].items() class ScotiaCashReport(CashReport, custodian="SCOTIA", dtkey="%Y%m%d%H%M%S"): @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) df = pd.read_excel(p, skipfooter=1) if df.empty: # No wires, so we can't skip the footer df = pd.read_excel(p) yield ( (int(df.loc[0]["Account"]), df.loc[0]["Curr."]), df.loc[0]["Closing Bal."], ) @classmethod def get_report(cls, date, fund): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" try: return next( REPORT_DIR.glob( f"IsoSelene_{prev_business_day(date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) ) except StopIteration as e: raise ValueError(f"No file available for Scotia on {date}") from e