1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
from serenitas.utils.remote import SftpClient
from typing import List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from .misc import dt_from_citco
from functools import partial
from typing import ClassVar
def load_citco_report(fh, kd, date_cols):
df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = kd.date()
df["knowledge_date"] = next_business_day(kd)
return df
class CitcoReport:
table: str
_sftp = SftpClient.from_creds("citco", folder="outgoing")
_conn: dbconn = dbconn("dawndb")
date_cols: List[str] = []
dtkey_fun: ClassVar
_registry = {}
def __init_subclass__(cls, table, fname, date_cols, dtkey):
cls.table = table
cls.fname = fname
cls.date_cols = date_cols
cls.dtkey_fun = partial(dt_from_citco, file_tag=cls.fname, dt_format=dtkey)
cls._registry[table] = cls
def __init__(self, date):
self.date = date
def __class_getitem__(cls, table):
return cls._registry[table]
@classmethod
def get_newest_report(cls, date):
p = max(
[
f
for f in cls._sftp.client.listdir()
if (cls.fname in f) and (cls.dtkey_fun(f).date() == date)
],
key=cls.dtkey_fun,
default=None,
)
if not p:
raise ValueError(f"No reports for {cls.table} on {date}")
return p
@classmethod
def to_df(cls, fname):
kd = cls.dtkey_fun(fname)
with cls._sftp.client.open(fname) as fh:
return load_citco_report(fh, kd, cls.date_cols)
@classmethod
def to_db(cls, date):
p = cls.get_newest_report(date)
df = cls.to_df(p)
with cls._conn.cursor() as c:
c.execute(
f"DELETE FROM {cls.table} WHERE period_end_date= %s",
(date,),
)
cls._conn.commit()
if "strategy" in df.columns:
df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)
class AccruedReport(
CitcoReport,
table="isosel_accrued",
fname="100502500_INNOCAP_ISOSEL_",
date_cols=[
"Init Date",
"Init Settle Date",
"Liqd Date",
"Liqd Settle Date",
"Bond Maturity",
"Orig Date",
"Start Date",
"End Date",
],
dtkey="%Y%m%d%H%M%S",
):
pass
class AllReport(
CitcoReport,
table="citco_reports",
fname="SPOS4X_INNOCAP_ISOSEL_D_IM.",
date_cols=["Maturity Date"],
dtkey="%Y%m%d.%H%M%S",
):
pass
|