1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
from serenitas.utils.remote import SftpClient
from dataclasses import field, dataclass
from typing import Callable
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
import datetime
import re
from serenitas.analytics.dates import prev_business_day, next_business_day
# Raw string so that ``\d`` and ``\.`` reach the regex engine untouched instead
# of being (invalid) string escapes.  The unescaped "." right after the report
# prefix matches any single character.
_CITCO_ACCRUED_RE = re.compile(r"100502500_INNOCAP_ISOSEL.([\d]+)\.")


def citco_accrued(s: str):
    """Derive the report date from an accrued-report file name.

    The name must contain ``100502500_INNOCAP_ISOSEL``, one arbitrary
    character, a run of digits in ``%Y%m%d%H%M%S`` layout and a literal dot.

    Args:
        s: file name to inspect.

    Returns:
        ``prev_business_day`` applied to the timestamp parsed from the name,
        or ``None`` when the name does not match the pattern.

    Raises:
        ValueError: if the digit run is not a valid ``%Y%m%d%H%M%S`` timestamp.
    """
    m = _CITCO_ACCRUED_RE.search(s)
    if m is None:
        return None
    dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
    return prev_business_day(dt)
# Raw string so that ``\d`` and ``\.`` reach the regex engine untouched instead
# of being (invalid) string escapes.  The unescaped "." right after the report
# prefix matches any single character.
_CITCO_ALL_RE = re.compile(r"SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.")


def citco_all(s: str):
    """Parse the timestamp embedded in a SPOS4X report file name.

    The name must contain ``SPOS4X_INNOCAP_ISOSEL_D_IM``, one arbitrary
    character, a ``%Y%m%d.%H%M%S`` timestamp and a literal dot.

    Args:
        s: file name to inspect.

    Returns:
        The parsed ``datetime.datetime``, or ``None`` when the name does not
        match the pattern.

    Raises:
        ValueError: if the captured text is not a valid ``%Y%m%d.%H%M%S``
            timestamp.
    """
    m = _CITCO_ALL_RE.search(s)
    if m is None:
        return None
    return datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
def load_citco_report(df, kd):
    """Normalise a raw Citco report frame in place and return it.

    Steps, in order: copy the index into a ``row`` column, lower-case the
    column names and replace spaces with underscores, stamp every row with
    ``period_end_date`` (``kd.date()``) and ``knowledge_date``
    (``next_business_day(kd)``), then convert the known date columns that are
    present to plain ``date`` objects.

    Args:
        df: frame read from the report file; it is modified in place.
        kd: datetime-like object providing ``.date()``.

    Returns:
        The same ``df`` object.
    """
    date_columns = (
        "init_date",
        "init_settle_date",
        "liqd_date",
        "liqd_settle_date",
        "maturity_date",
        "orig_date",
        "start_date",
        "end_date",
    )

    df["row"] = df.index
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    df["period_end_date"] = kd.date()
    df["knowledge_date"] = next_business_day(kd)

    for name in date_columns:
        if name not in df.columns:
            continue
        # NOTE(review): infer_datetime_format is deprecated in pandas >= 2.0;
        # kept so parsing is unchanged on older pandas versions.
        parsed = pd.to_datetime(df[name], infer_datetime_format=True)
        df[name] = parsed.dt.date
    return df
@dataclass
class Report:
    """A Citco report file on the SFTP server and the table it is loaded into.

    Fields:
        table: name of the database table whose rows ``to_db`` replaces.
        ped_func: maps a remote file name to the datetime it stands for, or
            ``None`` when the name does not match (see ``citco_accrued`` and
            ``citco_all``).
        fname: substring a remote file name must contain to be considered.
        _sftp: SFTP wrapper; its ``client`` attribute is used for ``listdir``
            and ``open``.
        date: only files whose ``ped_func`` date equals this are considered.
        _conn: database connection used for the DELETE in ``to_db``.
    """

    table: str
    ped_func: Callable[[str], datetime.datetime]
    fname: str
    _sftp: SftpClient
    date: datetime.date
    # NOTE(review): this default is evaluated once, when the class body runs,
    # so every instance shares the same connection object.
    _conn: dbconn = dbconn("dawndb")

    @classmethod
    def set_report(cls, report, date):
        """Build the ``Report`` for a ``"<fund>_<report>"`` name and a date.

        Args:
            report: ``"isosel_accrued"`` or ``"isosel_all"``.
            date: date the report file must correspond to.

        Raises:
            ValueError: if ``report`` is not one of the supported names.
        """
        # report name -> (table, file-name parser, file-name substring)
        specs = {
            "isosel_accrued": (
                "isosel_accrued",
                citco_accrued,
                "100502500_INNOCAP_ISOSEL",
            ),
            "isosel_all": (
                "citco_reports",
                citco_all,
                "SPOS4X_INNOCAP_ISOSEL_D_IM",
            ),
        }
        try:
            table, ped_func, fname = specs[report]
        except KeyError:
            # Fail loudly instead of handing the caller ``None``.
            raise ValueError(
                f"unknown report {report!r}; expected one of {sorted(specs)}"
            ) from None
        return cls(
            table,
            ped_func,
            fname,
            SftpClient.from_creds("citco", folder="outgoing"),
            date,
        )

    @property
    def most_recent_report(self):
        """Name of the latest remote file matching ``fname`` and ``date``.

        Every access lists the remote directory again.

        Raises:
            ValueError: if no remote file matches.
        """
        candidates = {}
        for filename in self._sftp.client.listdir():
            if self.fname not in filename:
                continue
            stamp = self.ped_func(filename)
            # ped_func returns None for names it cannot parse; skip those
            # rather than failing on ``None.date()``.
            if stamp is not None and stamp.date() == self.date:
                candidates[filename] = stamp
        if not candidates:
            raise ValueError(
                f"no report matching {self.fname!r} found for {self.date}"
            )
        return max(candidates, key=candidates.get)

    def to_df(self):
        """Read the most recent matching report into a normalised DataFrame."""
        # Resolve the file name once: the property hits the SFTP server on
        # every access, and two listings could disagree.
        filename = self.most_recent_report
        with self._sftp.client.open(filename) as fh:
            df = pd.read_csv(fh)
        return load_citco_report(df, self.ped_func(filename))

    def to_db(self):
        """Replace the rows for ``self.date`` in ``self.table`` with the report.

        The report is fetched and parsed before anything is deleted, so a
        missing or unreadable file leaves the existing rows in place.
        """
        df = self.to_df()
        with self._conn.cursor() as c:
            # The table name comes from the fixed values in ``set_report``;
            # the date is passed as a bound parameter.
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        self._conn.commit()
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
|