aboutsummaryrefslogtreecommitdiffstats
path: root/python/report_ops/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/report_ops/utils.py')
-rw-r--r--python/report_ops/utils.py332
1 files changed, 332 insertions, 0 deletions
diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py
new file mode 100644
index 00000000..36839865
--- /dev/null
+++ b/python/report_ops/utils.py
@@ -0,0 +1,332 @@
+from collections import defaultdict
+from dataclasses import field, dataclass
+import logging
+from typing import Literal, ClassVar
+import datetime
+import csv
+from serenitas.ops.trade_dataclasses import Deal
+from serenitas.utils.exchange import ExchangeMessage
+from serenitas.utils.remote import SftpClient
+from exchangelib import HTMLBody
+from tabulate import tabulate
+from functools import lru_cache
+from serenitas.analytics.dates import next_business_day
+from decimal import Decimal
+import math
+import re
+from zoneinfo import ZoneInfo
+
+logger = logging.getLogger(__name__)
+
+
+def next_business_days(date, offset):
+ for i in range(offset):
+ date = next_business_day(date)
+ return date
+
+
+def get_file_status(s):
+ if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s):
+ orig_name, submit_date, status, process_date = m.groups()
+ else:
+ raise ValueError(f"Can't parse status from file {s}")
+
+ zone = ZoneInfo("America/New_York")
+ submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=zone
+ )
+ process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace(
+ tzinfo=datetime.timezone.utc
+ )
+ if orig_name == ("innocap_serenitas_trades_"):
+ file_type = "trade"
+ elif orig_name == "i.innocap_serenitas.":
+ file_type = "instrument"
+ else:
+ raise ValueError(f"error with {s}")
+ return file_type, "PROCESSED" in s, submit_date, process_date
+
+
+def instrument_table(instrument_id):
+ if instrument_id.startswith("IRS"):
+ return "citco_irs"
+ elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
+ return "citco_swaption"
+ elif instrument_id.startswith("CDS_"):
+ return "citco_tranche"
+ elif instrument_id.startswith("TRS"):
+ return "citco_trs"
+
+
+def round_up(n, decimals=0):
+ multiplier = 10**decimals
+ return math.ceil(n * multiplier) / multiplier
+
+
+@dataclass
+class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"):
+ id: int = field(init=False, metadata={"insert": False})
+ identifier_type: Literal["trade", "instrument"]
+ citco_id: str
+ serenitas_id: str
+ submit_date: datetime.datetime
+ process_date: datetime.date
+ _sftp: ClassVar = field(metadata={"insert": False})
+
+ @classmethod
+ @lru_cache(1280)
+ def process(cls, fname):
+ file_type, status, submit_date, process_date = get_file_status(fname)
+ if status:
+ if file_type == "trade":
+ key = "Order"
+ elif file_type == "instrument":
+ key = "Security"
+ with cls._sftp.client.open(fname) as fh:
+ for row in csv.DictReader(fh):
+ trade = cls(
+ file_type,
+ row[f"Internal_{key}_Id"],
+ row[f"External_{key}_Id"],
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+ else:
+ with cls._sftp.client.open(fname) as fh:
+ next(fh)
+ for row in csv.reader(fh):
+ id_or_error = row[2] if len(row) > 2 else row[-1]
+ trade = cls(
+ "failed",
+ row[-1],
+ id_or_error,
+ submit_date,
+ process_date,
+ )
+ trade.stage()
+
+ @classmethod
+ def update_citco_tables(cls, newvals):
+ d = defaultdict(list)
+ for row in newvals:
+ if row.identifier_type == "instrument":
+ d[instrument_table(row.serenitas_id)].append((row.serenitas_id,))
+ for table, v in d.items():
+ sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s"
+ with cls._conn.cursor() as c:
+ c.executemany(sql_str, v)
+ cls._conn.commit()
+
+ @classmethod
+ def commit(cls):
+ if not cls._insert_queue:
+ return
+ with cls._conn.cursor() as c:
+ c.executemany(cls._sql_insert, cls._insert_queue, returning=True)
+ newvals = []
+ while True:
+ if val := c.fetchone():
+ newvals.append(val)
+ if not c.nextset():
+ break
+ cls._conn.commit()
+ if newvals:
+ cls.update_citco_tables(newvals)
+ em = ExchangeMessage()
+ em.send_email(
+ "(CITCO) UPLOAD REPORT",
+ cls._format(newvals),
+ (
+ "fyu@lmcg.com",
+ "ghorel@lmcg.com",
+ "etsui@lmcg.com",
+ ),
+ )
+
+ @classmethod
+ def _format(cls, vals):
+ t = tabulate(
+ vals,
+ headers=[
+ "upload_type",
+ "citco_id",
+ "serenitas_id",
+ "submit_date",
+ "process_date",
+ ],
+ tablefmt="unsafehtml",
+ )
+ html = HTMLBody(
+ f"""
+ <html>
+ <head>
+ <style>
+ table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
+ th, td {{ padding: 5px; }}
+ </style>
+ </head>
+ <body>
+ {t}
+ </body>
+ </html>
+ """
+ )
+ return html
+
+ @classmethod
+ def init_sftp(cls):
+ cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
+
+ @classmethod
+ def check_cache(cls):
+ if cls.process.cache_info().currsize == cls.process.cache_info().maxsize:
+ if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5:
+ raise ValueError(
+ "Too many files in the SFTP compared to cache max size"
+ )
+
+
+CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace(
+ "RETURNING *",
+ "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *",
+)
+
+
+_recipients = {
+ "ISOSEL": (
+ "luke.treacy@innocap.com",
+ "margincalls@innocapglobal.com",
+ ),
+ "BOWDST": (
+ "shkumar@sscinc.com",
+ "mbisoye@sscinc.com",
+ "hedgemark.lmcg.ops@sscinc.com",
+ "hm-operations@bnymellon.com",
+ ),
+ "SERCGMAST": (
+ "SERENITAS.FA@sscinc.com",
+ "SERENITAS.ops@sscinc.com",
+ ),
+ "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",),
+ "NYOPS": ("nyops@lmcg.com",),
+}
+
+
+@dataclass
+class Payment:
+ settle_date: datetime.date
+ currency: str
+ amount: float
+ _insert_queue: ClassVar[list] = []
+
+ @classmethod
+ def stage_payment(cls, settlements):
+ for row in settlements:
+ cls._insert_queue.append(
+ cls(row.settle_date, row.currency, row.payment_amount)
+ )
+
+ def to_email_format(self):
+ return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}"
+
+
+class PaymentSettlement(Payment):
+ @classmethod
+ def email_innocap(cls, date, account_balance):
+ if not cls._insert_queue:
+ return
+ cls.subtract_cash_balance(account_balance)
+ move_cash = ""
+ for currency in ("USD", "EUR"):
+ biggest_deficit = min(
+ list(
+ map(
+ lambda x: int(x.amount) if x.currency == currency else 0,
+ cls._insert_queue,
+ )
+ )
+ )
+ if biggest_deficit < 0:
+ move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***"
+ em = ExchangeMessage()
+ em.send_email(
+ f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}",
+ "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n"
+ + "\n".join(
+ settlement.to_email_format() for settlement in cls._insert_queue
+ )
+ + move_cash,
+ to_recipients=_recipients["ISOSEL"],
+ cc_recipients=("Selene-Ops@lmcg.com",),
+ )
+ cls._insert_queue.clear()
+
+ @classmethod
+ def stage_payment(cls, settlements, date):
+ for row in settlements:
+ cls._insert_queue.append(cls(date, row.currency, row.payment_amount))
+
+ @classmethod
+ def subtract_cash_balance(cls, account_balance):
+ for settlement in cls._insert_queue:
+ settlement.amount = Decimal(account_balance[settlement.currency]) - (
+ -settlement.amount
+ )
+
+
+class GFSMonitor(Payment):
+ @classmethod
+ def email_globeop(cls, fund):
+ if not cls._insert_queue:
+ return
+ em = ExchangeMessage()
+ em.send_email(
+ f"GFS Helper Strategy Issue: {fund}",
+ "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n"
+ + "\n".join(
+ settlement.to_email_format() for settlement in cls._insert_queue
+ ),
+ to_recipients=_recipients[fund],
+ cc_recipients=(
+ "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com",
+ ),
+ )
+
+
+class BamlFcmNotify:
+ @classmethod
+ def email_fcm(cls, date, cash_account, data):
+ em = ExchangeMessage()
+ em.send_email(
+ f"FX Details: {cash_account} {date}",
+ HTMLBody(
+ f"""
+<html>
+ <head>
+ <style>
+ table, th, td {{ border: 1px solid black; border-collapse: collapse;}}
+ th, td {{ padding: 5px; }}
+ </style>
+ </head>
+ <body>
+ Hello,<br><br>Please see below details for an FX Spot Trade we did with the desk today for account {cash_account} Please let me know if you need more information.<br><br>{data}
+ </body>
+</html>"""
+ ),
+ to_recipients=_recipients["BAML_FCM"],
+ cc_recipients=("nyops@lmcg.com",),
+ )
+
+
+@dataclass
+class EmailOps:
+ _em = ExchangeMessage()
+
+ @classmethod
+ def email_boston(cls, date):
+ cls._em.send_email(
+ f"Missing Cash Balance for Scotia {date}",
+ f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!",
+ to_recipients=_recipients["NYOPS"],
+ )