from dataclasses import field, dataclass
import logging
from typing import Literal, ClassVar
import datetime
import csv
from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.exchange import ExchangeMessage
from serenitas.utils.remote import SftpClient
from psycopg.errors import UniqueViolation
from exchangelib import HTMLBody
from tabulate import tabulate
from functools import lru_cache
from serenitas.analytics.dates import next_business_day
from decimal import Decimal
import math

logger = logging.getLogger(__name__)


def next_business_days(date, offset):
    """Return *date* advanced by *offset* applications of next_business_day."""
    for _ in range(offset):
        date = next_business_day(date)
    return date


def get_file_status(s):
    """Split a notification file name into ``(is_processed, short_name)``.

    The text after the last ``_`` (minus a trailing ``.csv``) is the short
    name. In the text before it, the second ``-``-separated piece must equal
    ``"PROCESSED"`` for the file to count as processed.
    """
    is_processed, fname_short = s.rsplit("_", 1)
    is_processed = is_processed.rsplit("-")[1] == "PROCESSED"
    fname_short = fname_short.removesuffix(".csv")
    return is_processed, fname_short


def get_success_data(line):
    """Return ``(identifier_type, serenitas_id, identifier)`` for a processed row.

    A non-empty third column marks the row as a trade; otherwise it is
    treated as an instrument and the ids are read from different columns.
    """
    if line[2]:  # This is a trade
        identifier_type = "trade"
        serenitas_id = line[5]
        identifier = line[2]
    else:
        identifier_type = "instrument"
        serenitas_id = line[4]
        identifier = line[1]
    return identifier_type, serenitas_id, identifier


def get_failed_data(line):
    """Return ``(identifier_type, serenitas_id, identifier)`` for a failed row.

    The last column is used as the identifier (presumably the error message
    -- confirm against the Citco file layout). Rows that cannot be classified
    are tagged ``"failed"`` with the last column in both id slots.
    """
    if len(line) == 1:
        return ("failed", line[-1], line[-1])
    elif line[1]:  # Trade upload
        return ("trade", line[2], line[-1])
    elif line[2]:
        # Instrument upload (line[1] is already known to be empty here),
        # just mark as failed if it's a single error message
        return ("instrument", line[2], line[-1])
    else:
        return ("failed", line[-1], line[-1])


def instrument_table(instrument_id):
    """Map an instrument id prefix to its citco table name.

    Returns ``None`` when the prefix is not one of the known ones.
    """
    if instrument_id.startswith("IRS"):
        return "citco_irs"
    elif instrument_id.startswith(("SWPO_", "BNDO_")):
        return "citco_swaption"
    elif instrument_id.startswith("CDS_"):
        return "citco_tranche"
    elif instrument_id.startswith("TRS"):
        return "citco_trs"
    return None


def round_up(n, decimals=0):
    """Round *n* up (ceiling) at *decimals* decimal places.

    A negative *decimals* rounds up to a power of ten, e.g. ``-6`` rounds up
    to the next million.
    """
    multiplier = 10**decimals
    return math.ceil(n * multiplier) / multiplier


@dataclass
class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
    """One row of a Citco upload notification file.

    Rows are built with :meth:`from_citco_line`, staged, and then written by
    :meth:`commit`, which also e-mails a summary.
    """

    fname: str
    # NOTE(review): get_failed_data can also yield "failed" here.
    identifier_type: Literal["trade", "instrument"]
    identifier: str
    serenitas_id: str
    # default_factory so the timestamp is taken when the instance is created,
    # not once when the class body is evaluated at import time.
    submit_date: datetime.datetime = field(default_factory=datetime.datetime.now)
    processed: bool = field(default=False)
    _sftp: ClassVar = field(metadata={"insert": False})

    @classmethod
    def from_citco_line(cls, line, fname):
        """Build a submission from one CSV row of the notification file *fname*."""
        is_processed, fname_short = get_file_status(fname)
        if is_processed:
            identifier_type, serenitas_id, identifier = get_success_data(line)
        else:
            identifier_type, serenitas_id, identifier = get_failed_data(line)
        return cls(
            fname=fname_short,
            identifier_type=identifier_type,
            identifier=identifier,
            serenitas_id=serenitas_id,
            processed=is_processed,
        )

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        """Read *fname* over SFTP and stage one submission per data row.

        The lru_cache makes a repeated call with the same file name a no-op
        for as long as that entry stays in the cache.
        """
        with cls._sftp.client.open(fname) as fh:
            next(fh)  # skip header
            for row in csv.reader(fh):
                trade = cls.from_citco_line(row, fname)
                trade.stage()

    @classmethod
    def update_citco_tables(cls):
        """Mark every queued instrument row as committed/acknowledged.

        Rows whose id prefix maps to no known table are skipped with a
        warning instead of producing an invalid ``UPDATE None ...`` statement.
        """
        # NOTE(review): rows are updated regardless of their processed flag
        # (row[5]) -- confirm that failed instrument uploads should also be
        # marked 'Acknowledged'.
        with cls._conn.cursor() as c:
            for row in cls._insert_queue:
                if row[1] != "instrument":
                    continue
                serenitas_id = row[3]
                table = instrument_table(serenitas_id)
                if table is None:
                    logger.warning(
                        "No citco table for instrument %s, skipping status update",
                        serenitas_id,
                    )
                    continue
                sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
                c.execute(
                    sql_str,
                    (serenitas_id,),
                )
        cls._conn.commit()

    @classmethod
    def commit(cls):
        """Insert the queued rows, update instrument tables and e-mail a summary.

        On a UniqueViolation the transaction is rolled back and no e-mail is
        sent. The queue is cleared in every case.
        """
        if not cls._insert_queue:
            return
        with cls._conn.cursor() as c:
            try:
                c.executemany(cls._sql_insert, cls._insert_queue)
            except UniqueViolation as e:
                logger.warning(e)
                cls._conn.rollback()
            else:
                cls._conn.commit()
                cls.update_citco_tables()
                em = ExchangeMessage()
                em.send_email(
                    # The subject reflects the processed flag of the first row.
                    f"(CITCO) UPLOAD {'SUCCESS' if cls._insert_queue[0][5] else 'FAILED'}",
                    cls._format(),
                    (
                        "fyu@lmcg.com",
                        "ghorel@lmcg.com",
                        "etsui@lmcg.com",
                    ),
                )
            finally:
                cls._insert_queue.clear()

    @classmethod
    def _format(cls):
        """Render the queued rows as an HTML table for the summary e-mail."""
        t = tabulate(
            list(cls._insert_queue),
            headers=[
                "file_name",
                "upload_type",
                "citco_id",
                "serenitas_id",
                "commit_time",
                "processed",
            ],
            tablefmt="unsafehtml",
        )
        html = HTMLBody(
            f""" {t} """
        )
        return html

    @classmethod
    def init_sftp(cls):
        """Create the shared SFTP client pointing at the notifications folder."""
        cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")

    @classmethod
    def check_cache(cls):
        """Raise ValueError when the ``process`` cache is full and mostly missing.

        The check is ``misses / hits > 0.5`` written with integers, so a full
        cache with zero hits raises the intended ValueError rather than a
        ZeroDivisionError.
        """
        info = cls.process.cache_info()
        if info.currsize == info.maxsize and 2 * info.misses > info.hits:
            raise ValueError("Too many files in the SFTP compared to cache max size")


_recipients = {
    "ISOSEL": (
        "luke.treacy@innocap.com",
        "margincalls@innocapglobal.com",
    ),
    "BOWDST": (
        "shkumar@sscinc.com",
        "hedgemark.lmcg.ops@sscinc.com",
        "hm-operations@bnymellon.com",
    ),
    "SERCGMAST": (
        "SERENITAS.FA@sscinc.com",
        "SERENITAS.ops@sscinc.com",
    ),
    "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",),
    "NYOPS": ("nyops@lmcg.com",),
}


@dataclass
class Payment:
    """A cash amount in one currency on one settle date."""

    settle_date: datetime.date
    currency: str
    amount: float
    # NOTE: subclasses do not redefine this list, so they all share it.
    _insert_queue: ClassVar[list] = []

    @classmethod
    def stage_payment(cls, settlements):
        """Queue one instance per row, using each row's own settle date."""
        for row in settlements:
            cls._insert_queue.append(
                cls(row.settle_date, row.currency, row.payment_amount)
            )

    def to_email_format(self):
        """Return this payment as one bullet line for a plain-text e-mail."""
        return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}"


class PaymentSettlement(Payment):
    """Projected-balance e-mail for the ISOSEL recipients."""

    @classmethod
    def email_innocap(cls, date, account_balance):
        """E-mail the projected balances and request a transfer on any deficit.

        Does nothing when the queue is empty. The queue is cleared after the
        e-mail is sent.
        """
        if not cls._insert_queue:
            return
        cls.subtract_cash_balance(account_balance)
        move_cash = ""
        for currency in ("USD", "EUR"):
            # Most negative projected balance in this currency (0 if none).
            biggest_deficit = min(
                int(x.amount) if x.currency == currency else 0
                for x in cls._insert_queue
            )
            if biggest_deficit < 0:
                move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***"
        em = ExchangeMessage()
        em.send_email(
            f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}",
            "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n"
            + "\n".join(
                settlement.to_email_format() for settlement in cls._insert_queue
            )
            + move_cash,
            to_recipients=_recipients["ISOSEL"],
            cc_recipients=("Selene-Ops@lmcg.com",),
        )
        cls._insert_queue.clear()

    @classmethod
    def stage_payment(cls, settlements, date):
        """Queue one instance per row, all stamped with the given *date*."""
        for row in settlements:
            cls._insert_queue.append(cls(date, row.currency, row.payment_amount))

    @classmethod
    def subtract_cash_balance(cls, account_balance):
        """Replace each queued amount with ``balance - (-amount)`` for its currency."""
        for settlement in cls._insert_queue:
            settlement.amount = Decimal(account_balance[settlement.currency]) - (
                -settlement.amount
            )


class GFSMonitor(Payment):
    """Alert e-mail about cash found in the GFS helper strategy."""

    @classmethod
    def email_globeop(cls, fund):
        """E-mail the queued amounts to the recipients of *fund*.

        Does nothing when the queue is empty. The queue is cleared after
        sending (as email_innocap does), so a later call for another fund
        does not repeat these rows.
        """
        if not cls._insert_queue:
            return
        em = ExchangeMessage()
        em.send_email(
            f"GFS Helper Strategy Issue: {fund}",
            "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n"
            + "\n".join(
                settlement.to_email_format() for settlement in cls._insert_queue
            ),
            to_recipients=_recipients[fund],
            cc_recipients=(
                "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com",
            ),
        )
        cls._insert_queue.clear()


class BamlFcmNotify:
    """FX spot trade notification sent to the BAML FCM recipients."""

    @classmethod
    def email_fcm(cls, date, data):
        """Send *data* (interpolated as-is into the HTML body) for *date*."""
        em = ExchangeMessage()
        em.send_email(
            f"FX Details: 6MZ20049 {date}",
            HTMLBody(
                f""" Hello,

Please see below details for an FX Spot Trade we did with the desk today for account 6MZ20049. Please let me know if you need more information.

{data} """
            ),
            to_recipients=_recipients["BAML_FCM"],
            cc_recipients=("nyops@lmcg.com",),
        )


@dataclass
class EmailOps:
    """Reminder e-mails to the NY ops desk."""

    # Created once, when the class body is evaluated.
    _em = ExchangeMessage()

    @classmethod
    def email_boston(cls, date):
        """Ask for the missing Scotia cash balance for *date*."""
        cls._em.send_email(
            f"Missing Cash Balance for Scotia {date}",
            f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!",
            to_recipients=_recipients["NYOPS"],
        )