aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/admin.py
blob: 1f82afdb34d3cbe1554f3cbb5b036d1fb6739763 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from serenitas.utils.remote import SftpClient
from typing import List
import pandas as pd
from serenitas.utils.db import dawn_engine, dbconn
from serenitas.analytics.dates import next_business_day
from .misc import dt_from_citco
from functools import partial
from typing import ClassVar
import datetime
from dataclasses import dataclass
from serenitas.ops.trade_dataclasses import Fund


@dataclass
class CitcoReport:
    date: datetime.date
    fund: Fund
    report_name: ClassVar
    table: ClassVar
    knowledge_date: datetime.date = None
    fund: Fund = None
    _conn: dbconn = dbconn("dawndb")
    date_cols: ClassVar = []
    dtkey_fun: ClassVar
    _registry = {}

    def __init_subclass__(cls, table, fname, date_cols, dtkey):
        cls.table = table
        cls.fname = fname
        cls.date_cols = date_cols
        cls.dtkey_fun = partial(dt_from_citco, file_tag=cls.fname, dt_format=dtkey)
        cls._registry[table] = cls

    def __class_getitem__(cls, table):
        return cls._registry[table]

    def __post_init__(self):
        match self.fund:
            case "ISOSEL":
                self._sftp = SftpClient.from_creds("citco", folder="outgoing")
        self.report_name = self.get_latest_report()
        self.knowledge_date = datetime.datetime.combine(
            self.date, self.dtkey_fun(self.report_name).time()
        )

    @property
    def report_date(self):
        return self.date

    def get_latest_report(self):
        p = max(
            [
                f
                for f in self._sftp.client.listdir()
                if (self.fname in f) and (self.dtkey_fun(f).date() == self.report_date)
            ],
            key=self.dtkey_fun,
            default=None,
        )
        if not p:
            raise ValueError(f"No reports for {self.table} on {self.date}")
        return p

    def to_df(self):
        with self._sftp.client.open(self.report_name) as fh:
            df = pd.read_csv(fh, parse_dates=self.date_cols, infer_datetime_format=True)
            df["row"] = df.index
            df.columns = df.columns.str.lower()
            df.columns = df.columns.str.replace(" ", "_")
            df["period_end_date"] = self.date
            df["knowledge_date"] = self.knowledge_date
            return df

    def to_db(self):
        df = self.to_df()
        with self._conn.cursor() as c:
            c.execute(
                f"DELETE FROM {self.table} WHERE period_end_date= %s",
                (self.date,),
            )
        self._conn.commit()
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        df.to_sql(self.table, dawn_engine, if_exists="append", index=False)


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    fname="100502500_INNOCAP_ISOSEL_",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    dtkey="%Y%m%d%H%M%S",
):
    @property
    def report_date(self):
        return next_business_day(self.date)


class AllReport(
    CitcoReport,
    table="citco_reports",
    fname="SPOS4X_INNOCAP_ISOSEL_D_IM.",
    date_cols=["Maturity Date"],
    dtkey="%Y%m%d.%H%M%S",
):
    pass