from collections import defaultdict
from dataclasses import field, dataclass
import logging
from typing import Literal, ClassVar
import datetime
import csv
from serenitas.ops.trade_dataclasses import Deal
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
from serenitas.utils.remote import SftpClient
from exchangelib import HTMLBody
from tabulate import tabulate
from functools import lru_cache
from serenitas.analytics.dates import next_business_day
import math
import re
from zoneinfo import ZoneInfo
from .misc import (
    _recipients,
    _cc_recipients,
    _settlement_recipients,
    _valuation_recipients,
)

logger = logging.getLogger(__name__)


def next_business_days(date, offset):
    """Roll `date` forward by `offset` + 1 business days."""
    for i in range(offset + 1):
        date = next_business_day(date)
    return date


def get_file_status(s):
    """Parse a Citco notification filename into (file_type, processed, submit_date, process_date)."""
    if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
        orig_name, submit_date, status, process_date = m.groups()
    else:
        raise ValueError(f"Can't parse status from file {s}")
    zone = ZoneInfo("America/New_York")
    submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
        tzinfo=zone
    )
    process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
        tzinfo=datetime.timezone.utc
    )
    if orig_name == "innocap_serenitas_trades_":
        file_type = "trade"
    elif orig_name == "i.innocap_serenitas.":
        file_type = "instrument"
    else:
        raise ValueError(f"error with {s}")
    return file_type, status == "PROCESSED", submit_date, process_date


def instrument_table(instrument_id):
    """Return the citco_* table holding instruments with this id prefix (None if unrecognized)."""
    if instrument_id.startswith("IRS"):
        return "citco_irs"
    elif instrument_id.startswith(("SWPO_", "BNDO_")):
        return "citco_swaption"
    elif instrument_id.startswith("CDS_"):
        return "citco_tranche"
    elif instrument_id.startswith("TRS"):
        return "citco_trs"


def round_up(n, decimals=0):
    """Round `n` up to `decimals` decimal places."""
    multiplier = 10**decimals
    return math.ceil(n * multiplier) / multiplier


def notify_payment_settlements(date, fund, conn):
    """Stage upcoming SPOT/SWAPTION/TRANCHE settlements and email the report for `fund`."""
    end_date = next_business_days(date, 2)
    with conn.cursor() as c:
        c.execute(
            "SELECT * from payment_settlements WHERE settle_date BETWEEN %s AND %s AND fund= %s AND asset_class in ('SPOT', 'SWAPTION', 'TRANCHE') ORDER BY settle_date asc",
            (date, end_date, fund),
        )
        for row in c:
            d = row._asdict()
            d["settlement_amount"] = d["payment_amount"]
            PaymentMonitor.stage(d)
    PaymentMonitor.email(fund)
    PaymentMonitor._staging_queue.clear()


def notify_fx_hedge(date, fund, conn):
    """Stage EUR FCM balances whose excess/deficit exceeds 1,000,000 in absolute value and email them."""
    with conn.cursor() as c:
        c.execute(
            "SELECT * from fcm_moneyline LEFT JOIN accounts2 ON account=cash_account WHERE date=%s AND currency='EUR' AND fund=%s AND abs(current_excess_deficit) > 1000000",
            (date, fund),
        )
        for row in c:
            d = row._asdict()
            d["amount"] = d["current_excess_deficit"]
            d["category"] = "FCM"
            FxHedge.stage(d)
    FxHedge.email(fund)
    FxHedge._staging_queue.clear()


def check_cleared_cds(date, fund, conn):
    """Compare Serenitas CDS index marks against the administrator's implied quotes and email breaches."""
    # Per-index tolerance on the quote difference.
    _tolerance = {"IG": 0.10, "HY": 0.20, "EU": 0.20, "XO": 0.30}
    with conn.cursor() as c:
        c.execute(
            "SELECT * FROM list_cds_marks(%s, NULL, %s), fx WHERE date=%s AND abs((notional*factor) - globeop_nav) < 100;",
            (date, fund, date),
        )
        for row in c:
            d = row._asdict()
            d["serenitas_quote"] = d["price"]
            match d["index"]:
                case "XO" | "EU":
                    # XO/EU: strip accrued and adjust by the EURUSD rate before implying the quote.
                    d["admin_quote"] = 100 - (
                        ((d["globeop_nav"] - d["accrued"]) / d["eurusd"])
                        / (d["globeop_notional"] / 100)
                    )
                case _:
                    d["admin_quote"] = 100 - (
                        (d["globeop_nav"] - d["accrued"])
                        / (d["globeop_notional"] / 100)
                    )
            d["difference"] = abs(d["price"] - d["admin_quote"])
            if d["difference"] > _tolerance[d["index"]]:
                CDXQuoteMonitor.stage(d)
    CDXQuoteMonitor.email(fund)
    CDXQuoteMonitor._staging_queue.clear()

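# Illustrative sketch of the notification filenames that CitcoSubmission.process
# (below) hands to get_file_status. The timestamps are hypothetical; only the
# prefixes, the -PROCESSED_/-FAILED_ marker and the %Y%m%d%H%M%S format are taken
# from the parsing logic above:
#
#   get_file_status("innocap_serenitas_trades_20240115093000-PROCESSED_20240115113000")
#   # -> ("trade", True,
#   #     datetime.datetime(2024, 1, 15, 9, 30, tzinfo=ZoneInfo("America/New_York")),
#   #     datetime.datetime(2024, 1, 15, 11, 30, tzinfo=datetime.timezone.utc))
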
@dataclass
class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"):
    """A trade or instrument acknowledgement parsed from a Citco SFTP notification file."""

    id: int = field(init=False, metadata={"insert": False})
    identifier_type: Literal["trade", "instrument"]
    citco_id: str
    serenitas_id: str
    submit_date: datetime.datetime
    process_date: datetime.date
    _sftp: ClassVar = field(metadata={"insert": False})

    @classmethod
    @lru_cache(1280)
    def process(cls, fname):
        file_type, status, submit_date, process_date = get_file_status(fname)
        if status:
            # PROCESSED files come back as a CSV keyed by internal/external order or security ids.
            if file_type == "trade":
                key = "Order"
            elif file_type == "instrument":
                key = "Security"
            with cls._sftp.client.open(fname) as fh:
                for row in csv.DictReader(fh):
                    trade = cls(
                        file_type,
                        row[f"Internal_{key}_Id"],
                        row[f"External_{key}_Id"],
                        submit_date,
                        process_date,
                    )
                    trade.stage()
        else:
            # FAILED files have no stable layout: skip the header, then take the
            # error/id from the third column when present, otherwise the last column.
            with cls._sftp.client.open(fname) as fh:
                next(fh)
                for row in csv.reader(fh):
                    id_or_error = row[2] if len(row) > 2 else row[-1]
                    trade = cls(
                        "failed",
                        row[-1],
                        id_or_error,
                        submit_date,
                        process_date,
                    )
                    trade.stage()

    @classmethod
    def update_citco_tables(cls, newvals):
        # Mark acknowledged instruments as committed in their per-product citco_* tables.
        d = defaultdict(list)
        for row in newvals:
            if row.identifier_type == "instrument":
                d[instrument_table(row.serenitas_id)].append((row.serenitas_id,))
        for table, v in d.items():
            sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
            with cls._conn.cursor() as c:
                c.executemany(sql_str, v)
        cls._conn.commit()

    @classmethod
    def commit(cls):
        if not cls._insert_queue:
            return
        with cls._conn.cursor() as c:
            c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
            # Collect the rows returned by each statement in the batch.
            newvals = []
            while True:
                if val := c.fetchone():
                    newvals.append(val)
                if not c.nextset():
                    break
        cls._conn.commit()
        if newvals:
            cls.update_citco_tables(newvals)
            em = ExchangeMessage()
            em.send_email(
                "(CITCO) UPLOAD REPORT",
                cls._format(newvals),
                (
                    "fyu@lmcg.com",
                    "ghorel@lmcg.com",
                    "etsui@lmcg.com",
                ),
            )

    @classmethod
    def _format(cls, vals):
        t = tabulate(
            vals,
            headers=[
                "upload_type",
                "citco_id",
                "serenitas_id",
                "submit_date",
                "process_date",
            ],
            tablefmt="unsafehtml",
        )
        html = HTMLBody(
            f"""
{t}
"""
        )
        return html

    @classmethod
    def init_sftp(cls):
        cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")

    @classmethod
    def check_cache(cls):
        # If the cache is full and misses exceed half the hits, the SFTP folder
        # probably holds more files than the LRU cache can track.
        if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
            if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
                raise ValueError(
                    "Too many files in the SFTP compared to cache max size"
                )


# Ignore duplicate submissions when a notification file is re-processed.
CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
    "RETURNING *",
    "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *",
)


@dataclass
class Monitor:
    """Base class for tabular email alerts: subclasses stage rows and send them as an HTML table."""

    date: datetime.date
    headers: ClassVar = ()
    num_format: ClassVar = []
    _staging_queue: ClassVar[list] = []
    _em: ClassVar = ExchangeMessage()

    def __init_subclass__(cls, headers, num_format=[]):
        cls.headers = headers
        cls.num_format = num_format

    @classmethod
    def stage(cls, d: dict):
        cls._staging_queue.append([d[key] for key in cls.headers])

    @classmethod
    def format(cls):
        for line in cls._staging_queue:
            for f, i in cls.num_format:
                line[i] = f.format(line[i])

    @classmethod
    def to_tabulate(cls):
        cls.format()
        t = tabulate(
            cls._staging_queue,
            headers=cls.headers,
            tablefmt="unsafehtml",
        )
        return t


class GFSMonitor(
    Monitor,
    headers=(
        "date",
        "portfolio",
        "amount",
        "currency",
    ),
    num_format=[("{0:,.2f}", 2)],
):
    @classmethod
    def email(cls, fund):
        if not cls._staging_queue:
            return
        cls._em.send_email(
            f"GFS Helper Strategy Issue: {fund}",
            HTMLBody(
                f"""
Good morning,