aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops')
-rw-r--r--python/report_ops/__main__.py13
-rw-r--r--python/report_ops/cash.py157
-rw-r--r--python/report_ops/wires.py38
3 files changed, 117 insertions, 91 deletions
diff --git a/python/report_ops/__main__.py b/python/report_ops/__main__.py
index 94b670be..4372137a 100644
--- a/python/report_ops/__main__.py
+++ b/python/report_ops/__main__.py
@@ -87,9 +87,15 @@ if args.cash_reports or args.wire_reports:
for custodian in custodians:
get_custodian_download_fun(custodian)(args.date, fund, em=em)
if args.cash_reports:
- cash_report = CashReport[custodian](args.date, fund)
+ cash_report = CashReport[custodian]
try:
- cash_report.to_db()
+ for row in cash_report.yield_rows(args.date, fund):
+ cash = cash_report.from_report_line(
+ row | {"fund": fund, "knowledge_date": args.date}
+ )
+ cash.stage()
+ cash_report.commit()
+ cash_report.clear()
except (MissingDataError, RuntimeError) as e:
logger.warning(e)
if args.wire_reports:
@@ -98,7 +104,8 @@ if args.cash_reports or args.wire_reports:
for row in wire_report.yield_rows(args.date, fund):
wire = wire_report.from_report_line(row | {"fund": fund})
wire.stage()
- wire.commit()
+ wire_report.commit()
+ wire_report.clear()
except (MissingDataError, RuntimeError) as e:
logger.warning(e)
diff --git a/python/report_ops/cash.py b/python/report_ops/cash.py
index 10d2414c..f851d8d1 100644
--- a/python/report_ops/cash.py
+++ b/python/report_ops/cash.py
@@ -1,6 +1,6 @@
import datetime
from typing import ClassVar
-from dataclasses import dataclass
+from dataclasses import dataclass, field
import re
import pandas as pd
@@ -8,18 +8,22 @@ from serenitas.utils.env import DAILY_DIR
from serenitas.utils.db2 import dbconn
from serenitas.analytics.dates import prev_business_day
from serenitas.analytics.exceptions import MissingDataError
+from serenitas.ops.trade_dataclasses import Ccy
+from serenitas.ops.dataclass_mapping import Fund
from .misc import Custodian, get_dir
+from .wires import Report
@dataclass
-class CashReport:
- custodian: ClassVar[Custodian]
- knowledge_date: datetime.date
- fund: str
- _conn: ClassVar[dbconn] = dbconn("dawndb")
- _staging_queue: ClassVar[set] = set()
- _registry = {}
+class CashReport(Report, table_name="cash_balances"):
+ date: datetime.date
+ fund: Fund
+ account_name: str
+ account_number: str
+ currency_code: Ccy
+ balance: float
+ custodian: ClassVar[Custodian] = field(metadata={"insert": False})
def __init_subclass__(cls, custodian):
cls.custodian = custodian
@@ -28,82 +32,71 @@ class CashReport:
def __class_getitem__(cls, key):
return cls._registry[key]
- def get_report(self):
- report_dir = get_dir(self.knowledge_date)
+ @classmethod
+ def get_report(cls, knowledge_date, fund):
+ report_dir = get_dir(knowledge_date)
+ pattern = f"{cls.custodian}_CASH_{fund}_"
reports = [
f
for f in report_dir.iterdir()
- if f.name.startswith(self.pattern)
- and self.get_ts(f.name).date() == self.knowledge_date
+ if f.name.startswith(pattern)
+ and cls.get_ts(f.name).date() == knowledge_date
]
p = max(
reports,
- key=lambda f: self.get_ts(f.name),
+ key=lambda f: cls.get_ts(f.name),
default=None,
)
if p:
return p
else:
raise MissingDataError(
- f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}"
+ f"Report not ready {knowledge_date}: {cls.custodian} {fund}"
)
- @property
- def pattern(self):
- return f"{self.custodian}_CASH_{self.fund}_"
-
@staticmethod
def get_ts(s):
m = re.search(r"\d{12}", s)
return datetime.datetime.strptime(m[0], "%Y%m%d%H%M")
- @classmethod
- def commit(cls):
- sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
- with cls._conn.cursor() as c:
- c.executemany(sql, cls._staging_queue)
- cls._conn.commit()
+class NTCashReport(CashReport, custodian="NT"):
@classmethod
- def clear(cls):
- cls._staging_queue.clear()
+ def yield_rows(cls, knowledge_date, fund):
+ df = pd.read_csv(cls.get_report(knowledge_date, fund), on_bad_lines="warn")
+ df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"]
+ yield from df.to_dict(orient="records")
- def stage(self, row):
- (account, currency), amount = row
- self._staging_queue.add(
- (
- prev_business_day(self.knowledge_date),
- self.fund,
- f"{self.custodian} Account {self.fund}",
- account,
- currency,
- amount,
- )
+ @classmethod
+ def from_report_line(cls, d):
+ return cls(
+ date=prev_business_day(d["knowledge_date"]),
+ fund=d["fund"],
+ account_name=d["Consolidation"],
+ account_number=d["Account Number"],
+ currency_code=d["Currency code"],
+ balance=d["A-TRAN-AMT"],
)
- def to_db(self):
- for row in self.yield_rows():
- self.stage(row)
- self.commit()
- self.clear()
-
-
-class NTCashReport(CashReport, custodian="NT"):
- def yield_rows(self):
- df = pd.read_csv(self.get_report(), on_bad_lines="warn")
- df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"]
- df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]]
- df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower()
- df = df.set_index(["consolidation", "currencycode"])
- yield from df.itertuples()
-
class UMBCashReport(CashReport, custodian="UMB"):
- def yield_rows(self):
- df = pd.read_excel(self.get_report(), skiprows=3)
- yield from df.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)[
- "Current Balance"
- ].items()
+ @classmethod
+ def yield_rows(cls, knowledge_date, fund):
+ df = pd.read_excel(cls.get_report(knowledge_date, fund), skiprows=3)
+ yield from df.groupby(["Portfolio #", "Currency", "Portfolio Name"]).sum(
+ numeric_only=True
+ )["Current Balance"].reset_index().to_dict(orient="records")
+
+ @classmethod
+ def from_report_line(cls, d):
+ return cls(
+ date=prev_business_day(d["knowledge_date"]),
+ fund=d["fund"],
+ account_name=d["Portfolio Name"],
+ account_number=d["Portfolio #"],
+ currency_code=d["Currency"],
+ balance=d["Current Balance"],
+ )
class BNYCashReport(CashReport, custodian="BNY"):
@@ -112,39 +105,61 @@ class BNYCashReport(CashReport, custodian="BNY"):
m = re.search(r"\d{14}", s)
return datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S")
- def yield_rows(self):
- df = pd.read_csv(self.get_report())
+ @classmethod
+ def yield_rows(cls, knowledge_date, fund):
+ df = pd.read_csv(cls.get_report(knowledge_date, fund))
df["Beginning Balance Local"] = df["Beginning Balance Local"].apply(
lambda s: "-" + s[1:-1] if s.startswith("(") else s
)
df["Beginning Balance Local"] = pd.to_numeric(
df["Beginning Balance Local"].str.replace(",", "")
)
- yield from df.groupby(["Account Number", "Local Currency Code"]).sum(
- numeric_only=True
- )["Beginning Balance Local"].items()
+ yield from df.groupby(
+ ["Account Number", "Account Name", "Local Currency Code"]
+ ).sum(numeric_only=True).reset_index().to_dict(orient="records")
+
+ @classmethod
+ def from_report_line(cls, d):
+ return cls(
+ date=prev_business_day(d["knowledge_date"]),
+ fund=d["fund"],
+ account_name=d["Account Name"],
+ account_number=d["Account Number"],
+ currency_code=d["Local Currency Code"],
+ balance=d["Beginning Balance Local"],
+ )
class ScotiaCashReport(CashReport, custodian="SCOTIA"):
- def yield_rows(self):
- p = self.get_report()
+ @classmethod
+ def yield_rows(cls, knowledge_date, fund):
+ p = cls.get_report(knowledge_date, fund)
df = pd.read_excel(p, skipfooter=1)
if df.empty: # No wires, so we can't skip the footer
df = pd.read_excel(p)
- yield (
- (int(df.loc[0]["Account"]), df.loc[0]["Curr."]),
- df.loc[0]["Closing Bal."],
+ yield df.to_dict(orient="records")[0]
+
+ @classmethod
+ def from_report_line(cls, d):
+ return cls(
+ date=prev_business_day(d["knowledge_date"]),
+ fund=d["fund"],
+ account_name=d["Account"],
+ account_number=d["Account Name"],
+ currency_code="USD",
+ balance=d["Opening Bal."],
)
- def get_report(self):
+ @classmethod
+ def get_report(cls, knowledge_date, fund):
REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports"
try:
return next(
REPORT_DIR.glob(
- f"IsoSelene_{prev_business_day(self.knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1"
+ f"IsoSelene_{prev_business_day(knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1"
)
)
except StopIteration as e:
raise MissingDataError(
- f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}"
+ f"Report not ready {knowledge_date}: {cls.custodian} {fund}"
)
diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py
index b84331cd..baaae19d 100644
--- a/python/report_ops/wires.py
+++ b/python/report_ops/wires.py
@@ -25,7 +25,7 @@ class Report:
def __init_subclass__(cls, table_name: str):
cls._insert_fields = list(filter(cls.is_insert_field, cls.__annotations__))
place_holders = ",".join(["%s"] * len(cls._insert_fields))
- cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls._insert_fields)}) VALUES({place_holders}) ON CONFLICT (unique_ref) DO NOTHING RETURNING *"
+ cls._sql_insert = f"INSERT INTO {table_name}({','.join(cls._insert_fields)}) VALUES({place_holders}) ON CONFLICT DO NOTHING RETURNING *"
cls._insert_queue = []
@classmethod
@@ -37,6 +37,23 @@ class Report:
case _:
return True
+ @classmethod
+ def clear(cls):
+ cls._insert_queue.clear()
+
+ def stage(self):
+ self._insert_queue.append(
+ tuple([getattr(self, col) for col in self._insert_fields])
+ )
+
+ @classmethod
+ def commit(cls):
+ conn = cls._conn
+ with conn.cursor() as c:
+ c.executemany(cls._sql_insert, cls._insert_queue)
+ conn.commit()
+ cls._insert_queue.clear()
+
@dataclass
class WireReport(Report, table_name="custodian_wires"):
@@ -52,7 +69,7 @@ class WireReport(Report, table_name="custodian_wires"):
unique_ref: str
dtkey: ClassVar = field(metadata={"insert": False})
- def __init_subclass__(cls, custodian, dtkey, **kwargs):
+ def __init_subclass__(cls, custodian, dtkey):
cls.custodian = custodian
cls._registry[custodian] = cls
cls.dtkey = dtkey
@@ -68,11 +85,6 @@ class WireReport(Report, table_name="custodian_wires"):
else:
self.amount = float(self.amount)
- def stage(self):
- self._insert_queue.append(
- tuple([getattr(self, col) for col in self._insert_fields])
- )
-
@classmethod
def get_report(cls, date, fund, prefix=None):
report_dir = get_dir(date)
@@ -89,14 +101,6 @@ class WireReport(Report, table_name="custodian_wires"):
)
return p
- @classmethod
- def commit(cls):
- conn = cls._conn
- with conn.cursor() as c:
- c.executemany(cls._sql_insert, cls._insert_queue)
- conn.commit()
- cls._insert_queue.clear()
-
class BNYWireReport(WireReport, custodian="BNY", dtkey="%Y%m%d%H%M%S"):
@classmethod
@@ -147,8 +151,8 @@ class NTWireReport(WireReport, custodian="NT", dtkey="%Y%m%d%H%M"):
if "sponsor" in line["narrative"].lower():
yield line
- @classmethod
- def nt_to_enum(cls, ccy):
+ @staticmethod
+ def nt_to_enum(ccy):
_mapping = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"}
return _mapping[ccy]