import datetime from csv import DictReader from typing import ClassVar from functools import partial from dataclasses import dataclass, field import pandas as pd from serenitas.ops.trade_dataclasses import Deal, Ccy from serenitas.ops.dataclass_mapping import Fund from serenitas.analytics.dates import prev_business_day from serenitas.utils.env import DAILY_DIR from .misc import get_dir, dt_from_fname, Custodian @dataclass class WireReport(Deal, table_name="custodian_wires", deal_type="custodian_wires"): date: datetime.date fund: Fund custodian: ClassVar[Custodian] entry_date: datetime.date value_date: datetime.date pay_date: datetime.date currency: Ccy amount: float wire_details: str unique_ref: str dtkey: ClassVar = field(metadata={"insert": False, "select": False}) def __init_subclass__(cls, custodian, dtkey, **kwargs): cls._sql_insert = ( cls._sql_insert.removesuffix("RETURNING *") + "ON CONFLICT (unique_ref) DO NOTHING RETURNING *" ) cls.custodian = custodian cls._registry[custodian] = cls cls.dtkey = dtkey def __post_init__(self): if isinstance(self.amount, str): self.amount = self.amount.replace(",", "") if "(" in self.amount: self.amount = -float(self.amount[1:-1]) else: self.amount = float(self.amount) @classmethod def get_report(cls, date, fund, prefix=None): report_dir = get_dir(date) report_dir.mkdir(exist_ok=True, parents=True) prefix = prefix if prefix else f"{cls.custodian}_WIRE_{fund}" p = max( [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)], key=partial(dt_from_fname, dt_format=cls.dtkey), default=None, ) if not p: raise ValueError( f"No reports found for fund: {prefix.split('_')[-1]} date: {date}" ) return p @classmethod def to_db(cls, date, fund): for line in cls.yield_rows(date, fund): cls.from_report_line(line | {"fund": fund}).stage() cls.commit() class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): @classmethod def from_report_line(cls, line: dict): return cls( date=line["Report Run Date"], entry_date=line["Cash Entry Date"], value_date=line["Cash Value Date"], pay_date=line["Settle / Pay Date"], currency=line["Local Currency Code"], amount=line["Local Amount"], wire_details=line["Transaction Description 1"] if line["Transaction Type Code"] == "CW" else line["Transaction Description 2"], unique_ref=line["Reference Number"], fund=line["fund"], ) @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) with open(p) as fh: reader = DictReader(fh) yield from reader class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): @classmethod def from_report_line(cls, line: dict): return cls( date=line["Through date"], entry_date=line["D-GL-POST"], value_date=line["D-TRAN-EFF"], pay_date=line["D-TRAN-EFF"], currency=cls.nt_to_enum[line["N-GL-AC30"]], amount=line["Net amount - local"], wire_details=line["narrative"], unique_ref=line["C-EXTL-SYS-TRN-DSC-3"], fund=line["fund"], ) @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) with open(p) as fh: reader = DictReader(fh) for line in reader: if "sponsor" in line["narrative"].lower(): yield line @classmethod def nt_to_enum(cls, ccy): _mapping = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} return _mapping[ccy] class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"): @classmethod def from_report_line(cls, line: dict): return cls( date=line["Transaction Date"], entry_date=line["Transaction Date"], value_date=line["Transaction Date"], pay_date=line["Transaction Date"], currency=line["Local Currency Code"], amount=line["Net Amount"], wire_details=line["Transaction Description"], unique_ref=f'{line["Transaction Date"]}-{line["index"]}', fund=line["fund"], ) @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) conn = cls._conn # We only have one report for UMB with no unique identifier. Delete and reupload recent is the only way with conn.cursor() as c: c.execute( "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", ( date, fund, cls.custodian, ), ) conn.commit() df = pd.read_excel(p, skiprows=3) df["index"] = df.index for line in df.to_dict(orient="records"): if line["Transaction Date"].startswith( "No records" ): # No wires at the moment return yield line class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None): @classmethod def from_report_line(cls, line: dict): return cls( date=line["Value Date"], entry_date=line["Posting Date"], value_date=line["Value Date"], pay_date=line["Value Date"], currency=line["Curr."], amount=line["Cr Amount"] if line["Dr/Cr"] == "Cr" else -line["Dr Amount"], wire_details=line["Reference Data"], unique_ref=line["Bank Ref."], fund=line["fund"], ) @classmethod def yield_rows(cls, date, fund): p = cls.get_report(date, fund) conn = cls._conn with conn.cursor() as c: c.execute( "SELECT 1 FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", ( prev_business_day(date), fund, cls.custodian, ), ) if not (_ := c.fetchone()): df = pd.read_excel(p, skipfooter=2) df["index"] = df.index yield from df.to_dict(orient="records") @classmethod def get_report(cls, date, fund): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" return next( REPORT_DIR.glob( f"IsoSelene_{prev_business_day(date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) )