aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/admin.py
blob: 01a8929b1de01a73669979ad0f95c994f9ec30f9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from serenitas.utils.remote import SftpClient
from typing import List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from .misc import dt_from_citco
from functools import partial
from typing import ClassVar
import datetime


def load_citco_report(fh, kd, date_cols):
    df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
    df["row"] = df.index
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(" ", "_")
    df["period_end_date"] = kd.date()
    df["knowledge_date"] = next_business_day(kd)
    return df


class CitcoReport:
    table: str
    _sftp = SftpClient.from_creds("citco", folder="outgoing")
    _conn: dbconn = dbconn("dawndb")
    date_cols: List[str] = []
    dtkey_fun: ClassVar
    _registry = {}

    def __init_subclass__(cls, table, fname, date_cols, dtkey):
        cls.table = table
        cls.fname = fname
        cls.date_cols = date_cols
        cls.dtkey_fun = partial(dt_from_citco, file_tag=cls.fname, dt_format=dtkey)
        cls._registry[table] = cls

    def __init__(self, date):
        self.date = date

    def __class_getitem__(cls, table):
        return cls._registry[table]

    @classmethod
    def get_newest_report(cls, report_file_date):
        p = max(
            [
                f
                for f in cls._sftp.client.listdir()
                if (cls.fname in f) and (cls.dtkey_fun(f).date() == report_file_date)
            ],
            key=cls.dtkey_fun,
            default=None,
        )
        if not p:
            raise ValueError(f"No reports for {cls.table} on {report_file_date}")
        return p

    @classmethod
    def to_df(cls, fname):
        kd = cls.dtkey_fun(fname)
        with cls._sftp.client.open(fname) as fh:
            return load_citco_report(fh, kd, cls.date_cols)

    @classmethod
    def to_db(cls, ped):
        report_file_date = ped if cls is AllReport else next_business_day(ped)
        p = cls.get_newest_report(report_file_date)
        df = cls.to_df(p)
        df["knowledge_date"] = datetime.datetime.combine(
            next_business_day(ped), cls.dtkey_fun(p).time()
        )
        df["period_end_date"] = ped
        with cls._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {cls.table} WHERE period_end_date= %s",
                (ped,),
            )
        cls._conn.commit()
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    fname="100502500_INNOCAP_ISOSEL_",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    dtkey="%Y%m%d%H%M%S",
):
    pass


class AllReport(
    CitcoReport,
    table="citco_reports",
    fname="SPOS4X_INNOCAP_ISOSEL_D_IM.",
    date_cols=["Maturity Date"],
    dtkey="%Y%m%d.%H%M%S",
):
    pass