diff options
| -rw-r--r-- | python/report_ops/__main__.py | 4 | ||||
| -rw-r--r-- | python/report_ops/cash.py | 117 | ||||
| -rw-r--r-- | python/report_ops/custodians.py | 2 |
3 files changed, 66 insertions, 57 deletions
diff --git a/python/report_ops/__main__.py b/python/report_ops/__main__.py index ff241e23..314b9d2f 100644 --- a/python/report_ops/__main__.py +++ b/python/report_ops/__main__.py @@ -87,9 +87,9 @@ if args.cash_reports or args.wire_reports: for custodian in custodians: get_custodian_download_fun(custodian)(args.date, fund, em=em) if args.cash_reports: - cash_report = CashReport[custodian] + cash_report = CashReport[custodian](args.date, fund) try: - cash_report.to_db(args.date, fund) + cash_report.to_db() except (MissingDataError, RuntimeError) as e: logger.warning(e) if args.wire_reports: diff --git a/python/report_ops/cash.py b/python/report_ops/cash.py index c71fcdc0..ba3f51c7 100644 --- a/python/report_ops/cash.py +++ b/python/report_ops/cash.py @@ -1,7 +1,7 @@ import datetime from typing import ClassVar -from functools import partial from dataclasses import dataclass +import re import pandas as pd from serenitas.utils.env import DAILY_DIR @@ -9,80 +9,89 @@ from serenitas.utils.db2 import dbconn from serenitas.analytics.dates import prev_business_day from serenitas.analytics.exceptions import MissingDataError -from .misc import get_dir, dt_from_fname, Custodian +from .misc import Custodian, get_dir @dataclass class CashReport: custodian: ClassVar[Custodian] - date: datetime.date - dtkey: ClassVar - _conn: ClassVar = dbconn("dawndb") + knowledge_date: datetime.date + fund: str + _conn: ClassVar[dbconn] = dbconn("dawndb") _staging_queue: ClassVar[set] = set() - _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" _registry = {} - def __init_subclass__(cls, custodian, dtkey): + def __init_subclass__(cls, custodian): cls.custodian = custodian - cls.dtkey = dtkey cls._registry[custodian] = cls def __class_getitem__(cls, key): return cls._registry[key] - @classmethod - def get_report(cls, date, fund, prefix=None): - report_dir = get_dir(date) + def get_report(self): + report_dir = get_dir(self.knowledge_date) report_dir.mkdir(exist_ok=True, parents=True) - prefix = prefix if prefix else f"{cls.custodian}_CASH_{fund}" + reports = [ + f + for f in report_dir.iterdir() + if f.name.startswith(self.pattern) + and self.get_ts(f.name).date() == self.knowledge_date + ] p = max( - [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)], - key=partial(dt_from_fname, dt_format=cls.dtkey), + reports, + key=lambda f: self.get_ts(f.name), default=None, ) - if not p: + if p: + return p + else: raise MissingDataError( - f"No reports found for fund: {prefix.split('_')[-1]} date: {date}" + f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}" ) - return p + + @property + def pattern(self): + return f"{self.custodian}_CASH_{self.fund}_" + + @staticmethod + def get_ts(s): + m = re.search(r"\d{12}", s) + return datetime.datetime.strptime(m[0], "%Y%m%d%H%M") @classmethod def commit(cls): + sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" with cls._conn.cursor() as c: - c.executemany(cls._insert_sql, cls._staging_queue) + c.executemany(sql, cls._staging_queue) cls._conn.commit() @classmethod def clear(cls): cls._staging_queue.clear() - @classmethod - def stage_from_row(cls, row, date, fund): + def stage(self, row): (account, currency), amount = row - cls._staging_queue.add( + self._staging_queue.add( ( - prev_business_day(date), - fund, - f"{cls.custodian} Custody Account {fund}", + prev_business_day(self.knowledge_date), + self.fund, + f"{self.custodian} Account {self.fund}", account, currency, amount, ) ) - @classmethod - def to_db(cls, date, fund): - for row in cls.yield_rows(date, fund): - cls.stage_from_row(row, date, fund) - cls.commit() - cls.clear() + def to_db(self): + for row in self.yield_rows(): + self.stage(row) + self.commit() + self.clear() -class NTCashReport(CashReport, custodian="NT", dtkey="%Y%m%d%H%M"): - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - df = pd.read_csv(p, on_bad_lines="warn") +class NTCashReport(CashReport, custodian="NT"): + def yield_rows(self): + df = pd.read_csv(self.get_report(), on_bad_lines="warn") df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() @@ -90,21 +99,22 @@ class NTCashReport(CashReport, custodian="NT", dtkey="%Y%m%d%H%M"): yield from df.itertuples() -class UMBCashReport(CashReport, custodian="UMB", dtkey="%Y%m%d%H%M"): - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - df = pd.read_excel(p, skiprows=3) +class UMBCashReport(CashReport, custodian="UMB"): + def yield_rows(self): + df = pd.read_excel(self.get_report(), skiprows=3) yield from df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[ "Current Balance" ].items() -class BNYCashReport(CashReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) - df = pd.read_csv(p) +class BNYCashReport(CashReport, custodian="BNY"): + @staticmethod + def get_ts(s): + m = re.search(r"\d{14}", s) + return datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S") + + def yield_rows(self): + df = pd.read_csv(self.get_report()) df["Beginning Balance Local"] = df["Beginning Balance Local"].apply( lambda s: "-" + s[1:-1] if s.startswith("(") else s ) @@ -116,27 +126,24 @@ class BNYCashReport(CashReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): )["Beginning Balance Local"].items() -class ScotiaCashReport(CashReport, custodian="SCOTIA", dtkey="%Y%m%d%H%M%S"): - @classmethod - def yield_rows(cls, date, fund): - p = cls.get_report(date, fund) +class ScotiaCashReport(CashReport, custodian="SCOTIA"): + def yield_rows(self): + p = self.get_report() df = pd.read_excel(p, skipfooter=1) - if df.empty: - # No wires, so we can't skip the footer + if df.empty: # No wires, so we can't skip the footer df = pd.read_excel(p) yield ( (int(df.loc[0]["Account"]), df.loc[0]["Curr."]), df.loc[0]["Closing Bal."], ) - @classmethod - def get_report(cls, date, fund): + def get_report(self): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" try: return next( REPORT_DIR.glob( - f"IsoSelene_{prev_business_day(date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" + f"IsoSelene_{prev_business_day(self.knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1" ) ) except StopIteration as e: - raise MissingDataError(f"No file available for Scotia on {date}") from e + return diff --git a/python/report_ops/custodians.py b/python/report_ops/custodians.py index 93aba172..b345409a 100644 --- a/python/report_ops/custodians.py +++ b/python/report_ops/custodians.py @@ -131,6 +131,8 @@ def download_nt_reports(date, fund, em): fname = f"NT_WIRE_{fund}" elif "Cash" in verify_result.file_name: fname = f"NT_CASH_{fund}" + elif "CLS Daily Status": + fname = f"NT_CLS_{fund}" else: print(f"NT file not recognized:{verify_result.file_name}") p = dest / f"{fname}_{message_time:%Y%m%d%H%M}.csv" |
