1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
from serenitas.analytics.dates import prev_business_day
import datetime
import pandas as pd
from serenitas.utils.db import dbconn
from typing import ClassVar
from .misc import get_dir, dt_from_fname
from .custodians import NT, UMB, BNY
from functools import partial
from dataclasses import dataclass
@dataclass
class CashReport:
    """Base class for loading a custodian cash report into ``cash_balances``.

    Concrete reports subclass this class (together with a custodian class)
    and pass ``fund``, ``custodian`` and ``dtkey`` as class keyword
    arguments.  Every subclass is recorded in a registry keyed by
    ``(fund, custodian)`` and can be retrieved with
    ``CashReport[fund, custodian]``.
    """

    fund: ClassVar[str]
    custodian: ClassVar[str]
    date: datetime.date
    # Datetime format passed to dt_from_fname when ranking report files.
    dtkey: ClassVar
    # The connection and the staging set are defined once on the base class
    # and are therefore shared by every subclass and instance.
    _conn: ClassVar[dbconn] = dbconn("dawndb")
    _staging_queue: ClassVar[set] = set()
    _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    _registry = {}

    def __init_subclass__(cls, fund, custodian, dtkey):
        """Store the class keyword arguments on the subclass and register it."""
        cls.fund = fund
        cls.custodian = custodian
        cls.dtkey = dtkey
        cls._registry[(fund, custodian)] = cls

    def __class_getitem__(cls, key):
        """Return the report class registered under ``key``.

        Raises:
            KeyError: if no subclass was registered for ``key``.
        """
        return cls._registry[key]

    def get_cash_report(self, report_prefix):
        """Download the reports for ``self.date`` and return the latest match.

        Args:
            report_prefix: file name prefix identifying the wanted report.

        Returns:
            The entry of the report directory whose name starts with
            ``report_prefix`` and that ranks highest according to
            ``dt_from_fname`` with ``dt_format=self.dtkey``.

        Raises:
            ValueError: if no file in the directory matches the prefix.
        """
        # download_reports is not defined on this class; presumably it is
        # provided by the custodian class each concrete report also inherits.
        self.download_reports(self.date)
        report_dir = get_dir(self.date)
        report_dir.mkdir(exist_ok=True, parents=True)
        candidates = [
            f for f in report_dir.iterdir() if f.name.startswith(report_prefix)
        ]
        p = max(
            candidates,
            key=partial(dt_from_fname, dt_format=self.dtkey),
            default=None,
        )
        if p is None:
            raise ValueError(
                f"No reports found for fund: {self.fund} date: {self.date}"
            )
        return p

    @classmethod
    def commit(cls):
        """Insert every staged row and commit the transaction.

        The staging queue is not emptied here; callers clear it themselves.
        If the insert or the commit fails, the transaction is rolled back
        before the exception is re-raised, so that the shared connection is
        not left in a failed transaction for later reports.
        """
        try:
            with cls._conn.cursor() as c:
                c.executemany(cls._insert_sql, cls._staging_queue)
            cls._conn.commit()
        except Exception:
            cls._conn.rollback()
            raise

    def stage_from_row(self, row):
        """Add one balance to the staging queue.

        Args:
            row: a pair ``((account, currency), amount)``.

        The staged tuple holds, in order: the business day preceding
        ``self.date``, the fund, the custody account label, the account,
        the currency and the amount.
        """
        (account, currency), amount = row
        self._staging_queue.add(
            (
                prev_business_day(self.date),
                self.fund,
                f"{self.custodian} Custody Account {self.fund}",
                account,
                currency,
                amount,
            )
        )
class SeleneNTCashReport(
    CashReport, NT, fund="ISOSEL", custodian="NT", dtkey="%Y%m%d%H%M"
):
    """Cash report registered for fund ``ISOSEL`` and custodian ``NT``."""

    def to_db(self):
        """Stage and commit the closing balances found in the latest report.

        Reads the newest file whose name starts with ``cash_``, keeps the
        rows whose ``T-NARR-LONG`` equals ``CLOSING BALANCE`` and stages one
        entry per remaining row, keyed by consolidation and currency code.
        The staging queue is cleared once the commit has returned.
        """
        report_path = self.get_cash_report("cash_")
        raw = pd.read_csv(report_path, on_bad_lines="warn")

        is_closing = raw["T-NARR-LONG"] == "CLOSING BALANCE"
        closing = raw[is_closing]
        closing = closing[["Consolidation", "Currency code", "A-TRAN-AMT"]]

        # Normalise headers: drop spaces, dashes and underscores, lower-case.
        normalised = closing.columns.str.replace(" |-|_", "", regex=True)
        closing.columns = normalised.str.lower()

        balances = closing.set_index(["consolidation", "currencycode"])
        # With a two-level index each tuple is ((consolidation, currency), amount).
        for entry in balances.itertuples():
            self.stage_from_row(entry)

        self.commit()
        self._staging_queue.clear()
class SerenitasUMBCashReport(
    CashReport, UMB, fund="SERCGMAST", custodian="UMB", dtkey="%Y%m%d%H%M"
):
    """Cash report registered for fund ``SERCGMAST`` and custodian ``UMB``."""

    def to_db(self):
        """Stage and commit the balance per portfolio and currency.

        Reads the newest file whose name starts with ``umb_`` (skipping the
        first three rows of the sheet), sums ``Current Balance`` for each
        ``(Portfolio #, Currency)`` pair and stages one entry per pair.
        The staging queue is cleared once the commit has returned.
        """
        p = self.get_cash_report("umb_")
        df = pd.read_excel(p, skiprows=3)
        # Select the balance column before aggregating: summing the whole
        # frame is wasted work and can fail on columns that cannot be summed.
        balances = df.groupby(["Portfolio #", "Currency"])["Current Balance"].sum()
        for row in balances.items():
            self.stage_from_row(row)
        self.commit()
        self._staging_queue.clear()
class BowdstBNYCashReport(
    CashReport, BNY, fund="BOWDST", custodian="BNY", dtkey="%Y%m%d%H%M%S"
):
    """Cash report registered for fund ``BOWDST`` and custodian ``BNY``."""

    def to_db(self):
        """Stage and commit the beginning balance per account and currency.

        Reads the newest file whose name starts with ``Live-cash``, converts
        ``Beginning Balance Local`` to numbers, sums it for each
        ``(Account Number, Local Currency Code)`` pair and stages one entry
        per pair.  The staging queue is cleared once the commit has returned.

        Raises:
            ValueError: if a balance cannot be parsed as a number.
        """
        p = self.get_cash_report("Live-cash")
        df = pd.read_csv(p)

        column = "Beginning Balance Local"
        balance = df[column]
        # read_csv already yields a numeric column when no value carries
        # thousands separators or parentheses; string handling is only
        # needed (and only valid) for text columns.
        if not pd.api.types.is_numeric_dtype(balance):
            text = balance.str.replace(",", "", regex=False)
            # Accounting notation: "(123.45)" stands for -123.45.
            text = text.str.replace(r"^\((.*)\)$", r"-\1", regex=True)
            balance = pd.to_numeric(text)
        df[column] = balance

        # Select the balance column before aggregating: summing the whole
        # frame is wasted work and can fail on columns that cannot be summed.
        totals = df.groupby(["Account Number", "Local Currency Code"])[column].sum()
        for row in totals.items():
            self.stage_from_row(row)
        self.commit()
        self._staging_queue.clear()
|