1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
from typing import ClassVar
import datetime
from dataclasses import dataclass, field
import re
import pandas as pd
from serenitas.utils.remote import SftpClient
from serenitas.utils.db import dawn_engine
from sqlalchemy.engine import Engine
from serenitas.analytics.dates import next_business_day, prev_business_day
from serenitas.analytics.exceptions import MissingDataError
_suffix = {"ISOSEL": "IM", "CRSE": "IMCG"}
@dataclass
class CitcoReport:
_engine: ClassVar[Engine] = dawn_engine
_sftp: ClassVar[SftpClient] = SftpClient.from_creds("citco", folder="outgoing")
table: ClassVar
knowledge_date: datetime.date
fund: str
file_pattern: str = field(init=False)
date_cols: ClassVar = []
pattern: ClassVar[str]
_registry: ClassVar = {}
def __init_subclass__(cls, table, date_cols, pattern):
cls.table = table
cls.date_cols = date_cols
cls.pattern = pattern
cls._registry[table] = cls
def __post_init__(self):
self.file_pattern = self.pattern.format(
fund=self.fund, suffix=_suffix[self.fund]
)
def __class_getitem__(cls, table):
return cls._registry[table]
def get_report(self):
reports = [
f
for f in self._sftp.client.listdir()
if f.startswith(self.file_pattern)
and self.get_ts(f).date() == self.knowledge_date
]
return max(
reports,
key=self.get_ts,
default=None,
)
@classmethod
def get_df(cls, report_name):
with cls._sftp.client.open(report_name) as fh:
ts = cls.get_ts(report_name)
df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = ts.date()
df["knowledge_date"] = next_business_day(ts)
if "strategy" in df.columns:
df["strategy"] = df["strategy"].str.replace(
"/M_|/SER_", "/", regex=True
)
return df
def to_db(self):
if report_name := self.get_report():
df = self.get_df(report_name)
df["fund"] = self.fund
with self._engine.connect() as conn:
conn.execute(
f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s",
(
self.knowledge_date,
self.fund,
),
)
df.to_sql(self.table, self._engine, if_exists="append", index=False)
else:
raise MissingDataError(
f"{self.fund}: {self.table} report not ready for {self.knowledge_date}"
)
class AccruedReport(
CitcoReport,
table="isosel_accrued",
date_cols=[
"Init Date",
"Init Settle Date",
"Liqd Date",
"Liqd Settle Date",
"Bond Maturity",
"Orig Date",
"Start Date",
"End Date",
],
pattern="100502500_INNOCAP_{fund}_",
):
@staticmethod
def get_ts(s):
m = re.search(r"\d{12}", s)
return prev_business_day(datetime.datetime.strptime(m[0], "%Y%m%d%H%M%S"))
class AllReport(
CitcoReport,
table="citco_reports",
date_cols=["Maturity Date"],
pattern="SPOS4X_INNOCAP_{fund}_D_{suffix}",
):
@staticmethod
def get_ts(s):
m = re.search(r"\d{8}\.\d{6}", s)
return datetime.datetime.strptime(m[0], "%Y%m%d.%H%M%S")
|