diff options
Diffstat (limited to 'python/report_ops/utils.py')
| -rw-r--r-- | python/report_ops/utils.py | 332 |
1 files changed, 332 insertions, 0 deletions
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py new file mode 100644 index 00000000..36839865 --- /dev/null +++ b/python/report_ops/utils.py @@ -0,0 +1,332 @@ +from collections import defaultdict +from dataclasses import field, dataclass +import logging +from typing import Literal, ClassVar +import datetime +import csv +from serenitas.ops.trade_dataclasses import Deal +from serenitas.utils.exchange import ExchangeMessage +from serenitas.utils.remote import SftpClient +from exchangelib import HTMLBody +from tabulate import tabulate +from functools import lru_cache +from serenitas.analytics.dates import next_business_day +from decimal import Decimal +import math +import re +from zoneinfo import ZoneInfo + +logger = logging.getLogger(__name__) + + +def next_business_days(date, offset): + for i in range(offset): + date = next_business_day(date) + return date + + +def get_file_status(s): + if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): + orig_name, submit_date, status, process_date = m.groups() + else: + raise ValueError(f"Can't parse status from file {s}") + + zone = ZoneInfo("America/New_York") + submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( + tzinfo=zone + ) + process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( + tzinfo=datetime.timezone.utc + ) + if orig_name == ("innocap_serenitas_trades_"): + file_type = "trade" + elif orig_name == "i.innocap_serenitas.": + file_type = "instrument" + else: + raise ValueError(f"error with {s}") + return file_type, "PROCESSED" in s, submit_date, process_date + + +def instrument_table(instrument_id): + if instrument_id.startswith("IRS"): + return "citco_irs" + elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): + return "citco_swaption" + elif instrument_id.startswith("CDS_"): + return "citco_tranche" + elif instrument_id.startswith("TRS"): + return "citco_trs" + + +def round_up(n, decimals=0): + multiplier = 10**decimals + return math.ceil(n * multiplier) / multiplier + + +@dataclass +class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): + id: int = field(init=False, metadata={"insert": False}) + identifier_type: Literal["trade", "instrument"] + citco_id: str + serenitas_id: str + submit_date: datetime.datetime + process_date: datetime.date + _sftp: ClassVar = field(metadata={"insert": False}) + + @classmethod + @lru_cache(1280) + def process(cls, fname): + file_type, status, submit_date, process_date = get_file_status(fname) + if status: + if file_type == "trade": + key = "Order" + elif file_type == "instrument": + key = "Security" + with cls._sftp.client.open(fname) as fh: + for row in csv.DictReader(fh): + trade = cls( + file_type, + row[f"Internal_{key}_Id"], + row[f"External_{key}_Id"], + submit_date, + process_date, + ) + trade.stage() + else: + with cls._sftp.client.open(fname) as fh: + next(fh) + for row in csv.reader(fh): + id_or_error = row[2] if len(row) > 2 else row[-1] + trade = cls( + "failed", + row[-1], + id_or_error, + submit_date, + process_date, + ) + trade.stage() + + @classmethod + def update_citco_tables(cls, newvals): + d = defaultdict(list) + for row in newvals: + if row.identifier_type == "instrument": + d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) + for table, v in d.items(): + sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" + with cls._conn.cursor() as c: + c.executemany(sql_str, v) + cls._conn.commit() + + @classmethod + def commit(cls): + if not cls._insert_queue: + return + with cls._conn.cursor() as c: + c.executemany(cls._sql_insert, cls._insert_queue, returning=True) + newvals = [] + while True: + if val := c.fetchone(): + newvals.append(val) + if not c.nextset(): + break + cls._conn.commit() + if newvals: + cls.update_citco_tables(newvals) + em = ExchangeMessage() + em.send_email( + "(CITCO) UPLOAD REPORT", + cls._format(newvals), + ( + "fyu@lmcg.com", + "ghorel@lmcg.com", + "etsui@lmcg.com", + ), + ) + + @classmethod + def _format(cls, vals): + t = tabulate( + vals, + headers=[ + "upload_type", + "citco_id", + "serenitas_id", + "submit_date", + "process_date", + ], + tablefmt="unsafehtml", + ) + html = HTMLBody( + f""" + <html> + <head> + <style> + table, th, td {{ border: 1px solid black; border-collapse: collapse;}} + th, td {{ padding: 5px; }} + </style> + </head> + <body> + {t} + </body> + </html> + """ + ) + return html + + @classmethod + def init_sftp(cls): + cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") + + @classmethod + def check_cache(cls): + if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: + if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: + raise ValueError( + "Too many files in the SFTP compared to cache max size" + ) + + +CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( + "RETURNING *", + "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", +) + + +_recipients = { + "ISOSEL": ( + "luke.treacy@innocap.com", + "margincalls@innocapglobal.com", + ), + "BOWDST": ( + "shkumar@sscinc.com", + "mbisoye@sscinc.com", + "hedgemark.lmcg.ops@sscinc.com", + "hm-operations@bnymellon.com", + ), + "SERCGMAST": ( + "SERENITAS.FA@sscinc.com", + "SERENITAS.ops@sscinc.com", + ), + "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",), + "NYOPS": ("nyops@lmcg.com",), +} + + +@dataclass +class Payment: + settle_date: datetime.date + currency: str + amount: float + _insert_queue: ClassVar[list] = [] + + @classmethod + def stage_payment(cls, settlements): + for row in settlements: + cls._insert_queue.append( + cls(row.settle_date, row.currency, row.payment_amount) + ) + + def to_email_format(self): + return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}" + + +class PaymentSettlement(Payment): + @classmethod + def email_innocap(cls, date, account_balance): + if not cls._insert_queue: + return + cls.subtract_cash_balance(account_balance) + move_cash = "" + for currency in ("USD", "EUR"): + biggest_deficit = min( + list( + map( + lambda x: int(x.amount) if x.currency == currency else 0, + cls._insert_queue, + ) + ) + ) + if biggest_deficit < 0: + move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***" + em = ExchangeMessage() + em.send_email( + f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}", + "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n" + + "\n".join( + settlement.to_email_format() for settlement in cls._insert_queue + ) + + move_cash, + to_recipients=_recipients["ISOSEL"], + cc_recipients=("Selene-Ops@lmcg.com",), + ) + cls._insert_queue.clear() + + @classmethod + def stage_payment(cls, settlements, date): + for row in settlements: + cls._insert_queue.append(cls(date, row.currency, row.payment_amount)) + + @classmethod + def subtract_cash_balance(cls, account_balance): + for settlement in cls._insert_queue: + settlement.amount = Decimal(account_balance[settlement.currency]) - ( + -settlement.amount + ) + + +class GFSMonitor(Payment): + @classmethod + def email_globeop(cls, fund): + if not cls._insert_queue: + return + em = ExchangeMessage() + em.send_email( + f"GFS Helper Strategy Issue: {fund}", + "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n" + + "\n".join( + settlement.to_email_format() for settlement in cls._insert_queue + ), + to_recipients=_recipients[fund], + cc_recipients=( + "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com", + ), + ) + + +class BamlFcmNotify: + @classmethod + def email_fcm(cls, date, cash_account, data): + em = ExchangeMessage() + em.send_email( + f"FX Details: {cash_account} {date}", + HTMLBody( + f""" +<html> + <head> + <style> + table, th, td {{ border: 1px solid black; border-collapse: collapse;}} + th, td {{ padding: 5px; }} + </style> + </head> + <body> + Hello,<br><br>Please see below details for an FX Spot Trade we did with the desk today for account {cash_account} Please let me know if you need more information.<br><br>{data} + </body> +</html>""" + ), + to_recipients=_recipients["BAML_FCM"], + cc_recipients=("nyops@lmcg.com",), + ) + + +@dataclass +class EmailOps: + _em = ExchangeMessage() + + @classmethod + def email_boston(cls, date): + cls._em.send_email( + f"Missing Cash Balance for Scotia {date}", + f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!", + to_recipients=_recipients["NYOPS"], + ) |
