aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/remote.py
blob: 2dac0db21aaab3fcc86e18fce845e5be44762de6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from serenitas.utils.remote import SftpClient
from typing import Callable, List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
import datetime
import re
from serenitas.analytics.dates import prev_business_day, next_business_day


def citco_accrued(s):
    """Derive the period end date from an accrued-report file name.

    The name must contain ``100502500_INNOCAP_ISOSEL`` followed by one
    arbitrary character, a run of digits and a literal dot. The digits are
    parsed as a ``%Y%m%d%H%M%S`` timestamp, which is then handed to
    ``prev_business_day``.

    Args:
        s: File name to inspect.

    Returns:
        Whatever ``prev_business_day`` returns for the parsed timestamp, or
        ``None`` when the name does not match the expected pattern.

    Raises:
        ValueError: If the digit run is not a valid ``%Y%m%d%H%M%S`` value.
    """
    # Raw string: in a plain literal "\d" and "\." are invalid escape
    # sequences (DeprecationWarning, SyntaxWarning from Python 3.12).
    m = re.search(r"100502500_INNOCAP_ISOSEL.([\d]+)\.", s)
    if m is None:
        return None
    dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
    return prev_business_day(dt)


def citco_all(s):
    """Extract the timestamp embedded in a full-position report file name.

    The name must contain ``SPOS4X_INNOCAP_ISOSEL_D_IM`` followed by one
    arbitrary character, a run of digits and dots, and a literal dot. The
    captured run is parsed with the format ``%Y%m%d.%H%M%S``.

    Args:
        s: File name to inspect.

    Returns:
        The parsed ``datetime.datetime``, or ``None`` when the name does not
        match the expected pattern.

    Raises:
        ValueError: If the captured run is not a valid ``%Y%m%d.%H%M%S``
            value.
    """
    # Raw string: in a plain literal "\d" and "\." are invalid escape
    # sequences (DeprecationWarning, SyntaxWarning from Python 3.12).
    m = re.search(r"SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s)
    if m is None:
        return None
    return datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")


def load_citco_report(fh, kd, date_cols):
    """Read a Citco CSV report into a DataFrame and tag it with dates.

    Args:
        fh: File path or file-like object accepted by ``pandas.read_csv``.
        kd: Timestamp associated with the report; it must provide a
            ``date()`` method and be accepted by ``next_business_day``.
        date_cols: Column names (as spelled in the CSV header) to parse as
            dates.

    Returns:
        The loaded frame with an extra ``row`` column holding the original
        index, lower-cased header names with spaces turned into underscores,
        and ``period_end_date`` / ``knowledge_date`` columns.
    """
    frame = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
    # Record the original row position before the headers are normalised.
    frame["row"] = frame.index
    frame.columns = frame.columns.str.lower().str.replace(" ", "_")
    frame["period_end_date"] = kd.date()
    frame["knowledge_date"] = next_business_day(kd)
    return frame


class Report:
    """Base class for a Citco report fetched over SFTP and stored in a table.

    Concrete reports are declared by subclassing with keyword arguments
    (``table``, ``f``, ``fname``, ``date_cols``); ``__init_subclass__``
    copies them onto the new class. An instance represents the report for
    one period end date.
    """

    table: str
    # Maps a remote file name to the report's period end timestamp. The
    # parsers in this module return None for names they do not recognise.
    ped_func: Callable[[str], datetime.datetime]
    fname: str
    # NOTE(review): both handles are created when this class body executes
    # (i.e. at import time) and are shared by every subclass and instance.
    _sftp = SftpClient.from_creds("citco", folder="outgoing")
    _conn: dbconn = dbconn("dawndb")
    date_cols: List[str] = []

    def __init_subclass__(cls, table, f, fname, date_cols, **kwargs):
        """Store the per-report configuration on the subclass.

        Args:
            table: Name of the database table the report is written to.
            f: Function mapping a file name to its period end timestamp.
            fname: Substring a remote file name must contain to be considered.
            date_cols: CSV columns to parse as dates.
            **kwargs: Forwarded to the parent ``__init_subclass__`` so the
                class cooperates with other bases.
        """
        super().__init_subclass__(**kwargs)
        cls.table = table
        cls.ped_func = f
        cls.fname = fname
        cls.date_cols = date_cols

    def __init__(self, date):
        """Create a report handle for the given period end date."""
        self.date = date

    @property
    def most_recent_report(self):
        """Name of the remote file with the latest timestamp for ``self.date``.

        Only files whose name contains ``fname`` and whose parsed period end
        date equals ``self.date`` are considered. Files that contain
        ``fname`` but are not recognised by ``ped_func`` are skipped.

        Raises:
            ValueError: If no matching file is present on the server.
        """
        # Look the parser up on the class so it is not bound as a method.
        ped_func = type(self).ped_func
        candidates = {}
        for filename in self._sftp.client.listdir():
            if self.fname not in filename:
                continue
            ped = ped_func(filename)
            if ped is not None and ped.date() == self.date:
                candidates[filename] = ped
        if not candidates:
            raise ValueError(f"Missing data for {self.table}: {self.date}")
        return max(candidates, key=candidates.get)

    def to_df(self):
        """Download the most recent matching report and load it as a DataFrame.

        Raises:
            ValueError: If no matching file is present on the server.
        """
        # Resolve the file once: the property lists the remote directory on
        # every access, and two listings could disagree.
        report = self.most_recent_report
        with self._sftp.client.open(report) as fh:
            return load_citco_report(fh, type(self).ped_func(report), self.date_cols)

    def to_db(self):
        """Replace the rows stored for ``self.date`` with the latest report.

        Existing rows with this ``period_end_date`` are deleted and the
        freshly loaded frame is appended to ``table``.

        NOTE(review): the delete is committed on ``_conn`` before the insert
        runs through ``dawn_engine``, so the two steps do not appear to share
        a transaction; a failed insert would leave the period empty.
        """
        df = self.to_df()
        # Finish all in-memory transformations before touching the database.
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        with self._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        self._conn.commit()
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)


class AccruedReport(
    Report,
    fname="100502500_INNOCAP_ISOSEL",
    f=citco_accrued,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
):
    """Report stored in ``isosel_accrued``.

    Source files are those whose name contains ``100502500_INNOCAP_ISOSEL``;
    their period end date is derived by ``citco_accrued``.
    """


class AllReport(
    Report,
    fname="SPOS4X_INNOCAP_ISOSEL_D_IM",
    f=citco_all,
    table="citco_reports",
    date_cols=["Maturity Date"],
):
    """Report stored in ``citco_reports``.

    Source files are those whose name contains ``SPOS4X_INNOCAP_ISOSEL_D_IM``;
    their timestamp is extracted by ``citco_all``.
    """