1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
from functools import partial
from typing import ClassVar
import datetime
from dataclasses import dataclass
import pandas as pd
import dateutil.parser as dparser
from serenitas.utils.remote import SftpClient
from serenitas.ops.trade_dataclasses import Fund
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from serenitas.analytics.exceptions import MissingDataError
from .misc import dt_from_citco
@dataclass
class CitcoReport:
report_name: ClassVar
table: ClassVar
knowledge_date: datetime.date = None
_conn: dbconn = dbconn("dawndb")
_sftp: ClassVar = SftpClient.from_creds("citco", folder="outgoing")
date_cols: ClassVar = []
dtkey: ClassVar
_registry = {}
def __init_subclass__(cls, table, date_cols, dtkey):
cls.table = table
cls.date_cols = date_cols
cls.dtkey = dtkey
cls._registry[table] = cls
def __class_getitem__(cls, table):
return cls._registry[table]
def __post_init__(self):
if not self.report_file_name:
raise MissingDataError(
f"No reports for {self.fund}:{self.table} on {self.date}"
)
self.knowledge_date = datetime.datetime.combine(
next_business_day(self.date), self.dtkey_fun(self.report_file_name).time()
)
@staticmethod
def get_report_date(cob):
return cob
@classmethod
def get_report(cls, date, fund):
prefix = cls.get_prefix(fund)
dtkey_fun = cls.get_dt_fun(fund)
report_date = cls.get_report_date(date)
return max(
[
f
for f in cls._sftp.client.listdir()
if (prefix in f) and (dtkey_fun(f).date() == report_date)
],
key=dtkey_fun,
)
@classmethod
def get_df(cls, report_name):
with cls._sftp.client.open(report_name) as fh:
df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = cls.date
df["knowledge_date"] = cls.knowledge_date
df["fund"] = cls.fund
return df
@classmethod
def to_db(cls, date, fund):
report_name = cls.get_report(date, fund)
df = cls.get_df(report_name)
with cls._conn.cursor() as c:
c.execute(
f"DELETE FROM {cls.table} WHERE period_end_date=%s AND fund=%s",
(
date,
fund,
),
)
cls._conn.commit()
if "strategy" in df.columns:
df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
@classmethod
def get_dt_fun(cls, fund):
def dt_fun(fname, date):
ts = fname.removeprefix(cls.get_prefix(fund)).removesuffix(".csv")
return dparser(ts)
return dt_fun
class AccruedReport(
CitcoReport,
table="isosel_accrued",
date_cols=[
"Init Date",
"Init Settle Date",
"Liqd Date",
"Liqd Settle Date",
"Bond Maturity",
"Orig Date",
"Start Date",
"End Date",
],
dtkey="%Y%m%d%H%M%S",
):
@staticmethod
def get_report_date(cob):
return next_business_day(cob)
@staticmethod
def get_prefix(fund):
return f"100502500_INNOCAP_{fund}_"
class AllReport(
CitcoReport,
table="citco_reports",
date_cols=["Maturity Date"],
dtkey="%Y%m%d.%H%M%S",
):
@staticmethod
def get_prefix(fund):
return f"SPOS4X_INNOCAP_{fund}_D_{'IM' if fund =='ISOSEL' else 'IMCG'}."
|