1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
from serenitas.utils.remote import SftpClient
from typing import Callable, List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
import datetime
import re
from serenitas.analytics.dates import prev_business_day, next_business_day
def citco_accrued(s: str):
    """Derive the reporting date of an accrued-interest report from its filename.

    The filename is expected to contain ``100502500_INNOCAP_ISOSEL``, one
    arbitrary character, a run of digits in ``%Y%m%d%H%M%S`` layout, and a
    literal dot.

    Args:
        s: Filename to inspect.

    Returns:
        ``prev_business_day`` applied to the timestamp embedded in the
        filename, or ``None`` when the filename does not match the pattern.

    Raises:
        ValueError: If the digit run is not a valid ``%Y%m%d%H%M%S`` timestamp.
    """
    # Raw string: "\d" and "\." are not valid *string* escapes, so the
    # non-raw form triggers an invalid-escape warning on modern Python.
    m = re.search(r"100502500_INNOCAP_ISOSEL.([\d]+)\.", s)
    if m is None:
        return None
    dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
    # The timestamp is rolled back one business day before being returned.
    return prev_business_day(dt)
def citco_all(s: str):
    """Extract the timestamp embedded in an all-positions report filename.

    The filename is expected to contain ``SPOS4X_INNOCAP_ISOSEL_D_IM``, one
    arbitrary character, a ``%Y%m%d.%H%M%S`` timestamp, and a literal dot.

    Args:
        s: Filename to inspect.

    Returns:
        The parsed ``datetime.datetime``, or ``None`` when the filename does
        not match the pattern.

    Raises:
        ValueError: If the captured text is not a valid ``%Y%m%d.%H%M%S``
            timestamp.
    """
    # Raw string: "\d" and "\." are not valid *string* escapes, so the
    # non-raw form triggers an invalid-escape warning on modern Python.
    m = re.search(r"SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s)
    if m is None:
        return None
    return datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
def load_citco_report(fh, kd, date_cols):
    """Read a Citco CSV report and stamp it with its reporting dates.

    Args:
        fh: File-like object (or path) accepted by ``pandas.read_csv``.
        kd: Datetime-like value for the report; must provide ``.date()``.
        date_cols: Column names, as they appear in the CSV header, to parse
            as dates.

    Returns:
        A DataFrame with a ``row`` column holding the original index,
        lower-cased column names with spaces replaced by underscores, and two
        added columns: ``period_end_date`` (``kd.date()``) and
        ``knowledge_date`` (``next_business_day(kd)``).
    """
    # NOTE(review): pandas 2.0 deprecates ``infer_datetime_format``; kept
    # here so behaviour on older pandas versions is unchanged.
    report = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)

    # Record the CSV row position before the headers are normalised.
    report["row"] = report.index
    report.columns = report.columns.str.lower().str.replace(" ", "_")

    report["period_end_date"] = kd.date()
    report["knowledge_date"] = next_business_day(kd)
    return report
class Report:
    """Base class for a Citco report fetched over SFTP and loaded into the database.

    Concrete reports are declared by subclassing with keyword arguments, e.g.
    ``class Foo(Report, table=..., f=..., fname=..., date_cols=...)``; see
    ``__init_subclass__`` for their meaning.
    """

    # Destination database table for the report rows.
    table: str
    # Maps a remote filename to the report's datetime (None when the
    # filename does not carry a recognisable timestamp).
    ped_func: Callable[[str], datetime.datetime]
    # Substring that identifies this report's files in the SFTP listing.
    fname: str
    # Both connections are created once, when this class body is executed,
    # and are shared by every subclass and instance.
    _sftp = SftpClient.from_creds("citco", folder="outgoing")
    _conn: dbconn = dbconn("dawndb")
    # CSV header names to parse as dates; replaced per subclass.
    date_cols: List[str] = []

    def __init_subclass__(cls, table, f, fname, date_cols):
        """Record the per-report configuration on the subclass.

        Args:
            table: Name of the destination database table.
            f: Function mapping a filename to the report datetime.
            fname: Substring used to pick this report's files.
            date_cols: CSV columns to parse as dates.
        """
        super().__init_subclass__()
        cls.table = table
        cls.ped_func = f
        cls.fname = fname
        cls.date_cols = date_cols

    def __init__(self, date):
        """Create a report handle for the given period end date."""
        self.date = date

    @property
    def most_recent_report(self):
        """Name of the latest remote file for ``self.date``.

        Lists the SFTP folder on every access, keeps the files whose name
        contains ``fname`` and whose parsed datetime falls on ``self.date``,
        and returns the one with the greatest parsed datetime.

        Raises:
            ValueError: If no file matches.
        """
        # Go through the class so the stored function is not bound to self.
        parse = type(self).ped_func
        candidates = []
        for filename in self._sftp.client.listdir():
            if self.fname not in filename:
                continue
            stamp = parse(filename)
            # A file can contain ``fname`` without a parseable timestamp;
            # skip it instead of failing on ``None.date()``.
            if stamp is None:
                continue
            if stamp.date() == self.date:
                candidates.append((stamp, filename))
        if not candidates:
            raise ValueError(f"Missing data for {self.table}: {self.date}")
        # max() keeps the first of equal keys, i.e. listing order on ties.
        return max(candidates, key=lambda pair: pair[0])[1]

    def to_df(self):
        """Download the most recent report and return it as a DataFrame.

        Raises:
            ValueError: If no report file exists for ``self.date``.
        """
        # Resolve the filename once: each property access lists the remote
        # folder, and the opened file must be the one whose name is parsed.
        report_name = self.most_recent_report
        with self._sftp.client.open(report_name) as fh:
            return load_citco_report(
                fh, type(self).ped_func(report_name), self.date_cols
            )

    def to_db(self):
        """Replace the rows stored for ``self.date`` with the latest report.

        Existing rows whose ``period_end_date`` equals ``self.date`` are
        deleted and the deletion committed, then the freshly loaded rows are
        appended to ``table``.
        """
        df = self.to_df()
        with self._conn.cursor() as c:
            # The table name comes from the subclass declaration, not from
            # user input; the date is passed as a bound parameter.
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        # NOTE(review): the delete is committed before the insert below, so
        # a failure in to_sql leaves this period without rows.
        self._conn.commit()
        if "strategy" in df.columns:
            # Drop the "M_" / "SER_" prefix that follows a slash.
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
class AccruedReport(
    Report,
    fname="100502500_INNOCAP_ISOSEL",
    f=citco_accrued,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
):
    """Report stored in ``isosel_accrued``, dated from filenames by ``citco_accrued``."""
class AllReport(
    Report,
    fname="SPOS4X_INNOCAP_ISOSEL_D_IM",
    f=citco_all,
    table="citco_reports",
    date_cols=["Maturity Date"],
):
    """Report stored in ``citco_reports``, dated from filenames by ``citco_all``."""
|