aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/admin.py
blob: 05f9b87bcb3cb7b137e1750045da495b1a3c4757 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from typing import ClassVar
import datetime
from dataclasses import dataclass, field
import re
import pandas as pd

from serenitas.utils.remote import SftpClient
from serenitas.utils.db import dawn_engine
from sqlalchemy.engine import Engine
from serenitas.analytics.dates import next_business_day, prev_business_day
from serenitas.analytics.exceptions import MissingDataError

# Per-fund token substituted for ``{suffix}`` when CitcoReport formats a
# report file-name pattern. CitcoReport.__post_init__ indexes this mapping
# with the fund name, so only the funds listed here can be instantiated.
_suffix = dict(ISOSEL="IM", CRSE="IMCG")


@dataclass
class CitcoReport:
    """Base class for CSV reports listed over SFTP and loaded into a database table.

    A concrete report type is declared by subclassing with the ``table``,
    ``date_cols`` and ``pattern`` class keywords.  Every subclass is recorded
    in a registry keyed by its table name, so ``CitcoReport[table]`` returns
    the class registered for that table.

    Subclasses must also provide ``get_ts(name)``, callable on both the class
    and its instances, returning the timestamp encoded in a report file name;
    the returned object must support ``.date()``.

    Instances are created as ``ReportClass(knowledge_date, fund)``.
    """

    # Database engine shared by every report class.
    _engine: ClassVar[Engine] = dawn_engine
    # SFTP client shared by every report class; created once, when this class
    # body is executed.
    _sftp: ClassVar[SftpClient] = SftpClient.from_creds("citco", folder="outgoing")
    # Destination table name; set per subclass by __init_subclass__.
    table: ClassVar
    # Date that the file-name timestamp of the wanted report must fall on.
    knowledge_date: datetime.date
    # Fund name; must be a key of ``_suffix``.
    fund: str
    # File-name prefix derived from ``pattern`` in __post_init__.
    file_pattern: str = field(init=False)
    # CSV columns parsed as dates; set per subclass by __init_subclass__.
    date_cols: ClassVar = []
    # ``str.format`` template using ``{fund}`` and, optionally, ``{suffix}``.
    pattern: ClassVar[str]
    # table name -> report class, filled in by __init_subclass__.
    _registry: ClassVar = {}

    def __init_subclass__(cls, table, date_cols, pattern):
        """Store the subclass configuration and register the class under ``table``."""
        cls.table = table
        cls.date_cols = date_cols
        cls.pattern = pattern
        cls._registry[table] = cls

    def __post_init__(self):
        """Build ``file_pattern`` for this fund.

        Raises:
            KeyError: if ``fund`` is not a key of ``_suffix``.
        """
        self.file_pattern = self.pattern.format(
            fund=self.fund, suffix=_suffix[self.fund]
        )

    def __class_getitem__(cls, table):
        """Return the report class registered for ``table`` (KeyError if none)."""
        return cls._registry[table]

    def get_report(self):
        """Return the name of the latest report for ``knowledge_date``.

        A file qualifies when its name starts with ``file_pattern`` and the
        date of its file-name timestamp equals ``knowledge_date``.  Among the
        qualifying files the one with the greatest timestamp wins.

        Returns:
            The file name, or ``None`` when no file qualifies.
        """
        reports = [
            f
            for f in self._sftp.client.listdir()
            if f.startswith(self.file_pattern)
            and self.get_ts(f).date() == self.knowledge_date
        ]
        return max(reports, key=self.get_ts, default=None)

    @classmethod
    def get_df(cls, report_name):
        """Read ``report_name`` over SFTP and return it as a normalised DataFrame.

        The frame gets a ``row`` column holding the original row index, column
        names lower-cased with spaces replaced by underscores, a
        ``period_end_date`` column (date of the file-name timestamp) and a
        ``knowledge_date`` column (``next_business_day`` of that timestamp).
        When a ``strategy`` column exists, ``/M_`` and ``/SER_`` inside it are
        replaced by ``/``.
        """
        ts = cls.get_ts(report_name)
        # Only the CSV parsing needs the remote file handle.
        # NOTE(review): ``infer_datetime_format`` is deprecated in pandas >= 2.0.
        with cls._sftp.client.open(report_name) as fh:
            df = pd.read_csv(fh, parse_dates=cls.date_cols, infer_datetime_format=True)
        df["row"] = df.index
        df.columns = df.columns.str.lower()
        df.columns = df.columns.str.replace(" ", "_")
        df["period_end_date"] = ts.date()
        df["knowledge_date"] = next_business_day(ts)
        if "strategy" in df.columns:
            df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
        return df

    def to_db(self):
        """Replace the rows stored for this fund and date with the latest report.

        Rows of ``table`` whose ``period_end_date`` equals ``knowledge_date``
        for this fund are deleted and the freshly downloaded report is
        appended in their place.

        Raises:
            MissingDataError: if no report for ``knowledge_date`` is available.
        """
        report_name = self.get_report()
        if not report_name:
            raise MissingDataError(
                f"{self.fund}: {self.table} report not ready for {self.knowledge_date}"
            )

        df = self.get_df(report_name)
        df["fund"] = self.fund

        # Delete and insert on one connection inside a single transaction, so
        # a failed insert rolls the delete back instead of leaving the table
        # without any rows for this fund and date.
        with self._engine.begin() as conn:
            conn.execute(
                f"DELETE FROM {self.table} WHERE period_end_date=%s AND fund=%s",
                (
                    self.knowledge_date,
                    self.fund,
                ),
            )
            df.to_sql(self.table, conn, if_exists="append", index=False)


class AccruedReport(
    CitcoReport,
    table="isosel_accrued",
    date_cols=[
        "Init Date",
        "Init Settle Date",
        "Liqd Date",
        "Liqd Settle Date",
        "Bond Maturity",
        "Orig Date",
        "Start Date",
        "End Date",
    ],
    pattern="100502500_INNOCAP_{fund}_",
):
    """Report loaded into ``isosel_accrued``.

    File names start with ``100502500_INNOCAP_{fund}_`` and carry a numeric
    timestamp.
    """

    @staticmethod
    def get_ts(s):
        """Return ``prev_business_day`` of the timestamp embedded in ``s``.

        The timestamp is the first run of 14 digits (``YYYYMMDDHHMMSS``) or,
        failing that, of 12 digits (``YYYYMMDDHHMM``) in the file name.

        Raises:
            ValueError: if ``s`` holds no such digit run, or the digits do not
                form a valid date and time.
        """
        m = re.search(r"\d{14}|\d{12}", s)
        if m is None:
            raise ValueError(f"no timestamp found in report name {s!r}")
        stamp = m[0]
        # The format has to match the number of digits captured: strptime lets
        # %M and %S match a single digit, so parsing 12 digits with
        # "%Y%m%d%H%M%S" silently reads "...1230" as 12:03:00.
        fmt = "%Y%m%d%H%M%S" if len(stamp) == 14 else "%Y%m%d%H%M"
        return prev_business_day(datetime.datetime.strptime(stamp, fmt))


class AllReport(
    CitcoReport,
    pattern="SPOS4X_INNOCAP_{fund}_D_{suffix}",
    table="citco_reports",
    date_cols=["Maturity Date"],
):
    """Report loaded into ``citco_reports``.

    File names start with ``SPOS4X_INNOCAP_{fund}_D_{suffix}`` and carry a
    ``YYYYMMDD.HHMMSS`` stamp.
    """

    @staticmethod
    def get_ts(s):
        """Parse the ``YYYYMMDD.HHMMSS`` stamp found in ``s`` into a datetime."""
        stamp_format = "%Y%m%d.%H%M%S"
        found = re.search(r"\d{8}\.\d{6}", s)
        # ``found`` is None when ``s`` has no stamp; the subscript below then
        # raises TypeError.
        return datetime.datetime.strptime(found[0], stamp_format)