from collections import defaultdict from dataclasses import field, dataclass import logging from typing import Literal, ClassVar import datetime import csv from serenitas.ops.trade_dataclasses import Deal from serenitas.utils.exchange import ExchangeMessage from serenitas.utils.remote import SftpClient from exchangelib import HTMLBody from tabulate import tabulate from functools import lru_cache from serenitas.analytics.dates import next_business_day from decimal import Decimal import math import re from zoneinfo import ZoneInfo logger = logging.getLogger(__name__) def next_business_days(date, offset): for i in range(offset): date = next_business_day(date) return date def get_file_status(s): if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): orig_name, submit_date, status, process_date = m.groups() else: raise ValueError(f"Can't parse status from file {s}") zone = ZoneInfo("America/New_York") submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( tzinfo=zone ) process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( tzinfo=datetime.timezone.utc ) if orig_name == ("innocap_serenitas_trades_"): file_type = "trade" elif orig_name == "i.innocap_serenitas.": file_type = "instrument" else: raise ValueError(f"error with {s}") return file_type, "PROCESSED" in s, submit_date, process_date def instrument_table(instrument_id): if instrument_id.startswith("IRS"): return "citco_irs" elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): return "citco_swaption" elif instrument_id.startswith("CDS_"): return "citco_tranche" elif instrument_id.startswith("TRS"): return "citco_trs" def round_up(n, decimals=0): multiplier = 10**decimals return math.ceil(n * multiplier) / multiplier @dataclass class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): id: int = field(init=False, metadata={"insert": False}) identifier_type: Literal["trade", "instrument"] citco_id: str serenitas_id: str submit_date: datetime.datetime process_date: datetime.date _sftp: ClassVar = field(metadata={"insert": False}) @classmethod @lru_cache(1280) def process(cls, fname): file_type, status, submit_date, process_date = get_file_status(fname) if status: if file_type == "trade": key = "Order" elif file_type == "instrument": key = "Security" with cls._sftp.client.open(fname) as fh: for row in csv.DictReader(fh): trade = cls( file_type, row[f"Internal_{key}_Id"], row[f"External_{key}_Id"], submit_date, process_date, ) trade.stage() else: with cls._sftp.client.open(fname) as fh: next(fh) for row in csv.reader(fh): id_or_error = row[2] if len(row) > 2 else row[-1] trade = cls( "failed", row[-1], id_or_error, submit_date, process_date, ) trade.stage() @classmethod def update_citco_tables(cls, newvals): d = defaultdict(list) for row in newvals: if row.identifier_type == "instrument": d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) for table, v in d.items(): sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" with cls._conn.cursor() as c: c.executemany(sql_str, v) cls._conn.commit() @classmethod def commit(cls): if not cls._insert_queue: return with cls._conn.cursor() as c: c.executemany(cls._sql_insert, cls._insert_queue, returning=True) newvals = [] while True: if val := c.fetchone(): newvals.append(val) if not c.nextset(): break cls._conn.commit() if newvals: cls.update_citco_tables(newvals) em = ExchangeMessage() em.send_email( "(CITCO) UPLOAD REPORT", cls._format(newvals), ( "fyu@lmcg.com", "ghorel@lmcg.com", "etsui@lmcg.com", ), ) @classmethod def _format(cls, vals): t = tabulate( vals, headers=[ "upload_type", "citco_id", "serenitas_id", "submit_date", "process_date", ], tablefmt="unsafehtml", ) html = HTMLBody( f"""
{t} """ ) return html @classmethod def init_sftp(cls): cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") @classmethod def check_cache(cls): if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: raise ValueError( "Too many files in the SFTP compared to cache max size" ) CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( "RETURNING *", "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", ) _recipients = { "ISOSEL": ( "luke.treacy@innocap.com", "margincalls@innocapglobal.com", ), "BOWDST": ( "shkumar@sscinc.com", "mbisoye@sscinc.com", "hedgemark.lmcg.ops@sscinc.com", "hm-operations@bnymellon.com", ), "SERCGMAST": ( "SERENITAS.FA@sscinc.com", "SERENITAS.ops@sscinc.com", ), "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",), "NYOPS": ("nyops@lmcg.com",), } @dataclass class Payment: settle_date: datetime.date currency: str amount: float _insert_queue: ClassVar[list] = [] @classmethod def stage_payment(cls, settlements): for row in settlements: cls._insert_queue.append( cls(row.settle_date, row.currency, row.payment_amount) ) def to_email_format(self): return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}" class PaymentSettlement(Payment): @classmethod def email_innocap(cls, date, account_balance): if not cls._insert_queue: return cls.subtract_cash_balance(account_balance) move_cash = "" for currency in ("USD", "EUR"): biggest_deficit = min( list( map( lambda x: int(x.amount) if x.currency == currency else 0, cls._insert_queue, ) ) ) if biggest_deficit < 0: move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***" em = ExchangeMessage() em.send_email( f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}", "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n" + "\n".join( settlement.to_email_format() for settlement in cls._insert_queue ) + move_cash, to_recipients=_recipients["ISOSEL"], cc_recipients=("Selene-Ops@lmcg.com",), ) cls._insert_queue.clear() @classmethod def stage_payment(cls, settlements, date): for row in settlements: cls._insert_queue.append(cls(date, row.currency, row.payment_amount)) @classmethod def subtract_cash_balance(cls, account_balance): for settlement in cls._insert_queue: settlement.amount = Decimal(account_balance[settlement.currency]) - ( -settlement.amount ) class GFSMonitor(Payment): @classmethod def email_globeop(cls, fund): if not cls._insert_queue: return em = ExchangeMessage() em.send_email( f"GFS Helper Strategy Issue: {fund}", "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n" + "\n".join( settlement.to_email_format() for settlement in cls._insert_queue ), to_recipients=_recipients[fund], cc_recipients=( "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com", ), ) class BamlFcmNotify: @classmethod def email_fcm(cls, date, cash_account, data): em = ExchangeMessage() em.send_email( f"FX Details: {cash_account} {date}", HTMLBody( f""" Hello,