aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/remote.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops/remote.py')
-rw-r--r--python/report_ops/remote.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/python/report_ops/remote.py b/python/report_ops/remote.py
new file mode 100644
index 00000000..2dac0db2
--- /dev/null
+++ b/python/report_ops/remote.py
@@ -0,0 +1,106 @@
+from serenitas.utils.remote import SftpClient
+from typing import Callable, List
+import pandas as pd
+from serenitas.utils.db import dawn_engine, dbconn
+import datetime
+import re
+from serenitas.analytics.dates import prev_business_day, next_business_day
+
+
+def citco_accrued(s):
+ if m := re.search("100502500_INNOCAP_ISOSEL.([\d]+)\.", s):
+ dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S")
+ return prev_business_day(dt)
+
+
+def citco_all(s):
+ if m := re.search("SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s):
+ dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S")
+ return dt
+
+
+def load_citco_report(fh, kd, date_cols):
+ df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True)
+ df["row"] = df.index
+ df.columns = df.columns.str.lower()
+ df.columns = df.columns.str.replace(" ", "_")
+ df["period_end_date"] = kd.date()
+ df["knowledge_date"] = next_business_day(kd)
+ return df
+
+
+class Report:
+ table: str
+ ped_func: Callable[[str], datetime.datetime]
+ _sftp = SftpClient.from_creds("citco", folder="outgoing")
+ _conn: dbconn = dbconn("dawndb")
+ date_cols: List[str] = []
+
+ def __init_subclass__(cls, table, f, fname, date_cols):
+ cls.table = table
+ cls.ped_func = f
+ cls.fname = fname
+ cls.date_cols = date_cols
+
+ def __init__(self, date):
+ self.date = date
+
+ @property
+ def most_recent_report(self):
+ report_files = [
+ filename
+ for filename in self._sftp.client.listdir()
+ if self.fname in filename
+ if type(self).ped_func(filename).date() == self.date
+ ]
+ try:
+ return max(report_files, key=type(self).ped_func)
+ except ValueError:
+ raise ValueError(f"Missing data for {self.table}: {self.date}")
+
+ def to_df(self):
+ with self._sftp.client.open(self.most_recent_report) as fh:
+ return load_citco_report(
+ fh, type(self).ped_func(self.most_recent_report), self.date_cols
+ )
+
+ def to_db(self):
+ df = self.to_df()
+ with self._conn.cursor() as c:
+ c.execute(
+ f"DELETE FROM {self.table} WHERE period_end_date= %s",
+ (self.date,),
+ )
+ self._conn.commit()
+ if "strategy" in df.columns:
+ df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True)
+ df.to_sql(self.table, dawn_engine, if_exists="append", index=False)
+
+
+class AccruedReport(
+ Report,
+ table="isosel_accrued",
+ f=citco_accrued,
+ fname="100502500_INNOCAP_ISOSEL",
+ date_cols=[
+ "Init Date",
+ "Init Settle Date",
+ "Liqd Date",
+ "Liqd Settle Date",
+ "Bond Maturity",
+ "Orig Date",
+ "Start Date",
+ "End Date",
+ ],
+):
+ pass
+
+
+class AllReport(
+ Report,
+ table="citco_reports",
+ f=citco_all,
+ fname="SPOS4X_INNOCAP_ISOSEL_D_IM",
+ date_cols=["Maturity Date"],
+):
+ pass