diff options
Diffstat (limited to 'python/report_ops')
| -rw-r--r-- | python/report_ops/__init__.py | 0 | ||||
| -rw-r--r-- | python/report_ops/cash.py | 129 | ||||
| -rw-r--r-- | python/report_ops/misc.py | 22 | ||||
| -rw-r--r-- | python/report_ops/remote.py | 106 | ||||
| -rw-r--r-- | python/report_ops/utils.py | 332 | ||||
| -rw-r--r-- | python/report_ops/wires.py | 110 |
6 files changed, 699 insertions, 0 deletions
diff --git a/python/report_ops/__init__.py b/python/report_ops/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/python/report_ops/__init__.py diff --git a/python/report_ops/cash.py b/python/report_ops/cash.py new file mode 100644 index 00000000..092dbb7d --- /dev/null +++ b/python/report_ops/cash.py @@ -0,0 +1,129 @@ +from dataclasses import field, dataclass +from serenitas.ops.trade_dataclasses import Deal, Fund +from serenitas.analytics.dates import prev_business_day, next_business_day +import datetime +from serenitas.utils.exchange import ExchangeMessage +from serenitas.utils.env import DAILY_DIR +import pandas as pd +from serenitas.utils.db import dbconn, dawn_engine +from typing import ClassVar +from .misc import get_dir, dt_from_fname +from .custodians import NT, UMB + + +@dataclass +class IAMDeal(Deal, deal_type=None, table_name="iam_tickets"): + trade_date: datetime.date = field(metadata={"globeop": "SettlementDate"}) + action: str = field(metadata={"globeop": "Action"}) + strategy: str = field(metadata={"globeop": "Folder"}) + counterparty: str = field(metadata={"globeop": "Counterparty"}) + maturity: datetime.date + start_money: float = field(metadata={"globeop": "StartMoney"}) + currency: str = field(metadata={"globeop": "Currency"}) + booked_offset: bool + uploaded: bool + fund: Fund + dealid: str = field(metadata={"insert": False}) + id: int = field(metadata={"insert": False}) + + def to_globeop(self, action): + obj = super().to_globeop(action) + obj["Deal Type"] = "IamDeal" + obj["ExpirationDate"] = self.trade_date if self.action == "UPDATE" else None + obj["CallNoticeIndicator"] = "24H" if self.action == "NEW" else None + obj["TransactionIndicator"] = ("DEPOSIT" if obj["StartMoney"] > 0 else "LOAN",) + obj["StartMoney"] = abs(obj["StartMoney"]) + obj["Folder"] = ( + "M_CSH_CASH" if obj["strategy"] == "CSH_CASH" else obj["strategy"] + ) + obj["DealFunction"] = "OTC" + obj["MarginType"] = "Variation" + obj["Basis"] = "ACT/360" + return obj + + +@dataclass +class CashReport: + fund: ClassVar[str] + account_number: ClassVar[str] + date: datetime.date + _conn: ClassVar[dbconn] = dbconn("dawndb") + _staging_queue: ClassVar[set] = set() + _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" + + def __init_subclass__(cls, fund, account_number): + cls.fund = fund + cls.account_number = account_number + + def to_db(self, report_name): + self.download_reports(self.date) + report_dir = get_dir(self.date) + report_dir.mkdir(exist_ok=True, parents=True) + p = max( + [f for f in get_dir(self.date).iterdir() if f.name.startswith(report_name)], + key=dt_from_fname, + default=None, + ) + if not p: + raise ValueError( + f"No reports found for fund: {self.fund} date: {self.date}" + ) + return p + + @classmethod + def commit(cls): + with cls._conn.cursor() as c: + c.executemany(cls._insert_sql, cls._staging_queue) + cls._conn.commit() + + +class NTCashReport(CashReport, NT, fund="ISOSEL", account_number="ISOS01"): + def to_db(self): + p = super().to_db("cash_") + df = pd.read_csv(p, on_bad_lines="warn") + df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] + df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] + df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() + df = df.set_index(["consolidation", "currencycode"]) + for row in df.itertuples(): + self.stage_from_row(row) + self.commit() + self._staging_queue.clear() + + def stage_from_row(self, row): + (account, currency), amount = row + self._staging_queue.add( + ( + prev_business_day(self.date), + self.fund, + f"NT Custody Account {self.fund}", + account, + currency, + amount, + ) + ) + + +class UMBCashReport(CashReport, UMB, fund="SERCGMAST", account_number="159260.1"): + def to_db(self): + p = super().to_db("umb_") + df = pd.read_excel(p, skiprows=3) + for row in ( + df.groupby(["Portfolio #", "Currency"]).sum()["Current Balance"].items() + ): + self.stage_from_row(row) + self.commit() + self._staging_queue.clear() + + def stage_from_row(self, row): + (account, currency), amount = row + self._staging_queue.add( + ( + prev_business_day(self.date), + self.fund, + f"UMB Custody Account {self.fund}", + account, + currency, + amount, + ) + ) diff --git a/python/report_ops/misc.py b/python/report_ops/misc.py new file mode 100644 index 00000000..318bb4eb --- /dev/null +++ b/python/report_ops/misc.py @@ -0,0 +1,22 @@ +import pathlib +import datetime +from serenitas.utils.env import DAILY_DIR + + +def get_dir(workdate: datetime.date, archived=True) -> pathlib.Path: + p = DAILY_DIR / str(workdate) / "Reports" + if not p.exists() and archived: + p = ( + DAILY_DIR + / str(workdate.year) + / f"{workdate:%Y_%m}" + / str(workdate) + / "Reports" + ) + return p + + +def dt_from_fname(f): + return datetime.datetime.strptime( + f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1], "%Y%m%d%H%M" + ) diff --git a/python/report_ops/remote.py b/python/report_ops/remote.py new file mode 100644 index 00000000..2dac0db2 --- /dev/null +++ b/python/report_ops/remote.py @@ -0,0 +1,106 @@ +from serenitas.utils.remote import SftpClient +from typing import Callable, List +import pandas as pd +from serenitas.utils.db import dawn_engine, dbconn +import datetime +import re +from serenitas.analytics.dates import prev_business_day, next_business_day + + +def citco_accrued(s): + if m := re.search("100502500_INNOCAP_ISOSEL.([\d]+)\.", s): + dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S") + return prev_business_day(dt) + + +def citco_all(s): + if m := re.search("SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s): + dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S") + return dt + + +def load_citco_report(fh, kd, date_cols): + df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True) + df["row"] = df.index + df.columns = df.columns.str.lower() + df.columns = df.columns.str.replace(" ", "_") + df["period_end_date"] = kd.date() + df["knowledge_date"] = next_business_day(kd) + return df + + +class Report: + table: str + ped_func: Callable[[str], datetime.datetime] + _sftp = SftpClient.from_creds("citco", folder="outgoing") + _conn: dbconn = dbconn("dawndb") + date_cols: List[str] = [] + + def __init_subclass__(cls, table, f, fname, date_cols): + cls.table = table + cls.ped_func = f + cls.fname = fname + cls.date_cols = date_cols + + def __init__(self, date): + self.date = date + + @property + def most_recent_report(self): + report_files = [ + filename + for filename in self._sftp.client.listdir() + if self.fname in filename + if type(self).ped_func(filename).date() == self.date + ] + try: + return max(report_files, key=type(self).ped_func) + except ValueError: + raise ValueError(f"Missing data for {self.table}: {self.date}") + + def to_df(self): + with self._sftp.client.open(self.most_recent_report) as fh: + return load_citco_report( + fh, type(self).ped_func(self.most_recent_report), self.date_cols + ) + + def to_db(self): + df = self.to_df() + with self._conn.cursor() as c: + c.execute( + f"DELETE FROM {self.table} WHERE period_end_date= %s", + (self.date,), + ) + self._conn.commit() + if "strategy" in df.columns: + df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True) + df.to_sql(self.table, dawn_engine, if_exists="append", index=False) + + +class AccruedReport( + Report, + table="isosel_accrued", + f=citco_accrued, + fname="100502500_INNOCAP_ISOSEL", + date_cols=[ + "Init Date", + "Init Settle Date", + "Liqd Date", + "Liqd Settle Date", + "Bond Maturity", + "Orig Date", + "Start Date", + "End Date", + ], +): + pass + + +class AllReport( + Report, + table="citco_reports", + f=citco_all, + fname="SPOS4X_INNOCAP_ISOSEL_D_IM", + date_cols=["Maturity Date"], +): + pass diff --git a/python/report_ops/utils.py b/python/report_ops/utils.py new file mode 100644 index 00000000..36839865 --- /dev/null +++ b/python/report_ops/utils.py @@ -0,0 +1,332 @@ +from collections import defaultdict +from dataclasses import field, dataclass +import logging +from typing import Literal, ClassVar +import datetime +import csv +from serenitas.ops.trade_dataclasses import Deal +from serenitas.utils.exchange import ExchangeMessage +from serenitas.utils.remote import SftpClient +from exchangelib import HTMLBody +from tabulate import tabulate +from functools import lru_cache +from serenitas.analytics.dates import next_business_day +from decimal import Decimal +import math +import re +from zoneinfo import ZoneInfo + +logger = logging.getLogger(__name__) + + +def next_business_days(date, offset): + for i in range(offset): + date = next_business_day(date) + return date + + +def get_file_status(s): + if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): + orig_name, submit_date, status, process_date = m.groups() + else: + raise ValueError(f"Can't parse status from file {s}") + + zone = ZoneInfo("America/New_York") + submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( + tzinfo=zone + ) + process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( + tzinfo=datetime.timezone.utc + ) + if orig_name == ("innocap_serenitas_trades_"): + file_type = "trade" + elif orig_name == "i.innocap_serenitas.": + file_type = "instrument" + else: + raise ValueError(f"error with {s}") + return file_type, "PROCESSED" in s, submit_date, process_date + + +def instrument_table(instrument_id): + if instrument_id.startswith("IRS"): + return "citco_irs" + elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): + return "citco_swaption" + elif instrument_id.startswith("CDS_"): + return "citco_tranche" + elif instrument_id.startswith("TRS"): + return "citco_trs" + + +def round_up(n, decimals=0): + multiplier = 10**decimals + return math.ceil(n * multiplier) / multiplier + + +@dataclass +class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): + id: int = field(init=False, metadata={"insert": False}) + identifier_type: Literal["trade", "instrument"] + citco_id: str + serenitas_id: str + submit_date: datetime.datetime + process_date: datetime.date + _sftp: ClassVar = field(metadata={"insert": False}) + + @classmethod + @lru_cache(1280) + def process(cls, fname): + file_type, status, submit_date, process_date = get_file_status(fname) + if status: + if file_type == "trade": + key = "Order" + elif file_type == "instrument": + key = "Security" + with cls._sftp.client.open(fname) as fh: + for row in csv.DictReader(fh): + trade = cls( + file_type, + row[f"Internal_{key}_Id"], + row[f"External_{key}_Id"], + submit_date, + process_date, + ) + trade.stage() + else: + with cls._sftp.client.open(fname) as fh: + next(fh) + for row in csv.reader(fh): + id_or_error = row[2] if len(row) > 2 else row[-1] + trade = cls( + "failed", + row[-1], + id_or_error, + submit_date, + process_date, + ) + trade.stage() + + @classmethod + def update_citco_tables(cls, newvals): + d = defaultdict(list) + for row in newvals: + if row.identifier_type == "instrument": + d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) + for table, v in d.items(): + sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" + with cls._conn.cursor() as c: + c.executemany(sql_str, v) + cls._conn.commit() + + @classmethod + def commit(cls): + if not cls._insert_queue: + return + with cls._conn.cursor() as c: + c.executemany(cls._sql_insert, cls._insert_queue, returning=True) + newvals = [] + while True: + if val := c.fetchone(): + newvals.append(val) + if not c.nextset(): + break + cls._conn.commit() + if newvals: + cls.update_citco_tables(newvals) + em = ExchangeMessage() + em.send_email( + "(CITCO) UPLOAD REPORT", + cls._format(newvals), + ( + "fyu@lmcg.com", + "ghorel@lmcg.com", + "etsui@lmcg.com", + ), + ) + + @classmethod + def _format(cls, vals): + t = tabulate( + vals, + headers=[ + "upload_type", + "citco_id", + "serenitas_id", + "submit_date", + "process_date", + ], + tablefmt="unsafehtml", + ) + html = HTMLBody( + f""" + <html> + <head> + <style> + table, th, td {{ border: 1px solid black; border-collapse: collapse;}} + th, td {{ padding: 5px; }} + </style> + </head> + <body> + {t} + </body> + </html> + """ + ) + return html + + @classmethod + def init_sftp(cls): + cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") + + @classmethod + def check_cache(cls): + if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: + if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: + raise ValueError( + "Too many files in the SFTP compared to cache max size" + ) + + +CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( + "RETURNING *", + "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", +) + + +_recipients = { + "ISOSEL": ( + "luke.treacy@innocap.com", + "margincalls@innocapglobal.com", + ), + "BOWDST": ( + "shkumar@sscinc.com", + "mbisoye@sscinc.com", + "hedgemark.lmcg.ops@sscinc.com", + "hm-operations@bnymellon.com", + ), + "SERCGMAST": ( + "SERENITAS.FA@sscinc.com", + "SERENITAS.ops@sscinc.com", + ), + "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",), + "NYOPS": ("nyops@lmcg.com",), +} + + +@dataclass +class Payment: + settle_date: datetime.date + currency: str + amount: float + _insert_queue: ClassVar[list] = [] + + @classmethod + def stage_payment(cls, settlements): + for row in settlements: + cls._insert_queue.append( + cls(row.settle_date, row.currency, row.payment_amount) + ) + + def to_email_format(self): + return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}" + + +class PaymentSettlement(Payment): + @classmethod + def email_innocap(cls, date, account_balance): + if not cls._insert_queue: + return + cls.subtract_cash_balance(account_balance) + move_cash = "" + for currency in ("USD", "EUR"): + biggest_deficit = min( + list( + map( + lambda x: int(x.amount) if x.currency == currency else 0, + cls._insert_queue, + ) + ) + ) + if biggest_deficit < 0: + move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***" + em = ExchangeMessage() + em.send_email( + f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}", + "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n" + + "\n".join( + settlement.to_email_format() for settlement in cls._insert_queue + ) + + move_cash, + to_recipients=_recipients["ISOSEL"], + cc_recipients=("Selene-Ops@lmcg.com",), + ) + cls._insert_queue.clear() + + @classmethod + def stage_payment(cls, settlements, date): + for row in settlements: + cls._insert_queue.append(cls(date, row.currency, row.payment_amount)) + + @classmethod + def subtract_cash_balance(cls, account_balance): + for settlement in cls._insert_queue: + settlement.amount = Decimal(account_balance[settlement.currency]) - ( + -settlement.amount + ) + + +class GFSMonitor(Payment): + @classmethod + def email_globeop(cls, fund): + if not cls._insert_queue: + return + em = ExchangeMessage() + em.send_email( + f"GFS Helper Strategy Issue: {fund}", + "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n" + + "\n".join( + settlement.to_email_format() for settlement in cls._insert_queue + ), + to_recipients=_recipients[fund], + cc_recipients=( + "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com", + ), + ) + + +class BamlFcmNotify: + @classmethod + def email_fcm(cls, date, cash_account, data): + em = ExchangeMessage() + em.send_email( + f"FX Details: {cash_account} {date}", + HTMLBody( + f""" +<html> + <head> + <style> + table, th, td {{ border: 1px solid black; border-collapse: collapse;}} + th, td {{ padding: 5px; }} + </style> + </head> + <body> + Hello,<br><br>Please see below details for an FX Spot Trade we did with the desk today for account {cash_account} Please let me know if you need more information.<br><br>{data} + </body> +</html>""" + ), + to_recipients=_recipients["BAML_FCM"], + cc_recipients=("nyops@lmcg.com",), + ) + + +@dataclass +class EmailOps: + _em = ExchangeMessage() + + @classmethod + def email_boston(cls, date): + cls._em.send_email( + f"Missing Cash Balance for Scotia {date}", + f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!", + to_recipients=_recipients["NYOPS"], + ) diff --git a/python/report_ops/wires.py b/python/report_ops/wires.py new file mode 100644 index 00000000..9f1965fd --- /dev/null +++ b/python/report_ops/wires.py @@ -0,0 +1,110 @@ +from dataclasses import dataclass +import datetime +from serenitas.ops.trade_dataclasses import Deal, Ccy +from typing import ClassVar +from .custodians import NT, BNY +from .misc import get_dir +from dataclasses import field +from csv import DictReader + +_nt_to_currency = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} + + +@dataclass +class Wire(Deal, table_name="custodian_wires", deal_type="custodian_wires"): + date: datetime.date + fund: ClassVar[str] + entry_date: datetime.date + value_date: datetime.date + pay_date: datetime.date + currency: Ccy + amount: float + wire_details: str + unique_ref: str + dtkey: ClassVar = field(metadata={"insert": False, "select": False}) + + def __init_subclass__(cls, fund, dtkey, **kwargs): + cls._sql_insert = ( + cls._sql_insert.removesuffix("RETURNING *") + + "ON CONFLICT (unique_ref) DO NOTHING RETURNING *" + ) + cls.fund = fund + cls.dtkey = dtkey + + def __post_init__(self): + self.amount = self.amount.replace(",", "") + if "(" in self.amount: + self.amount = -float(self.amount[1:-1]) + else: + self.amount = float(self.amount) + + @classmethod + def to_db(cls, fname, date): + cls.download_reports(date) + p = max( + [f for f in get_dir(date).iterdir() if fname in f.name], + key=cls.dtkey_fun(), + default=None, + ) + return p + + @classmethod + def dtkey_fun(cls): + def dtkey_fun(f): + return datetime.datetime.strptime( + f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1], + cls.dtkey, + ) + + return dtkey_fun + + +class BowdstWire(Wire, BNY, fund="BOWDST", dtkey="%Y%m%d%H%M%S"): + @classmethod + def from_report_line(cls, line: dict): + return cls( + date=line["Report Run Date"], + entry_date=line["Cash Entry Date"], + value_date=line["Cash Value Date"], + pay_date=line["Settle / Pay Date"], + currency=line["Local Currency Code"], + amount=line["Local Amount"], + wire_details=line["Transaction Description 1"] + if line["Transaction Type Code"] == "CW" + else line["Transaction Description 2"], + unique_ref=line["Reference Number"], + ) + + @classmethod + def to_db(cls, date): + p = super().to_db("BowdstWires", date) + with open(p) as fh: + reader = DictReader(fh) + for line in reader: + cls.from_report_line(line).stage() + cls.commit() + + +class NTWire(Wire, NT, fund="ISOSEL", dtkey="%Y%m%d%H%M"): + @classmethod + def from_passport_line(cls, line: dict): + return cls( + date=line["Through date"], + entry_date=line["D-GL-POST"], + value_date=line["D-TRAN-EFF"], + pay_date=line["D-TRAN-EFF"], + currency=_nt_to_currency[line["N-GL-AC30"]], + amount=line["Net amount - local"], + wire_details=line["narrative"], + unique_ref=line["C-EXTL-SYS-TRN-DSC-3"], + ) + + @classmethod + def to_db(cls, date): + p = super().to_db("custodian_wires", date) + with open(p) as fh: + reader = DictReader(fh) + for line in reader: + if "sponsor" in line["narrative"].lower(): + cls.from_preport_line(line).stage() + cls.commit() |
