1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import datetime
from typing import ClassVar
from dataclasses import dataclass
import re
import pandas as pd
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.db2 import dbconn
from serenitas.analytics.dates import prev_business_day
from serenitas.analytics.exceptions import MissingDataError
from .misc import Custodian, get_dir
@dataclass
class CashReport:
    """Base loader that stages custodian cash balances for ``cash_balances``.

    Subclasses are declared with a ``custodian`` class keyword, e.g.
    ``class Foo(CashReport, custodian="X")``. The keyword is stored on the
    subclass and the subclass is registered, so ``CashReport["X"]`` returns
    it. ``to_db`` relies on each subclass providing ``yield_rows()``, whose
    items must unpack as ``(account, currency), amount`` (see ``stage``).
    """

    custodian: ClassVar[Custodian]
    knowledge_date: datetime.date
    fund: str
    # The connection, staging queue and registry live on the base class and
    # are therefore shared by every subclass and every instance.
    _conn: ClassVar[dbconn] = dbconn("dawndb")
    _staging_queue: ClassVar[set] = set()
    _registry: ClassVar[dict] = {}

    def __init_subclass__(cls, custodian):
        """Store ``custodian`` on the new subclass and register the subclass."""
        super().__init_subclass__()
        cls.custodian = custodian
        cls._registry[custodian] = cls

    def __class_getitem__(cls, key):
        """Return the subclass registered for custodian ``key``.

        Raises:
            KeyError: if no subclass was registered under ``key``.
        """
        return cls._registry[key]

    def get_report(self):
        """Return the path of the latest report stamped on ``knowledge_date``.

        Scans the directory returned by ``get_dir(self.knowledge_date)`` for
        files whose name starts with ``self.pattern`` and whose embedded
        timestamp falls on ``knowledge_date``, and returns the one with the
        greatest timestamp.

        Raises:
            MissingDataError: if no such file exists.
        """
        prefix = self.pattern
        candidates = []
        for f in get_dir(self.knowledge_date).iterdir():
            if not f.name.startswith(prefix):
                continue
            try:
                # Parse once and reuse for both the date filter and the max.
                ts = self.get_ts(f.name)
            except ValueError:
                # Right prefix but no parsable timestamp: not a report file,
                # and it must not abort the lookup for the real ones.
                continue
            if ts.date() == self.knowledge_date:
                candidates.append((ts, f))
        if not candidates:
            raise MissingDataError(
                f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}"
            )
        return max(candidates, key=lambda c: c[0])[1]

    @property
    def pattern(self):
        """Filename prefix shared by this custodian/fund's cash reports."""
        return f"{self.custodian}_CASH_{self.fund}_"

    @staticmethod
    def get_ts(s):
        """Parse the first 12-digit ``YYYYmmddHHMM`` run in ``s``.

        Raises:
            ValueError: if ``s`` contains no 12-digit run, or the digits do
                not form a valid timestamp.
        """
        m = re.search(r"\d{12}", s)
        if m is None:
            raise ValueError(f"no timestamp found in {s!r}")
        return datetime.datetime.strptime(m[0], "%Y%m%d%H%M")

    @classmethod
    def commit(cls):
        """Insert every staged row into ``cash_balances`` and commit.

        Rows that conflict with existing ones are skipped by the
        ``ON CONFLICT DO NOTHING`` clause. The staging queue is not modified.
        """
        sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
        with cls._conn.cursor() as c:
            c.executemany(sql, cls._staging_queue)
        cls._conn.commit()

    @classmethod
    def clear(cls):
        """Empty the shared staging queue."""
        cls._staging_queue.clear()

    def stage(self, row):
        """Add one balance row to the shared staging queue.

        Args:
            row: a pair that unpacks as ``(account, currency), amount``.

        The staged tuple is dated ``prev_business_day(self.knowledge_date)``.
        Because the queue is a set, identical tuples are stored once.
        """
        (account, currency), amount = row
        self._staging_queue.add(
            (
                prev_business_day(self.knowledge_date),
                self.fund,
                f"{self.custodian} Account {self.fund}",
                account,
                currency,
                amount,
            )
        )

    def to_db(self):
        """Stage every row from ``yield_rows()`` and commit them.

        The staging queue is class-wide, so it is emptied in ``finally``:
        rows left behind by a failed staging or commit would otherwise be
        picked up by the next report's commit.
        """
        try:
            for row in self.yield_rows():
                self.stage(row)
            self.commit()
        finally:
            self.clear()
class NTCashReport(CashReport, custodian="NT"):
    """Cash report read from the NT CSV file."""

    def yield_rows(self):
        """Yield the closing-balance rows of the report.

        Only rows whose ``T-NARR-LONG`` equals ``"CLOSING BALANCE"`` are kept.
        Each item is a pandas named tuple: its first element is the
        ``(consolidation, currencycode)`` index pair and its second is the
        ``A-TRAN-AMT`` value.
        """
        raw = pd.read_csv(self.get_report(), on_bad_lines="warn")
        is_closing = raw["T-NARR-LONG"] == "CLOSING BALANCE"
        balances = raw.loc[
            is_closing, ["Consolidation", "Currency code", "A-TRAN-AMT"]
        ]
        # Normalise headers: drop spaces, dashes and underscores, lower-case.
        normalized = balances.columns.str.replace(" |-|_", "", regex=True).str.lower()
        balances.columns = normalized
        keyed = balances.set_index(["consolidation", "currencycode"])
        yield from keyed.itertuples()
class UMBCashReport(CashReport, custodian="UMB"):
    """Cash report read from the UMB Excel file."""

    def yield_rows(self):
        """Yield ``((portfolio, currency), balance)`` pairs.

        The first three rows of the sheet are skipped. Numeric columns are
        summed per (``Portfolio #``, ``Currency``) group and the
        ``Current Balance`` totals are emitted.
        """
        sheet = pd.read_excel(self.get_report(), skiprows=3)
        totals = sheet.groupby(["Portfolio #", "Currency"]).sum(numeric_only=True)
        current = totals["Current Balance"]
        yield from current.items()
class BNYCashReport(CashReport, custodian="BNY"):
    """Cash report read from the BNY CSV file."""

    @staticmethod
    def get_ts(s):
        """Parse the first 14-digit ``YYYYmmddHHMMSS`` run in ``s``.

        Raises:
            ValueError: if ``s`` contains no 14-digit run, or the digits do
                not form a valid timestamp.
        """
        m = re.search(r"\d{14}", s)
        if m is None:
            raise ValueError(f"no timestamp found in {s!r}")
        return datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S")

    def yield_rows(self):
        """Yield ``((account, currency), balance)`` pairs.

        ``Beginning Balance Local`` is converted to numbers (thousands
        separators removed, ``(x)`` read as ``-x``) and then summed per
        (``Account Number``, ``Local Currency Code``) group.
        """
        col = "Beginning Balance Local"
        df = pd.read_csv(self.get_report())
        balances = df[col]
        # read_csv already yields a numeric column when no value contains a
        # comma or parentheses; string cleaning only applies to text columns.
        if not pd.api.types.is_numeric_dtype(balances):
            # The .str methods pass missing values through as NaN.
            balances = pd.to_numeric(
                balances.str.strip()
                .str.replace(",", "", regex=False)
                .str.replace(r"^\((.*)\)$", r"-\1", regex=True)
            )
        df[col] = balances
        totals = df.groupby(["Account Number", "Local Currency Code"]).sum(
            numeric_only=True
        )
        yield from totals[col].items()
class ScotiaCashReport(CashReport, custodian="SCOTIA"):
    """Cash report read from the Scotia Excel file."""

    def yield_rows(self):
        """Yield a single ``((account, currency), closing_balance)`` pair.

        The values come from the first row of the sheet (index label 0); the
        account is converted to ``int``.
        """
        p = self.get_report()
        df = pd.read_excel(p, skipfooter=1)
        if df.empty:  # No wires, so we can't skip the footer
            df = pd.read_excel(p)
        first = df.loc[0]
        yield (
            (int(first["Account"]), first["Curr."]),
            first["Closing Bal."],
        )

    def get_report(self):
        """Return the first Scotia report file for the previous business day.

        The file is looked up under ``DAILY_DIR / "Selene" / "Scotia_reports"``
        using a glob built from ``prev_business_day(self.knowledge_date)``.

        Raises:
            MissingDataError: if no file matches. This mirrors the base
                class instead of returning ``None``, which previously made
                ``yield_rows`` fail inside ``pd.read_excel``.
        """
        REPORT_DIR = DAILY_DIR / "Selene" / "Scotia_reports"
        matches = REPORT_DIR.glob(
            f"IsoSelene_{prev_business_day(self.knowledge_date):%d-%b-%Y}_*_xlsx.JOAAPKO3.JOAAPKO1"
        )
        report = next(matches, None)
        if report is None:
            raise MissingDataError(
                f"Report not ready {self.knowledge_date}: {self.custodian} {self.fund}"
            )
        return report
|