aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/wires.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops/wires.py')
-rw-r--r--python/report_ops/wires.py75
1 files changed, 42 insertions, 33 deletions
diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py
index 297d5e58..3709b0d5 100644
--- a/python/report_ops/wires.py
+++ b/python/report_ops/wires.py
@@ -6,6 +6,7 @@ from dataclasses import dataclass, field
import pandas as pd
from serenitas.ops.trade_dataclasses import Deal, Ccy
+from serenitas.ops.dataclass_mapping import Fund
from serenitas.analytics.dates import prev_business_day
from serenitas.utils.env import DAILY_DIR
@@ -19,6 +20,7 @@ CUSTODIAN = Literal["UMB", "NT", "BNY"]
@dataclass
class WireReport(Deal, table_name="custodian_wires", deal_type="custodian_wires"):
date: datetime.date
+ fund: Fund
custodian: ClassVar[str]
entry_date: datetime.date
value_date: datetime.date
@@ -47,23 +49,34 @@ class WireReport(Deal, table_name="custodian_wires", deal_type="custodian_wires"
self.amount = float(self.amount)
@classmethod
- def get_newest_report(cls, fname, date):
- cls.download_reports(date)
+ def get_report(cls, date, fund, prefix=None):
+ report_dir = get_dir(date)
+ report_dir.mkdir(exist_ok=True, parents=True)
+ prefix = prefix if prefix else f"{cls.custodian}_WIRE_{fund}"
p = max(
- [f for f in get_dir(date).iterdir() if fname in f.name],
+ [f for f in get_dir(date).iterdir() if f.name.startswith(prefix)],
key=partial(dt_from_fname, dt_format=cls.dtkey),
default=None,
)
if not p:
- raise ValueError(f"No reports for {cls.fund} on {date}")
+ raise ValueError(
+ f"No reports found for fund: {prefix.split('_')[-1]} date: {date}"
+ )
return p
+ @classmethod
+ def to_db(cls, date, fund):
+ for line in cls.yield_rows(date, fund):
+ cls.from_report_line(line | {"fund": fund}).stage()
+ cls.commit()
+
class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"):
@classmethod
def from_report_line(cls, line: dict):
return cls(
date=line["Report Run Date"],
+ fund=line["fund"],
entry_date=line["Cash Entry Date"],
value_date=line["Cash Value Date"],
pay_date=line["Settle / Pay Date"],
@@ -76,13 +89,11 @@ class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"):
)
@classmethod
- def to_db(cls, date):
- p = cls.get_newest_report("BowdstWires", date)
+ def yield_rows(cls, date, fund):
+ p = cls.get_report(date, fund)
with open(p) as fh:
reader = DictReader(fh)
- for line in reader:
- cls.from_report_line(line).stage()
- cls.commit()
+ yield from reader
class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"):
@@ -90,6 +101,7 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"):
def from_report_line(cls, line: dict):
return cls(
date=line["Through date"],
+ fund=line["fund"],
entry_date=line["D-GL-POST"],
value_date=line["D-TRAN-EFF"],
pay_date=line["D-TRAN-EFF"],
@@ -100,14 +112,13 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"):
)
@classmethod
- def to_db(cls, date):
- p = cls.get_newest_report("custodian_wires", date)
+ def yield_rows(cls, date, fund):
+ p = cls.get_report(date, fund)
with open(p) as fh:
reader = DictReader(fh)
for line in reader:
if "sponsor" in line["narrative"].lower():
- cls.from_report_line(line).stage()
- cls.commit()
+ yield line
class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"):
@@ -115,6 +126,7 @@ class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"):
def from_report_line(cls, line: dict):
return cls(
date=line["Transaction Date"],
+ fund=line["fund"],
entry_date=line["Transaction Date"],
value_date=line["Transaction Date"],
pay_date=line["Transaction Date"],
@@ -125,28 +137,27 @@ class UMBWireReport(WireReport, custodian="UMB", dtkey="%Y%m%d%H%M"):
)
@classmethod
- def to_db(cls, date):
- p = cls.get_newest_report("umbwires_", date)
+ def yield_rows(cls, date, fund):
+ p = cls.get_report(date, fund)
conn = cls._conn
with conn.cursor() as c:
c.execute(
"DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s",
(
date,
- cls.fund,
+ fund,
cls.custodian,
),
)
conn.commit()
df = pd.read_excel(p, skiprows=3)
df["index"] = df.index
- for row_dict in df.to_dict(orient="records"):
- if row_dict["Transaction Date"].startswith(
+ for line in df.to_dict(orient="records"):
+ if line["Transaction Date"].startswith(
"No records"
): # No wires at the moment
- continue
- cls.from_report_line(row_dict).stage()
- cls.commit()
+ return
+ yield line
class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None):
@@ -154,6 +165,7 @@ class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None):
def from_report_line(cls, line: dict):
return cls(
date=line["Value Date"],
+ fund=line["fund"],
entry_date=line["Posting Date"],
value_date=line["Value Date"],
pay_date=line["Value Date"],
@@ -164,28 +176,25 @@ class SCOTIAWireReport(WireReport, custodian="SCOTIA", dtkey=None):
)
@classmethod
- def to_db(cls, date):
- p = cls.get_newest_report(date)
+ def yield_rows(cls, date, fund):
+ p = cls.get_report(date, fund)
conn = cls._conn
with conn.cursor() as c:
c.execute(
- "DELETE FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s",
+ "SELECT 1 FROM custodian_wires WHERE date=%s AND fund=%s AND custodian=%s",
(
prev_business_day(date),
- cls.fund,
+ fund,
cls.custodian,
),
)
- conn.commit()
- df = pd.read_excel(p, skipfooter=2)
- df["index"] = df.index
- for row_dict in df.to_dict(orient="records"):
- cls.from_report_line(row_dict).stage()
- cls.commit()
+ if not (_ := c.fetchone()):
+ df = pd.read_excel(p, skipfooter=2)
+ df["index"] = df.index
+ yield from df.to_dict(orient="records")
@classmethod
- def get_newest_report(cls, date):
- cls.download_reports(date)
+ def get_report(cls, date, fund):
REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports"
return next(
REPORT_DIR.glob(