"""Load custodian wire reports into the ``custodian_wires`` table.

Each concrete ``Wire`` subclass pairs a (fund, custodian) with the logic that
locates that custodian's report, maps one report row to a ``Wire`` and stages
it.  ``stage()``, ``commit()``, ``_conn``, ``_registry`` and ``_sql_insert``
are used here but defined elsewhere (presumably on ``Deal``);
``download_reports`` is presumably supplied by the custodian mixins.
"""
import datetime
from csv import DictReader
from dataclasses import dataclass, field
from functools import partial
from typing import ClassVar
from typing import Literal

import pandas as pd

from serenitas.analytics.dates import prev_business_day
from serenitas.ops.trade_dataclasses import Deal, Ccy
from serenitas.utils.env import DAILY_DIR

from .custodians import NT, BNY, UMB, SCOTIA
from .misc import get_dir, dt_from_fname

# Maps the currency label found in NT reports to a currency code.
_nt_to_currency = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"}

# NOTE(review): "SCOTIA" is used as a custodian below but is not listed here.
CUSTODIAN = Literal["UMB", "NT", "BNY"]


@dataclass
class Wire(Deal, table_name="custodian_wires", deal_type="custodian_wires"):
    """One cash wire reported by a custodian.

    Subclasses must pass ``fund``, ``custodian`` and ``dtkey`` as class
    keyword arguments; see ``__init_subclass__``.
    """

    date: datetime.date
    # NOTE(review): the annotations of ``fund`` and ``custodian`` look swapped
    # (funds are e.g. "BOWDST", custodians e.g. "BNY").  Left untouched because
    # the base class may inspect annotations -- confirm before changing.
    fund: ClassVar[CUSTODIAN]
    custodian: ClassVar[str]
    entry_date: datetime.date
    value_date: datetime.date
    pay_date: datetime.date
    currency: Ccy
    amount: float
    wire_details: str
    unique_ref: str
    # strptime-style format passed to ``dt_from_fname`` when ranking reports.
    dtkey: ClassVar = field(metadata={"insert": False, "select": False})

    def __init_subclass__(cls, fund, custodian, dtkey, **kwargs):
        """Configure and register a concrete wire class.

        Args:
            fund: fund identifier stored on the class.
            custodian: custodian identifier stored on the class.
            dtkey: datetime format of the timestamp embedded in report file
                names (``None`` when the subclass does not rank by file name).
            **kwargs: accepted but not used.
        """
        # NOTE(review): super().__init_subclass__ is not called here; this is
        # assumed to be deliberate (the parent hook takes other keywords).
        # Make the insert idempotent on ``unique_ref``.
        cls._sql_insert = (
            cls._sql_insert.removesuffix("RETURNING *")
            + "ON CONFLICT (unique_ref) DO NOTHING RETURNING *"
        )
        cls.fund = fund
        cls.custodian = custodian
        cls._registry[(fund, custodian)] = cls
        cls.dtkey = dtkey

    def __post_init__(self):
        """Convert a textual ``amount`` to ``float``.

        Thousands separators are removed and a value wrapped in parentheses,
        e.g. ``"(1,234.50)"``, is read as negative.  Non-string amounts are
        left untouched.

        Raises:
            ValueError: if the text is not a valid number.
        """
        if not isinstance(self.amount, str):
            return
        # Strip first so that padded values such as " (12.00) " still parse.
        text = self.amount.replace(",", "").strip()
        if text.startswith("(") and text.endswith(")"):
            self.amount = -float(text[1:-1])
        else:
            self.amount = float(text)

    @classmethod
    def get_newest_report(cls, fname, date):
        """Return the most recent report for ``date`` whose name contains ``fname``.

        ``cls.download_reports(date)`` is called first.  Candidates are the
        entries of ``get_dir(date)``; they are ranked with ``dt_from_fname``
        using ``cls.dtkey`` as the datetime format.

        Raises:
            ValueError: if no file name contains ``fname``.
        """
        cls.download_reports(date)
        candidates = (f for f in get_dir(date).iterdir() if fname in f.name)
        newest = max(
            candidates,
            key=partial(dt_from_fname, dt_format=cls.dtkey),
            default=None,
        )
        if newest is None:
            raise ValueError(f"No reports for {cls.fund} on {date}")
        return newest


class BowdstBNYWire(Wire, BNY, fund="BOWDST", custodian="BNY", dtkey="%Y%m%d%H%M%S"):
    """Wires for fund BOWDST read from a BNY CSV report."""

    @classmethod
    def from_report_line(cls, line: dict):
        """Build a wire from one row of the BNY CSV report."""
        # Rows with transaction type "CW" carry the details in description 1,
        # every other row in description 2.
        if line["Transaction Type Code"] == "CW":
            details = line["Transaction Description 1"]
        else:
            details = line["Transaction Description 2"]
        return cls(
            date=line["Report Run Date"],
            entry_date=line["Cash Entry Date"],
            value_date=line["Cash Value Date"],
            pay_date=line["Settle / Pay Date"],
            currency=line["Local Currency Code"],
            amount=line["Local Amount"],
            wire_details=details,
            unique_ref=line["Reference Number"],
        )

    @classmethod
    def to_db(cls, date):
        """Stage every row of the newest "BowdstWires" report, then commit.

        Raises:
            ValueError: if no matching report is found.
        """
        report = cls.get_newest_report("BowdstWires", date)
        # newline="" lets the csv module handle line endings itself, as its
        # documentation requires for quoted fields with embedded newlines.
        with open(report, newline="") as fh:
            for line in DictReader(fh):
                cls.from_report_line(line).stage()
        cls.commit()


class SeleneNTWire(Wire, NT, fund="ISOSEL", custodian="NT", dtkey="%Y%m%d%H%M"):
    """Wires for fund ISOSEL read from an NT CSV report."""

    @classmethod
    def from_report_line(cls, line: dict):
        """Build a wire from one row of the NT CSV report.

        Raises:
            KeyError: if the row's currency label is not in ``_nt_to_currency``.
        """
        return cls(
            date=line["Through date"],
            entry_date=line["D-GL-POST"],
            value_date=line["D-TRAN-EFF"],
            pay_date=line["D-TRAN-EFF"],
            currency=_nt_to_currency[line["N-GL-AC30"]],
            amount=line["Net amount - local"],
            wire_details=line["narrative"],
            unique_ref=line["C-EXTL-SYS-TRN-DSC-3"],
        )

    @classmethod
    def to_db(cls, date):
        """Stage the "sponsor" rows of the newest "custodian_wires" report, then commit.

        Only rows whose narrative contains "sponsor" (case-insensitive) are
        staged.

        Raises:
            ValueError: if no matching report is found.
        """
        report = cls.get_newest_report("custodian_wires", date)
        # newline="" lets the csv module handle line endings itself.
        with open(report, newline="") as fh:
            for line in DictReader(fh):
                if "sponsor" in line["narrative"].lower():
                    cls.from_report_line(line).stage()
        cls.commit()


class SerenitasUMBWire(
    Wire, UMB, fund="SERCGMAST", custodian="UMB", dtkey="%Y%m%d%H%M"
):
    """Wires for fund SERCGMAST read from a UMB Excel report."""

    @classmethod
    def from_report_line(cls, line: dict):
        """Build a wire from one row of the UMB report.

        All four date fields come from "Transaction Date".  The unique
        reference combines that date with the row's "index" entry, which
        ``to_db`` adds.
        """
        transaction_date = line["Transaction Date"]
        return cls(
            date=transaction_date,
            entry_date=transaction_date,
            value_date=transaction_date,
            pay_date=transaction_date,
            currency=line["Local Currency Code"],
            amount=line["Net Amount"],
            wire_details=line["Transaction Description"],
            unique_ref=f'{line["Transaction Date"]}-{line["index"]}',
        )

    @classmethod
    def to_db(cls, date):
        """Replace the wires stored for ``date`` with the newest "umbwires_" report.

        Existing rows for this date, fund and custodian are deleted and the
        deletion committed before the report is read.

        Raises:
            ValueError: if no matching report is found (nothing is deleted).
        """
        report = cls.get_newest_report("umbwires_", date)
        conn = cls._conn
        with conn.cursor() as c:
            c.execute(
                "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s",
                (date, cls.fund, cls.custodian),
            )
        conn.commit()
        df = pd.read_excel(report, skiprows=3)
        # The row position becomes part of unique_ref in from_report_line.
        df["index"] = df.index
        for row_dict in df.to_dict(orient="records"):
            # NOTE(review): assumes "Transaction Date" is read as text -- confirm.
            if row_dict["Transaction Date"].startswith(
                "No records"
            ):  # No wires at the moment
                continue
            cls.from_report_line(row_dict).stage()
        cls.commit()


class SeleneSCOTIAWire(Wire, SCOTIA, fund="ISOSEL", custodian="SCOTIA", dtkey=None):
    """Wires for fund ISOSEL read from a Scotia Excel report."""

    @classmethod
    def from_report_line(cls, line: dict):
        """Build a wire from one row of the Scotia report.

        Credits ("Dr/Cr" equal to "Cr") keep the sign of "Cr Amount"; any
        other row is booked as the negated "Dr Amount".
        """
        if line["Dr/Cr"] == "Cr":
            signed_amount = line["Cr Amount"]
        else:
            signed_amount = -line["Dr Amount"]
        return cls(
            date=line["Value Date"],
            entry_date=line["Posting Date"],
            value_date=line["Value Date"],
            pay_date=line["Value Date"],
            currency=line["Curr."],
            amount=signed_amount,
            wire_details=line["Reference Data"],
            unique_ref=line["Bank Ref."],
        )

    @classmethod
    def to_db(cls, date):
        """Replace the wires stored for the previous business day.

        Rows for ``prev_business_day(date)``, this fund and this custodian
        are deleted and the deletion committed; then every row of the report
        (minus its last two footer rows) is staged and committed.

        Raises:
            ValueError: if no matching report is found (nothing is deleted).
        """
        report = cls.get_newest_report(date)
        conn = cls._conn
        with conn.cursor() as c:
            c.execute(
                "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s",
                (prev_business_day(date), cls.fund, cls.custodian),
            )
        conn.commit()
        df = pd.read_excel(report, skipfooter=2)
        for row_dict in df.to_dict(orient="records"):
            cls.from_report_line(row_dict).stage()
        cls.commit()

    @classmethod
    def get_newest_report(cls, date):
        """Return a Scotia report file for the business day before ``date``.

        ``cls.download_reports(date)`` is called first.  Unlike the base
        class this takes no ``fname`` and does no ranking: the first path
        produced by the glob is returned.

        Raises:
            ValueError: if no file matches (same error as the base class,
                instead of a bare ``StopIteration`` escaping from ``next``).
        """
        cls.download_reports(date)
        REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports"
        matches = REPORT_DIR.glob(
            f"IsoSelene_{prev_business_day(date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1"
        )
        report = next(matches, None)
        if report is None:
            raise ValueError(f"No reports for {cls.fund} on {date}")
        return report