diff options
| -rw-r--r-- | python/report_ops/__main__.py | 13 | ||||
| -rw-r--r-- | python/report_ops/cash.py | 157 | ||||
| -rw-r--r-- | python/report_ops/wires.py | 38 |
3 files changed, 117 insertions, 91 deletions
diff --git a/python/report_ops/__main__.py b/python/report_ops/__main__.py index 94b670be..4372137a 100644 --- a/python/report_ops/__main__.py +++ b/python/report_ops/__main__.py @@ -87,9 +87,15 @@ if args.cash_reports or args.wire_reports: for custodian in custodians: get_custodian_download_fun(custodian)(args.date, fund, em=em) if args.cash_reports: - cash_report = CashReport[custodian](args.date, fund) + cash_report = CashReport[custodian] try: - cash_report.to_db() + for row in cash_report.yield_rows(args.date, fund): + cash = cash_report.from_report_line( + row | {"fund": fund, "knowledge_date": args.date} + ) + cash.stage() + cash_report.commit() + cash_report.clear() except (MissingDataError, RuntimeError) as e: logger.warning(e) if args.wire_reports: @@ -98,7 +104,8 @@ if args.cash_reports or args.wire_reports: for row in wire_report.yield_rows(args.date, fund): wire = wire_report.from_report_line(row | {"fund": fund}) wire.stage() - wire.commit() + wire_report.commit() + wire_report.clear() except (MissingDataError, RuntimeError) as e: logger.warning(e) diff --git a/python/report_ops/cash.py b/python/report_ops/cash.py index 10d2414c..f851d8d1 100644 --- a/python/report_ops/cash.py +++ b/python/report_ops/cash.py @@ -1,6 +1,6 @@ import datetime from typing import ClassVar -from dataclasses import dataclass +from dataclasses import dataclass, field import re import pandas as pd @@ -8,18 +8,22 @@ from serenitas.utils.env import DAILY_DIR from serenitas.utils.db2 import dbconn from serenitas.analytics.dates import prev_business_day from serenitas.analytics.exceptions import MissingDataError +from serenitas.ops.trade_dataclasses import Ccy +from serenitas.ops.dataclass_mapping import Fund from .misc import Custodian, get_dir +from .wires import Report @dataclass -class CashReport: - custodian: ClassVar[Custodian] - knowledge_date: datetime.date - fund: str - _conn: ClassVar[dbconn] = dbconn("dawndb") - _staging_queue: ClassVar[set] = set() - _registry = {} +class CashReport(Report, table_name="cash_balances"): + date: datetime.date + fund: Fund + account_name: str + account_number: str + currency_code: Ccy + balance: float + custodian: ClassVar[Custodian] = field(metadata={"insert": False}) def __init_subclass__(cls, custodian): cls.custodian = custodian @@ -28,82 +32,71 @@ class CashReport: def __class_getitem__(cls, key): return cls._registry[key] - def get_report(self): - report_dir = get_dir(self.knowledge_date) + @classmethod + def get_report(cls, knowledge_date, fund): + report_dir = get_dir(knowledge_date) + pattern = f"{cls.custodian}_CASH_{fund}_" reports = [ f for f in report_dir.iterdir() - if f.name.startswith(self.pattern) - and self.get_ts(f.name).date() == self.knowledge_date + if f.name.startswith(pattern) + and cls.get_ts(f.name).date() == knowledge_date ] p = max( reports, - key=lambda f: self.get_ts(f.name), + key=lambda f: cls.get_ts(f.name), default=None, ) if p: return p else: raise MissingDataError( - f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}" + f"Report not ready {knowledge_date}: {cls.custodian} {fund}" ) - @property - def pattern(self): - return f"{self.custodian}_CASH_{self.fund}_" - @staticmethod def get_ts(s): m = re.search(r"\d{12}", s) return datetime.datetime.strptime(m[0], "%Y%m%d%H%M") - @classmethod - def commit(cls): - sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" - with cls._conn.cursor() as c: - c.executemany(sql, cls._staging_queue) - cls._conn.commit() +class NTCashReport(CashReport, custodian="NT"): @classmethod - def clear(cls): - cls._staging_queue.clear() + def yield_rows(cls, knowledge_date, fund): + df = pd.read_csv(cls.get_report(knowledge_date, fund), on_bad_lines="warn") + df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] + yield from df.to_dict(orient="records") - def stage(self, row): - (account, currency), amount = row - self._staging_queue.add( - ( - prev_business_day(self.knowledge_date), - self.fund, - f"{self.custodian} Account {self.fund}", - account, - currency, - amount, - ) + @classmethod + def from_report_line(cls, d): + return cls( + date=prev_business_day(d["knowledge_date"]), + fund=d["fund"], + account_name=d["Consolidation"], + account_number=d["Account Number"], + currency_code=d["Currency code"], + balance=d["A-TRAN-AMT"], ) - def to_db(self): - for row in self.yield_rows(): - self.stage(row) - self.commit() - self.clear() - - -class NTCashReport(CashReport, custodian="NT"): - def yield_rows(self): - df = pd.read_csv(self.get_report(), on_bad_lines="warn") - df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] - df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] - df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() - df = df.set_index(["consolidation", "currencycode"]) - yield from df.itertuples() - class UMBCashReport(CashReport, custodian="UMB"): - def yield_rows(self): - df = pd.read_excel(self.get_report(), skiprows=3) - yield from df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[ - "Current Balance" - ].items() + @classmethod + def yield_rows(cls, knowledge_date, fund): + df = pd.read_excel(cls.get_report(knowledge_date, fund), skiprows=3) + yield from df.groupby(["Portfolio #", "Currency", "Portfolio Name"]).sum( + numeric_only=True + )["Current Balance"].reset_index().to_dict(orient="records") + + @classmethod + def from_report_line(cls, d): + return cls( + date=prev_business_day(d["knowledge_date"]), + fund=d["fund"], + account_name=d["Portfolio Name"], + account_number=d["Portfolio #"], + currency_code=d["Currency"], + balance=d["Current Balance"], + ) class BNYCashReport(CashReport, custodian="BNY"): @@ -112,39 +105,61 @@ class BNYCashReport(CashReport, custodian="BNY"): m = re.search(r"\d{14}", s) return datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S") - def yield_rows(self): - df = pd.read_csv(self.get_report()) + @classmethod + def yield_rows(cls, knowledge_date, fund): + df = pd.read_csv(cls.get_report(knowledge_date, fund)) df["Beginning Balance Local"] = df["Beginning Balance Local"].apply( lambda s: "-" + s[1:-1] if s.startswith("(") else s ) df["Beginning Balance Local"] = pd.to_numeric( df["Beginning Balance Local"].str.replace(",", "") ) - yield from df.groupby(["Account Number", "Local Currency Code"]).sum( - numeric_only=True - )["Beginning Balance Local"].items() + yield from df.groupby( + ["Account Number", "Account Name", "Local Currency Code"] + ).sum(numeric_only=True).reset_index().to_dict(orient="records") + + @classmethod + def from_report_line(cls, d): + return cls( + date=prev_business_day(d["knowledge_date"]), + fund=d["fund"], + account_name=d["Account Name"], + account_number=d["Account Number"], + currency_code=d["Local Currency Code"], + balance=d["Beginning Balance Local"], + ) class ScotiaCashReport(CashReport, custodian="SCOTIA"): - def yield_rows(self): - p = self.get_report() + @classmethod + def yield_rows(cls, knowledge_date, fund): + p = cls.get_report(knowledge_date, fund) df = pd.read_excel(p, skipfooter=1) if df.empty: # No wires, so we can't skip the footer df = pd.read_excel(p) - yield ( - (int(df.loc[0]["Account"]), df.loc[0]["Curr."]), - df.loc[0]["Closing Bal."], + yield df.to_dict(orient="records")[0] + + @classmethod + def from_report_line(cls, d): + return cls( + date=prev_business_day(d["knowledge_date"]), + fund=d["fund"], + account_name=d["Account"], + account_number=d["Account Name"], + currency_code="USD", + balance=d["Opening Bal."], ) - def get_report(self): + @classmethod + def get_report(cls, knowledge_date, fund): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" try: return next( REPORT_DIR.glob( - f"IsoSelene_{prev_business_day(self.knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" + f"IsoSelene_{prev_business_day(knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) ) except StopIteration as e: raise MissingDataError( - f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}" + f"Report not ready {knowledge_date}: {cls.custodian} {fund}" ) diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py index b84331cd..baaae19d 100644 --- a/python/report_ops/wires.py +++ b/python/report_ops/wires.py @@ -25,7 +25,7 @@ class Report: def __init_subclass__(cls, table_name: str): cls._insert_fields = list(filter(cls.is_insert_field, cls.__annotations__)) place_holders = ",".join(["%s"] * len(cls._insert_fields)) - cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls._insert_fields)}) VALUES({place_holders}) ON CONFLICT (unique_ref) DO NOTHING RETURNING *" + cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls._insert_fields)}) VALUES({place_holders}) ON CONFLICT DO NOTHING RETURNING *" cls._insert_queue = [] @classmethod @@ -37,6 +37,23 @@ class Report: case _: return True + @classmethod + def clear(cls): + cls._insert_queue.clear() + + def stage(self): + self._insert_queue.append( + tuple([getattr(self, col) for col in self._insert_fields]) + ) + + @classmethod + def commit(cls): + conn = cls._conn + with conn.cursor() as c: + c.executemany(cls._sql_insert, cls._insert_queue) + conn.commit() + cls._insert_queue.clear() + @dataclass class WireReport(Report, table_name="custodian_wires"): @@ -52,7 +69,7 @@ class WireReport(Report, table_name="custodian_wires"): unique_ref: str dtkey: ClassVar = field(metadata={"insert": False}) - def __init_subclass__(cls, custodian, dtkey, **kwargs): + def __init_subclass__(cls, custodian, dtkey): cls.custodian = custodian cls._registry[custodian] = cls cls.dtkey = dtkey @@ -68,11 +85,6 @@ class WireReport(Report, table_name="custodian_wires"): else: self.amount = float(self.amount) - def stage(self): - self._insert_queue.append( - tuple([getattr(self, col) for col in self._insert_fields]) - ) - @classmethod def get_report(cls, date, fund, prefix=None): report_dir = get_dir(date) @@ -89,14 +101,6 @@ class WireReport(Report, table_name="custodian_wires"): ) return p - @classmethod - def commit(cls): - conn = cls._conn - with conn.cursor() as c: - c.executemany(cls._sql_insert, cls._insert_queue) - conn.commit() - cls._insert_queue.clear() - class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): @classmethod @@ -147,8 +151,8 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): if "sponsor" in line["narrative"].lower(): yield line - @classmethod - def nt_to_enum(cls, ccy): + @staticmethod + def nt_to_enum(ccy): _mapping = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} return _mapping[ccy] |
