1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
from functools import partial
from typing import ClassVar
import datetime
from dataclasses import dataclass
import pandas as pd
from serenitas.utils.remote import SftpClient
from serenitas.ops.trade_dataclasses import Fund
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from .misc import dt_from_citco
@dataclass
class CitcoReport:
date: datetime.date
fund: Fund
report_name: ClassVar
table: ClassVar
knowledge_date: datetime.date = None
fund: Fund = None
_conn: dbconn = dbconn("dawndb")
date_cols: ClassVar = []
file_prefix: ClassVar
dtkey_fun: ClassVar
_registry = {}
def __init_subclass__(cls, table, file_prefix, date_cols, dtkey):
cls.table = table
cls.file_prefix = file_prefix
cls.date_cols = date_cols
cls.dtkey_fun = partial(dt_from_citco, file_tag=file_prefix, dt_format=dtkey)
cls._registry[table] = cls
def __class_getitem__(cls, table):
return cls._registry[table]
def __post_init__(self):
match self.fund:
case "ISOSEL":
self._sftp = SftpClient.from_creds("citco", folder="outgoing")
if not self.report_file_name:
raise ValueError(f"No reports for {self.table} on {self.date}")
self.knowledge_date = datetime.datetime.combine(
self.date, self.dtkey_fun(self.report_file_name).time()
)
@property
def sftp_report_date(self):
return self.date
@property
def report_file_name(self):
return max(
[
f
for f in self._sftp.client.listdir()
if (self.file_prefix in f)
and (self.dtkey_fun(f).date() == self.sftp_report_date)
],
key=self.dtkey_fun,
default=None,
)
def to_df(self):
with self._sftp.client.open(self.report_file_name) as fh:
df = pd.read_csv(fh, parse_dates=self.date_cols, infer_datetime_format=True)
df["row"] = df.index
df.columns = df.columns.str.lower()
df.columns = df.columns.str.replace(" ", "_")
df["period_end_date"] = self.date
df["knowledge_date"] = self.knowledge_date
df["fund"] = self.fund
return df
def to_db(self):
df = self.to_df()
with self._conn.cursor() as c:
c.execute(
f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s",
(
self.date,
self.fund,
),
)
self._conn.commit()
if "strategy" in df.columns:
df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
class AccruedReport(
    CitcoReport,
    dtkey="%Y%m%d%H%M%S",
    file_prefix="100502500_INNOCAP_ISOSEL_",
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
):
    """Report registered for the ``isosel_accrued`` table."""

    @property
    def sftp_report_date(self):
        """Files for this report are matched on the business day after ``date``."""
        following_day = next_business_day(self.date)
        return following_day
class AllReport(
    CitcoReport,
    dtkey="%Y%m%d.%H%M%S",
    date_cols=["Maturity Date"],
    file_prefix="SPOS4X_INNOCAP_ISOSEL_D_IM.",
    table="citco_reports",
):
    """Report registered for the ``citco_reports`` table.

    Adds nothing beyond the class keyword arguments above.
    """
|