diff options
Diffstat (limited to 'python/report_ops')
| -rw-r--r-- | python/report_ops/utils.py | 165 |
1 files changed, 0 insertions, 165 deletions
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py index fd2e7aeb..015f3e07 100644 --- a/python/report_ops/utils.py +++ b/python/report_ops/utils.py @@ -9,11 +9,9 @@ from serenitas.utils.exchange import ExchangeMessage, FileAttachment from serenitas.utils.remote import SftpClient from exchangelib import HTMLBody from tabulate import tabulate -from functools import lru_cache from serenitas.analytics.dates import next_business_day import math import re -from zoneinfo import ZoneInfo from .misc import ( _recipients, _cc_recipients, @@ -30,39 +28,6 @@ def next_business_days(date, offset): return date -def get_file_status(s): - if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): - orig_name, submit_date, status, process_date = m.groups() - else: - raise ValueError(f"Can't parse status from file {s}") - - zone = ZoneInfo("America/New_York") - submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( - tzinfo=zone - ) - process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( - tzinfo=datetime.timezone.utc - ) - if orig_name == ("innocap_serenitas_trades_"): - file_type = "trade" - elif orig_name == "i.innocap_serenitas.": - file_type = "instrument" - else: - raise ValueError(f"error with {s}") - return file_type, "PROCESSED" in s, submit_date, process_date - - -def instrument_table(instrument_id): - if instrument_id.startswith("IRS"): - return "citco_irs" - elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): - return "citco_swaption" - elif instrument_id.startswith("CDS_"): - return "citco_tranche" - elif instrument_id.startswith("TRS"): - return "citco_trs" - - def round_up(n, decimals=0): multiplier = 10**decimals return math.ceil(n * multiplier) / multiplier @@ -127,136 +92,6 @@ def check_cleared_cds(date, fund, conn): @dataclass -class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): - id: int = field(init=False, metadata={"insert": False}) - identifier_type: Literal["trade", "instrument"] - citco_id: str - serenitas_id: str - submit_date: datetime.datetime - process_date: datetime.date - _sftp: ClassVar = field(metadata={"insert": False}) - - @classmethod - @lru_cache(1280) - def process(cls, fname): - file_type, status, submit_date, process_date = get_file_status(fname) - if status: - if file_type == "trade": - key = "Order" - elif file_type == "instrument": - key = "Security" - with cls._sftp.client.open(fname) as fh: - for row in csv.DictReader(fh): - trade = cls( - file_type, - row[f"Internal_{key}_Id"], - row[f"External_{key}_Id"], - submit_date, - process_date, - ) - trade.stage() - else: - with cls._sftp.client.open(fname) as fh: - next(fh) - for row in csv.reader(fh): - id_or_error = row[2] if len(row) > 2 else row[-1] - trade = cls( - "failed", - row[-1], - id_or_error, - submit_date, - process_date, - ) - trade.stage() - - @classmethod - def update_citco_tables(cls, newvals): - d = defaultdict(list) - for row in newvals: - if row.identifier_type == "instrument": - d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) - for table, v in d.items(): - sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" - with cls._conn.cursor() as c: - c.executemany(sql_str, v) - cls._conn.commit() - - @classmethod - def commit(cls): - if not cls._insert_queue: - return - with cls._conn.cursor() as c: - c.executemany(cls._sql_insert, cls._insert_queue, returning=True) - newvals = [] - while True: - if val := c.fetchone(): - newvals.append(val) - if not c.nextset(): - break - cls._conn.commit() - if newvals: - cls.update_citco_tables(newvals) - em = ExchangeMessage() - em.send_email( - "(CITCO) UPLOAD REPORT", - cls._format(newvals), - ( - "fyu@lmcg.com", - "ghorel@lmcg.com", - "etsui@lmcg.com", - ), - ) - - @classmethod - def _format(cls, vals): - t = tabulate( - vals, - headers=[ - "upload_type", - "citco_id", - "serenitas_id", - "submit_date", - "process_date", - ], - tablefmt="unsafehtml", - ) - html = HTMLBody( - f""" - <html> - <head> - <style> - table, th, td {{ border: 1px solid black; border-collapse: collapse;}} - th, td {{ padding: 5px; }} - </style> - </head> - <body> - {t} - </body> - </html> - """ - ) - return html - - @classmethod - def init_sftp(cls): - cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") - - @classmethod - def check_cache(cls): - if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: - if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: - raise ValueError( - "Too many files in the SFTP compared to cache max size" - ) - - -CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( - "RETURNING *", - "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", -) - - -@dataclass class Monitor: date: datetime.date headers: ClassVar = () |
