1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
from functools import partial
from typing import ClassVar
import datetime
from dataclasses import dataclass
import pandas as pd
from serenitas.utils.remote import SftpClient
from serenitas.ops.trade_dataclasses import Fund
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from serenitas.analytics.exceptions import MissingDataError
from .misc import dt_from_citco
@dataclass
class CitcoReport:
    """Base class for a report file fetched over SFTP and loaded into a table.

    Concrete reports are declared by subclassing with the class keywords
    ``table``, ``date_cols`` and ``dtkey`` (see ``__init_subclass__``) and by
    providing a ``file_prefix`` attribute/property, which this base class
    reads but does not define.  Subclasses are registered by table name and
    can be looked up with ``CitcoReport[table]``.

    Attributes:
        date: Period end date of the report.
        fund: Fund the report belongs to.
        knowledge_date: Always overwritten in ``__post_init__``: the date part
            is the next business day after ``date`` and the time part is taken
            from the timestamp parsed out of the report file name.
        _conn: Database connection used by ``to_db`` for the DELETE statement.
            The default is created once, when the class body is executed, and
            is therefore shared by all instances that do not pass their own.

    Raises:
        MissingDataError: On construction, when no matching report file is
            found in the SFTP folder.
    """

    date: datetime.date
    # ``fund`` used to be declared twice in this class body; a single
    # declaration in second position keeps the generated ``__init__``
    # signature (date, fund, knowledge_date, _conn) exactly as before.
    fund: Fund = None
    report_name: ClassVar
    table: ClassVar
    knowledge_date: datetime.date = None
    _conn: dbconn = dbconn("dawndb")
    date_cols: ClassVar = []
    dtkey: ClassVar
    # Maps table name -> subclass; one dict shared by the whole hierarchy.
    _registry: ClassVar[dict] = {}

    def __init_subclass__(cls, table, date_cols, dtkey, **kwargs):
        """Record the subclass configuration and register it under ``table``.

        Args:
            table: Destination table name; also the registry key.
            date_cols: Column names passed to ``pd.read_csv(parse_dates=...)``.
            dtkey: Datetime format handed to ``dt_from_citco`` as ``dt_format``.
            **kwargs: Forwarded to the next ``__init_subclass__`` in the MRO.
        """
        super().__init_subclass__(**kwargs)
        cls.table = table
        cls.date_cols = date_cols
        cls.dtkey = dtkey
        cls._registry[table] = cls

    def __class_getitem__(cls, table):
        """Return the report subclass registered for ``table``.

        Raises:
            KeyError: If no subclass was registered under ``table``.
        """
        return cls._registry[table]

    def __post_init__(self):
        """Open the SFTP client, locate the report file and set knowledge_date."""
        self._sftp = SftpClient.from_creds("citco", folder="outgoing")
        # Every access to ``report_file_name`` lists the remote directory, so
        # resolve it once and reuse the result for the check and the timestamp.
        report_file = self.report_file_name
        if not report_file:
            raise MissingDataError(
                f"No reports for {self.fund}:{self.table} on {self.date}"
            )
        self.knowledge_date = datetime.datetime.combine(
            next_business_day(self.date), self.dtkey_fun(report_file).time()
        )

    @property
    def dtkey_fun(self):
        """Callable that parses a file name with ``dt_from_citco``.

        ``file_tag`` is bound to ``self.file_prefix`` and ``dt_format`` to the
        class-level ``dtkey``.
        """
        return partial(dt_from_citco, file_tag=self.file_prefix, dt_format=self.dtkey)

    @property
    def sftp_report_date(self):
        """Date that the file-name timestamp must match; defaults to ``date``."""
        return self.date

    @property
    def report_file_name(self):
        """Name of the matching remote file with the greatest timestamp, or None.

        A file matches when its name contains ``file_prefix`` and the date of
        its parsed timestamp equals ``sftp_report_date``.  The remote directory
        is listed on every access.
        """
        # Hoisted out of the loop: these do not change between directory entries.
        parse_stamp = self.dtkey_fun
        prefix = self.file_prefix
        wanted_date = self.sftp_report_date
        return max(
            (
                name
                for name in self._sftp.client.listdir()
                if prefix in name and parse_stamp(name).date() == wanted_date
            ),
            key=parse_stamp,
            default=None,
        )

    def to_df(self):
        """Read the report file from SFTP into a DataFrame.

        Adds a ``row`` column holding the original index, lower-cases the
        column names and replaces spaces with underscores, then adds
        ``period_end_date``, ``knowledge_date`` and ``fund`` columns.

        Returns:
            The resulting ``pandas.DataFrame``.
        """
        with self._sftp.client.open(self.report_file_name) as fh:
            # NOTE(review): ``infer_datetime_format`` is deprecated in
            # pandas >= 2.0; kept so behaviour on older pandas is unchanged.
            df = pd.read_csv(fh, parse_dates=self.date_cols, infer_datetime_format=True)
        df["row"] = df.index
        df.columns = df.columns.str.lower()
        df.columns = df.columns.str.replace(" ", "_")
        df["period_end_date"] = self.date
        df["knowledge_date"] = self.knowledge_date
        df["fund"] = self.fund
        return df

    def to_db(self):
        """Replace this fund/date's rows in ``table`` with the report contents.

        Existing rows for (``period_end_date``, ``fund``) are deleted and
        committed through ``_conn``; the new rows are then appended through
        ``dawn_engine``.  The two steps use different handles, so they are not
        a single transaction.
        """
        df = self.to_df()
        with self._conn.cursor() as c:
            # The table name comes from the subclass declaration, not from
            # user input; the values are bound as query parameters.
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s",
                (
                    self.date,
                    self.fund,
                ),
            )
        self._conn.commit()
        if "strategy" in df.columns:
            # Collapse the "/M_" and "/SER_" markers in strategy names to "/".
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    dtkey="%Y%m%d%H%M%S",
):
    """Report loaded into the ``isosel_accrued`` table.

    The file-name timestamp is matched against the business day following
    ``date`` rather than ``date`` itself.
    """

    @property
    def sftp_report_date(self):
        """Next business day after ``date``; used to match the file timestamp."""
        following_day = next_business_day(self.date)
        return following_day

    @property
    def file_prefix(self):
        """File-name tag for this fund's report files."""
        fund = self.fund
        return f"100502500_INNOCAP_{fund}_"
class AllReport(
    CitcoReport,
    table="citco_reports",
    date_cols=["Maturity Date"],
    dtkey="%Y%m%d.%H%M%S",
):
    """Report loaded into the ``citco_reports`` table."""

    @property
    def file_prefix(self):
        """File-name tag for this fund's report files.

        The trailing code is ``IM`` when the fund compares equal to
        ``"ISOSEL"`` and ``IMCG`` for every other fund.
        """
        if self.fund == "ISOSEL":
            manager_code = "IM"
        else:
            manager_code = "IMCG"
        return f"SPOS4X_INNOCAP_{self.fund}_D_{manager_code}."
|