diff options
Diffstat (limited to 'python/report_ops/wires.py')
| -rw-r--r-- | python/report_ops/wires.py | 287 |
1 files changed, 141 insertions, 146 deletions
diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py index 1ecfb99a..7d82be6c 100644 --- a/python/report_ops/wires.py +++ b/python/report_ops/wires.py @@ -1,200 +1,195 @@ import datetime -from csv import DictReader +import re from typing import ClassVar -from functools import partial from dataclasses import dataclass import pandas as pd -from serenitas.ops.trade_dataclasses import Ccy from serenitas.ops.dataclass_mapping import Fund from serenitas.analytics.dates import prev_business_day from serenitas.analytics.exceptions import MissingDataError from serenitas.utils.env import DAILY_DIR +from serenitas.utils.db2 import dbconn -from .misc import get_dir, dt_from_fname, Custodian +from .misc import get_dir, Custodian from .base import Report @dataclass -class WireReport(Report, table_name="custodian_wires"): +class WireReport( + Report, + table_name="custodian_wires", + columns=( + "date", + "fund", + "custodian", + "entry_date", + "pay_date", + "value_date", + "currency", + "amount", + "wire_details", + "unique_ref", + ), +): date: datetime.date fund: Fund custodian: ClassVar[Custodian] - entry_date: datetime.date - value_date: datetime.date - pay_date: datetime.date - currency: Ccy - amount: float - wire_details: str - unique_ref: str - dtkey: ClassVar - def __init_subclass__(cls, custodian, dtkey): + def __init_subclass__(cls, custodian, **kwargs): cls.custodian = custodian cls._registry[custodian] = cls - cls.dtkey = dtkey - def __class_getitem__(cls, custodian): - return cls._registry[custodian] - - def __post_init__(self): - if isinstance(self.amount, str): - self.amount = self.amount.replace(",", "") - if "(" in self.amount: - self.amount = -float(self.amount[1:-1]) - else: - self.amount = float(self.amount) - - @classmethod - def get_report(cls, date, fund, prefix=None): - report_dir = get_dir(date) - report_dir.mkdir(exist_ok=True, parents=True) - prefix = prefix if prefix else f"{cls.custodian}_WIRE_{fund}" + def get_report(self): + report_dir = get_dir(self.date) + pattern = f"{self.custodian}_WIRE_{self.fund}_" + reports = [ + f + for f in report_dir.iterdir() + if f.name.startswith(pattern) and self.get_ts(f.name).date() == self.date + ] p = max( - [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)], - key=partial(dt_from_fname, dt_format=cls.dtkey), + reports, + key=lambda f: self.get_ts(f.name), default=None, ) - if not p: + if p: + return p + else: raise MissingDataError( - f"No reports found for fund: {prefix.split('_')[-1]} date: {date}" + f"Report not ready {self.date}: {self.custodian} {self.fund}" ) - return p + + @staticmethod + def get_ts(s): + m = re.search(r"\d{12}", s) + return datetime.datetime.strptime(m[0], "%Y%m%d%H%M") class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): - @classmethod - def from_report_line(cls, line: dict): - return cls( - date=line["Report Run Date"], - entry_date=line["Cash Entry Date"], - value_date=line["Cash Value Date"], - pay_date=line["Settle / Pay Date"], - currency=line["Local Currency Code"], - amount=line["Local Amount"], - wire_details=line["Transaction Description 1"] - if line["Transaction Type Code"] == "CW" - else line["Transaction Description 2"], - unique_ref=line["Reference Number"], - fund=line["fund"], + def __iter__(self): + df = pd.read_csv(self.get_report(), thousands=",") + df["Local Amount"] = df["Local Amount"].apply( + lambda s: "-" + s[1:-1] if s.startswith("(") else s + ) + df["Local Amount"] = pd.to_numeric( + df["Local Amount"].str.replace(",", "") + ) # Not sure how to avoid this with the thousands + df["Wire Details"] = df.apply( + lambda x: x["Transaction Description 1"] + if x["Transaction Type Code"] == "CW" + else x["Transaction Description 2"], + axis=1, + ) + return ( + (self.date, self.fund, self.custodian, *t) + for t in df[ + [ + "Cash Entry Date", + "Settle / Pay Date", + "Cash Value Date", + "Local Currency Code", + "Local Amount", + "Wire Details", + "Reference Number", + ] + ].itertuples(index=False) ) - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - with open(p) as fh: - reader = DictReader(fh) - yield from reader + @staticmethod + def get_ts(s): + m = re.search(r"\d{12}", s) + return datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S") -class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): - @classmethod - def from_report_line(cls, line: dict): - return cls( - date=line["Through date"], - entry_date=line["D-GL-POST"], - value_date=line["D-TRAN-EFF"], - pay_date=line["D-TRAN-EFF"], - currency=cls.nt_to_enum(line["N-GL-AC30"]), - amount=line["Net amount - local"], - wire_details=line["narrative"], - unique_ref=line["C-EXTL-SYS-TRN-DSC-3"], - fund=line["fund"], +class NTWireReport(WireReport, custodian="NT"): + def __iter__(self): + df = pd.read_csv(self.get_report()) + _ccy_mapping = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} + df["Currency"] = df["N-GL-AC30"].apply(lambda x: _ccy_mapping[x]) + return ( + (self.date, self.fund, self.custodian, *t) + for t in df[ + [ + "D-GL-POST", + "D-TRAN-EFF", + "D-TRAN-EFF", + "Currency", + "Net amount - local", + "narrative", + "C-EXTL-SYS-TRN-DSC-3", + ] + ].itertuples(index=False) + if "sponsor" in t.narrative.lower() ) - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - with open(p) as fh: - reader = DictReader(fh) - for line in reader: - if "sponsor" in line["narrative"].lower(): - yield line - - @staticmethod - def nt_to_enum(ccy): - _mapping = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} - return _mapping[ccy] - -class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"): - @classmethod - def from_report_line(cls, line: dict): - return cls( - date=line["Transaction Date"], - entry_date=line["Transaction Date"], - value_date=line["Transaction Date"], - pay_date=line["Transaction Date"], - currency=line["Local Currency Code"], - amount=line["Net Amount"], - wire_details=line["Transaction Description"], - unique_ref=f'{line["Transaction Date"]}-{line["index"]}', - fund=line["fund"], +class UMBWireReport(WireReport, custodian="UMB"): + def __iter__(self): + df = pd.read_excel(self.get_report(), skiprows=3) + df["index"] = df.index + df["Unique Ref"] = df.apply( + lambda x: f'{x["Transaction Date"]}-{x["index"]}', axis=1 + ) + self.clear_sql_entries() + return ( + ( + self.date, + self.fund, + self.custodian, + *t, + ) + for t in df[ + [ + "Transaction Date", + "Transaction Date", + "Transaction Date", + "Local Currency Code", + "Net Amount", + "Transaction Description", + "Unique Ref", + ] + ].itertuples(index=False) ) - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - conn = cls._conn - # We only have one report for UMB with no unique identifier. Delete and reupload recent is the only way + def clear_sql_entries(self): + conn = dbconn("dawndb") with conn.cursor() as c: c.execute( "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", ( - date, - fund, - cls.custodian, + self.date, + self.fund, + self.custodian, ), ) conn.commit() - df = pd.read_excel(p, skiprows=3) - df["index"] = df.index - for line in df.to_dict(orient="records"): - if line["Transaction Date"].startswith( - "No records" - ): # No wires at the moment - return - yield line -class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None): - @classmethod - def from_report_line(cls, line: dict): - return cls( - date=line["Value Date"], - entry_date=line["Posting Date"], - value_date=line["Value Date"], - pay_date=line["Value Date"], - currency=line["Curr."], - amount=line["Cr Amount"] if line["Dr/Cr"] == "Cr" else -line["Dr Amount"], - wire_details=line["Reference Data"], - unique_ref=line["Bank Ref."], - fund=line["fund"], +class SCOTIAWireReport(WireReport, custodian="SCOTIA"): + def __iter__(self): + df = pd.read_excel(self.get_report(), skipfooter=2) + df["Amount"] = df.apply( + lambda x: x["Cr Amount"] if x["Dr/Cr"] == "Cr" else -x["Dr Amount"], axis=1 + ) + return ( + (self.date, self.fund, self.custodian, *t) + for t in df[ + [ + "Posting Date", + "Value Date", + "Value Date", + "Curr.", + "Amount", + "Reference Data", + "Bank Ref.", + ] + ].itertuples(index=False) ) - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - conn = cls._conn - with conn.cursor() as c: - c.execute( - "SELECT 1 FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", - ( - prev_business_day(date), - fund, - cls.custodian, - ), - ) - if not (_ := c.fetchone()): - df = pd.read_excel(p, skipfooter=2) - df["index"] = df.index - yield from df.to_dict(orient="records") - - @classmethod - def get_report(cls, date, fund): + def get_report(self): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" return next( REPORT_DIR.glob( - f"IsoSelene_{prev_business_day(date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" + f"IsoSelene_{prev_business_day(self.date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) ) |
