aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/admin.py
blob: 337cc73b09ac8f84b89154ec699e3d7aebfe4846 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from serenitas.utils.remote import SftpClient
from typing import Callable, List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
import datetime
import re
from serenitas.analytics.dates import prev_business_day, next_business_day
from .misc import dt_from_citco
from functools import partial
from typing import ClassVar


def load_citco_report(fh, kd, date_cols):
    """Read a Citco CSV report into a DataFrame and stamp it with its dates.

    Args:
        fh: Path or open file-like object accepted by ``pd.read_csv``.
        kd: Datetime the report is keyed on; it must provide ``.date()`` and
            is also handed to ``next_business_day``.
        date_cols: Column names, spelled as in the CSV header, to parse as
            dates.

    Returns:
        The parsed frame with a ``row`` column holding the frame's index,
        lower-cased column names with spaces replaced by underscores, and
        ``period_end_date`` / ``knowledge_date`` columns added.
    """
    # NOTE: ``infer_datetime_format`` is deprecated as of pandas 2.0; it is
    # kept here so parsing on older pandas versions stays unchanged.
    report = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
    # Record the index before the header is normalised, so the new column is
    # added alongside the raw CSV column names.
    report["row"] = report.index
    report.columns = report.columns.str.lower().str.replace(" ", "_")
    report["period_end_date"] = kd.date()
    report["knowledge_date"] = next_business_day(kd)
    return report


class CitcoReport:
    """Base class for Citco reports fetched over SFTP and loaded into a table.

    Concrete reports are declared purely through class keyword arguments::

        class SomeReport(CitcoReport, table=..., fname=..., date_cols=...,
                         dtkey=...):
            ...

    Class attributes:
        table: Name of the database table the report is written to.
        fname: Substring that identifies this report's files in the SFTP
            listing (set by ``__init_subclass__``).
        date_cols: CSV column names parsed as dates when loading.
        dtkey_fun: Callable mapping a remote file name to the datetime
            encoded in it (a ``partial`` of ``dt_from_citco``).
    """

    table: str
    # The SFTP client and the database connection are created once, when this
    # class body executes, and are shared by every subclass.
    _sftp = SftpClient.from_creds("citco", folder="outgoing")
    _conn: dbconn = dbconn("dawndb")
    date_cols: List[str] = []
    dtkey_fun: ClassVar

    def __init_subclass__(cls, table, fname, date_cols, dtkey, **kwargs):
        """Configure a concrete report class from its class keywords.

        Args:
            table: Destination table name.
            fname: File-name tag used to pick this report's files.
            date_cols: CSV column names to parse as dates.
            dtkey: Datetime format forwarded to ``dt_from_citco`` as
                ``dt_format``.
            **kwargs: Any further class keywords, passed up the MRO so the
                class cooperates with mixins and other base classes.
        """
        super().__init_subclass__(**kwargs)
        cls.table = table
        cls.fname = fname
        cls.date_cols = date_cols
        cls.dtkey_fun = partial(dt_from_citco, file_tag=cls.fname, dt_format=dtkey)

    def __init__(self, date):
        """Store the report date on the instance."""
        self.date = date

    @classmethod
    def get_newest_report(cls, date):
        """Return the name of the most recent remote file for ``date``.

        A file qualifies when its name contains ``cls.fname`` and the
        datetime extracted by ``cls.dtkey_fun`` falls on ``date``; among the
        qualifying files the one with the greatest extracted datetime wins.

        Raises:
            ValueError: If no file qualifies.
        """
        p = max(
            [
                f
                for f in cls._sftp.client.listdir()
                if (cls.fname in f) and (cls.dtkey_fun(f).date() == date)
            ],
            key=cls.dtkey_fun,
            default=None,
        )
        if not p:
            raise ValueError(f"No reports for {cls.table} on {date}")
        return p

    @classmethod
    def to_df(cls, fname):
        """Open the remote file ``fname`` and load it as a DataFrame."""
        kd = cls.dtkey_fun(fname)
        with cls._sftp.client.open(fname) as fh:
            return load_citco_report(fh, kd, cls.date_cols)

    @classmethod
    def to_db(cls, date):
        """Replace the rows stored for ``date`` with the newest report.

        Existing rows whose ``period_end_date`` equals ``date`` are deleted
        from ``cls.table`` and the freshly loaded report is appended.

        Raises:
            ValueError: If no report file exists for ``date``.
        """
        p = cls.get_newest_report(date)
        df = cls.to_df(p)
        # Finish every in-memory transformation before the destructive
        # DELETE, so a failure here cannot leave the table emptied for
        # ``date`` with nothing to put back.
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        with cls._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {cls.table} WHERE period_end_date= %s",
                (date,),
            )
        cls._conn.commit()
        # NOTE: the DELETE is already committed at this point, so a failed
        # insert below is not rolled back together with it.
        df.to_sql(cls.table, dawn_engine, if_exists="append", index=False)


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    fname="100502500_INNOCAP_ISOSEL_",
    dtkey="%Y%m%d%H%M%S",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
):
    """Citco report stored in the ``isosel_accrued`` table.

    Files are recognised by the ``100502500_INNOCAP_ISOSEL_`` tag in their
    name; the ``%Y%m%d%H%M%S`` format is handed to ``dt_from_citco`` to read
    the file's datetime. The listed columns are parsed as dates on load.
    """


class AllReport(
    CitcoReport,
    table="citco_reports",
    fname="SPOS4X_INNOCAP_ISOSEL_D_IM.",
    dtkey="%Y%m%d.%H%M%S",
    date_cols=["Maturity Date"],
):
    """Citco report stored in the ``citco_reports`` table.

    Files are recognised by the ``SPOS4X_INNOCAP_ISOSEL_D_IM.`` tag in their
    name; the ``%Y%m%d.%H%M%S`` format is handed to ``dt_from_citco`` to read
    the file's datetime. Only ``Maturity Date`` is parsed as a date on load.
    """