aboutsummaryrefslogtreecommitdiffstats
path: root/python/citco_ops/remote.py
blob: 7a05f90efcff0a9694462f467500f0d6e5d54334 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from serenitas.utils.remote import SftpClient
from dataclasses import field, dataclass
from typing import Callable
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
import datetime
import re
from serenitas.analytics.dates import prev_business_day, next_business_day


def citco_accrued(s):
    if m := re.search("100502500_INNOCAP_ISOSEL.([\d]+)\.", s):
        dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
    return prev_business_day(dt)


def citco_all(s):
    if m := re.search("SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s):
        dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
    return dt


def load_citco_report(df, kd):
    df["row"] = df.index
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(" ", "_")
    df["period_end_date"] = kd.date()
    df["knowledge_date"] = next_business_day(kd)
    for column in [
        "init_date",
        "init_settle_date",
        "liqd_date",
        "liqd_settle_date",
        "maturity_date",
        "orig_date",
        "start_date",
        "end_date",
    ]:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column], infer_datetime_format=True).dt.date
    return df


@dataclass
class Report:
    table: str
    ped_func: Callable[[str], datetime.datetime]
    fname: str
    _sftp: SftpClient
    date: datetime.date
    _conn: dbconn = dbconn("dawndb")

    @classmethod
    def set_report(cls, report, date):
        fund, report = report.split("_")
        if (fund, report) == ("isosel", "accrued"):
            return cls(
                "isosel_accrued",
                citco_accrued,
                "100502500_INNOCAP_ISOSEL",
                SftpClient.from_creds("citco", folder="outgoing"),
                date,
            )
        elif (fund, report) == ("isosel", "all"):
            return cls(
                "citco_reports",
                citco_all,
                "SPOS4X_INNOCAP_ISOSEL_D_IM",
                SftpClient.from_creds("citco", folder="outgoing"),
                date,
            )

    @property
    def most_recent_report(self):
        report_files = [
            filename
            for filename in self._sftp.client.listdir()
            if self.fname in filename
            if self.ped_func(filename).date() == self.date
        ]
        return max(report_files, key=self.ped_func)

    def to_df(self):
        with self._sftp.client.open(self.most_recent_report) as fh:
            df = pd.read_csv(fh)
        return load_citco_report(df, self.ped_func(self.most_recent_report))

    def to_db(self):
        with self._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        self._conn.commit()
        df = self.to_df()
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)