diff options
Diffstat (limited to 'python/report_ops/wires.py')
| -rw-r--r-- | python/report_ops/wires.py | 75 |
1 files changed, 42 insertions, 33 deletions
diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py index 297d5e58..3709b0d5 100644 --- a/python/report_ops/wires.py +++ b/python/report_ops/wires.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field import pandas as pd from serenitas.ops.trade_dataclasses import Deal, Ccy +from serenitas.ops.dataclass_mapping import Fund from serenitas.analytics.dates import prev_business_day from serenitas.utils.env import DAILY_DIR @@ -19,6 +20,7 @@ CUSTODIAN = Literal["UMB", "NT", "BNY"] @dataclass class WireReport(Deal, table_name="custodian_wires", deal_type="custodian_wires"): date: datetime.date + fund: Fund custodian: ClassVar[str] entry_date: datetime.date value_date: datetime.date @@ -47,23 +49,34 @@ class WireReport(Deal, table_name="custodian_wires", deal_type="custodian_wires" self.amount = float(self.amount) @classmethod - def get_newest_report(cls, fname, date): - cls.download_reports(date) + def get_report(cls, date, fund, prefix=None): + report_dir = get_dir(date) + report_dir.mkdir(exist_ok=True, parents=True) + prefix = prefix if prefix else f"{cls.custodian}_WIRE_{fund}" p = max( - [f for f in get_dir(date).iterdir() if fname in f.name], + [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)], key=partial(dt_from_fname, dt_format=cls.dtkey), default=None, ) if not p: - raise ValueError(f"No reports for {cls.fund} on {date}") + raise ValueError( + f"No reports found for fund: {prefix.split('_')[-1]} date: {date}" + ) return p + @classmethod + def to_db(cls, date, fund): + for line in cls.yield_rows(date, fund): + cls.from_report_line(line | {"fund": fund}).stage() + cls.commit() + class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): @classmethod def from_report_line(cls, line: dict): return cls( date=line["Report Run Date"], + fund=line["fund"], entry_date=line["Cash Entry Date"], value_date=line["Cash Value Date"], pay_date=line["Settle / Pay Date"], @@ -76,13 +89,11 @@ class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"): ) @classmethod - def to_db(cls, date): - p = cls.get_newest_report("BowdstWires", date) + def yield_rows(cls, date, fund): + p = cls.get_report(date, fund) with open(p) as fh: reader = DictReader(fh) - for line in reader: - cls.from_report_line(line).stage() - cls.commit() + yield from reader class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): @@ -90,6 +101,7 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): def from_report_line(cls, line: dict): return cls( date=line["Through date"], + fund=line["fund"], entry_date=line["D-GL-POST"], value_date=line["D-TRAN-EFF"], pay_date=line["D-TRAN-EFF"], @@ -100,14 +112,13 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"): ) @classmethod - def to_db(cls, date): - p = cls.get_newest_report("custodian_wires", date) + def yield_rows(cls, date, fund): + p = cls.get_report(date, fund) with open(p) as fh: reader = DictReader(fh) for line in reader: if "sponsor" in line["narrative"].lower(): - cls.from_report_line(line).stage() - cls.commit() + yield line class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"): @@ -115,6 +126,7 @@ class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"): def from_report_line(cls, line: dict): return cls( date=line["Transaction Date"], + fund=line["fund"], entry_date=line["Transaction Date"], value_date=line["Transaction Date"], pay_date=line["Transaction Date"], @@ -125,28 +137,27 @@ class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"): ) @classmethod - def to_db(cls, date): - p = cls.get_newest_report("umbwires_", date) + def yield_rows(cls, date, fund): + p = cls.get_report(date, fund) conn = cls._conn with conn.cursor() as c: c.execute( "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", ( date, - cls.fund, + fund, cls.custodian, ), ) conn.commit() df = pd.read_excel(p, skiprows=3) df["index"] = df.index - for row_dict in df.to_dict(orient="records"): - if row_dict["Transaction Date"].startswith( + for line in df.to_dict(orient="records"): + if line["Transaction Date"].startswith( "No records" ): # No wires at the moment - continue - cls.from_report_line(row_dict).stage() - cls.commit() + return + yield line class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None): @@ -154,6 +165,7 @@ class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None): def from_report_line(cls, line: dict): return cls( date=line["Value Date"], + fund=line["fund"], entry_date=line["Posting Date"], value_date=line["Value Date"], pay_date=line["Value Date"], @@ -164,28 +176,25 @@ class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None): ) @classmethod - def to_db(cls, date): - p = cls.get_newest_report(date) + def yield_rows(cls, date, fund): + p = cls.get_report(date, fund) conn = cls._conn with conn.cursor() as c: c.execute( - "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", + "SELECT 1 FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s", ( prev_business_day(date), - cls.fund, + fund, cls.custodian, ), ) - conn.commit() - df = pd.read_excel(p, skipfooter=2) - df["index"] = df.index - for row_dict in df.to_dict(orient="records"): - cls.from_report_line(row_dict).stage() - cls.commit() + if not (_ := c.fetchone()): + df = pd.read_excel(p, skipfooter=2) + df["index"] = df.index + yield from df.to_dict(orient="records") @classmethod - def get_newest_report(cls, date): - cls.download_reports(date) + def get_report(cls, date, fund): REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports" return next( REPORT_DIR.glob( |
