1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
from typing import ClassVar
import datetime
from dataclasses import dataclass
import re
import pandas as pd
from serenitas.utils.remote import SftpClient
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day, prev_business_day
from serenitas.analytics.exceptions import MissingDataError
_suffix = {"ISOSEL": "IM", "CRSE": "IMCG"}
@dataclass
class CitcoReport:
report_name: ClassVar
table: ClassVar
knowledge_date: datetime.date = None
_conn: dbconn = dbconn("dawndb")
_sftp: ClassVar = SftpClient.from_creds("citco", folder="outgoing")
date_cols: ClassVar = []
prefix: ClassVar
_registry = {}
def __init_subclass__(cls, table, date_cols, prefix):
cls.table = table
cls.date_cols = date_cols
cls.prefix = prefix
cls._registry[table] = cls
def __class_getitem__(cls, table):
return cls._registry[table]
@staticmethod
def get_report_date(cob):
return cob
@classmethod
def get_report(cls, date, fund):
reports = [
f
for f in cls._sftp.client.listdir()
if (cls.prefix.format(fund=fund, suffix=_suffix[fund]) in f)
and cls.get_ts(f).date() == date
]
return max(
reports,
key=cls.get_ts,
default=None,
)
@classmethod
def get_df(cls, report_name):
with cls._sftp.client.open(report_name) as fh:
ts = cls.get_ts(report_name)
df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = ts.date()
df["knowledge_date"] = next_business_day(ts)
if "strategy" in df.columns:
df["strategy"] = df["strategy"].str.replace(
"/M_|/SER_", "/", regex=True
)
return df
@classmethod
def to_db(cls, date, fund):
if report_name := cls.get_report(date, fund):
conn = cls._conn
df = cls.get_df(report_name)
df["fund"] = fund
with conn.cursor() as c:
c.execute(
f"DELETE FROM {cls.table} WHERE period_end_date=%s AND fund=%s",
(
date,
fund,
),
)
conn.commit()
df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)
else:
raise MissingDataError(f"{fund}: {cls.table} report not ready for {date}")
class AccruedReport(
CitcoReport,
table="isosel_accrued",
date_cols=[
"Init Date",
"Init Settle Date",
"Liqd Date",
"Liqd Settle Date",
"Bond Maturity",
"Orig Date",
"Start Date",
"End Date",
],
prefix="100502500_INNOCAP_{fund}_",
):
@staticmethod
def get_ts(s):
m = re.search(r"\d{8}\d{4}", s)
return prev_business_day(datetime.datetime.strptime(m.group(), "%Y%m%d%H%M%S"))
class AllReport(
CitcoReport,
table="citco_reports",
date_cols=["Maturity Date"],
prefix="SPOS4X_INNOCAP_{fund}_D_{suffix}",
):
@staticmethod
def get_ts(s):
m = re.search(r"\d{8}\.\d{6}", s)
return datetime.datetime.strptime(m.group(), "%Y%m%d.%H%M%S")
|