diff options
Diffstat (limited to 'python/citco_ops')
| -rw-r--r-- | python/citco_ops/__init__.py | 0 | ||||
| -rw-r--r-- | python/citco_ops/cash.py | 129 | ||||
| -rw-r--r-- | python/citco_ops/misc.py | 22 | ||||
| -rw-r--r-- | python/citco_ops/remote.py | 106 | ||||
| -rw-r--r-- | python/citco_ops/utils.py | 332 | ||||
| -rw-r--r-- | python/citco_ops/wires.py | 110 |
6 files changed, 0 insertions, 699 deletions
diff --git a/python/citco_ops/__init__.py b/python/citco_ops/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/python/citco_ops/__init__.py +++ /dev/null diff --git a/python/citco_ops/cash.py b/python/citco_ops/cash.py deleted file mode 100644 index 092dbb7d..00000000 --- a/python/citco_ops/cash.py +++ /dev/null @@ -1,129 +0,0 @@ -from dataclasses import field, dataclass -from serenitas.ops.trade_dataclasses import Deal, Fund -from serenitas.analytics.dates import prev_business_day, next_business_day -import datetime -from serenitas.utils.exchange import ExchangeMessage -from serenitas.utils.env import DAILY_DIR -import pandas as pd -from serenitas.utils.db import dbconn, dawn_engine -from typing import ClassVar -from .misc import get_dir, dt_from_fname -from .custodians import NT, UMB - - -@dataclass -class IAMDeal(Deal, deal_type=None, table_name="iam_tickets"): - trade_date: datetime.date = field(metadata={"globeop": "SettlementDate"}) - action: str = field(metadata={"globeop": "Action"}) - strategy: str = field(metadata={"globeop": "Folder"}) - counterparty: str = field(metadata={"globeop": "Counterparty"}) - maturity: datetime.date - start_money: float = field(metadata={"globeop": "StartMoney"}) - currency: str = field(metadata={"globeop": "Currency"}) - booked_offset: bool - uploaded: bool - fund: Fund - dealid: str = field(metadata={"insert": False}) - id: int = field(metadata={"insert": False}) - - def to_globeop(self, action): - obj = super().to_globeop(action) - obj["Deal Type"] = "IamDeal" - obj["ExpirationDate"] = self.trade_date if self.action == "UPDATE" else None - obj["CallNoticeIndicator"] = "24H" if self.action == "NEW" else None - obj["TransactionIndicator"] = ("DEPOSIT" if obj["StartMoney"] > 0 else "LOAN",) - obj["StartMoney"] = abs(obj["StartMoney"]) - obj["Folder"] = ( - "M_CSH_CASH" if obj["strategy"] == "CSH_CASH" else obj["strategy"] - ) - obj["DealFunction"] = "OTC" - obj["MarginType"] = "Variation" - obj["Basis"] = "ACT/360" - return obj - - -@dataclass -class CashReport: - fund: ClassVar[str] - account_number: ClassVar[str] - date: datetime.date - _conn: ClassVar[dbconn] = dbconn("dawndb") - _staging_queue: ClassVar[set] = set() - _insert_sql = "INSERT INTO cash_balances VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING" - - def __init_subclass__(cls, fund, account_number): - cls.fund = fund - cls.account_number = account_number - - def to_db(self, report_name): - self.download_reports(self.date) - report_dir = get_dir(self.date) - report_dir.mkdir(exist_ok=True, parents=True) - p = max( - [f for f in get_dir(self.date).iterdir() if f.name.startswith(report_name)], - key=dt_from_fname, - default=None, - ) - if not p: - raise ValueError( - f"No reports found for fund: {self.fund} date: {self.date}" - ) - return p - - @classmethod - def commit(cls): - with cls._conn.cursor() as c: - c.executemany(cls._insert_sql, cls._staging_queue) - cls._conn.commit() - - -class NTCashReport(CashReport, NT, fund="ISOSEL", account_number="ISOS01"): - def to_db(self): - p = super().to_db("cash_") - df = pd.read_csv(p, on_bad_lines="warn") - df = df[df["T-NARR-LONG"] == "CLOSING BALANCE"] - df = df[["Consolidation", "Currency code", "A-TRAN-AMT"]] - df.columns = df.columns.str.replace(" |-|_", "", regex=True).str.lower() - df = df.set_index(["consolidation", "currencycode"]) - for row in df.itertuples(): - self.stage_from_row(row) - self.commit() - self._staging_queue.clear() - - def stage_from_row(self, row): - (account, currency), amount = row - self._staging_queue.add( - ( - prev_business_day(self.date), - self.fund, - f"NT Custody Account {self.fund}", - account, - currency, - amount, - ) - ) - - -class UMBCashReport(CashReport, UMB, fund="SERCGMAST", account_number="159260.1"): - def to_db(self): - p = super().to_db("umb_") - df = pd.read_excel(p, skiprows=3) - for row in ( - df.groupby(["Portfolio #", "Currency"]).sum()["Current Balance"].items() - ): - self.stage_from_row(row) - self.commit() - self._staging_queue.clear() - - def stage_from_row(self, row): - (account, currency), amount = row - self._staging_queue.add( - ( - prev_business_day(self.date), - self.fund, - f"UMB Custody Account {self.fund}", - account, - currency, - amount, - ) - ) diff --git a/python/citco_ops/misc.py b/python/citco_ops/misc.py deleted file mode 100644 index 318bb4eb..00000000 --- a/python/citco_ops/misc.py +++ /dev/null @@ -1,22 +0,0 @@ -import pathlib -import datetime -from serenitas.utils.env import DAILY_DIR - - -def get_dir(workdate: datetime.date, archived=True) -> pathlib.Path: - p = DAILY_DIR / str(workdate) / "Reports" - if not p.exists() and archived: - p = ( - DAILY_DIR - / str(workdate.year) - / f"{workdate:%Y_%m}" - / str(workdate) - / "Reports" - ) - return p - - -def dt_from_fname(f): - return datetime.datetime.strptime( - f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1], "%Y%m%d%H%M" - ) diff --git a/python/citco_ops/remote.py b/python/citco_ops/remote.py deleted file mode 100644 index 2dac0db2..00000000 --- a/python/citco_ops/remote.py +++ /dev/null @@ -1,106 +0,0 @@ -from serenitas.utils.remote import SftpClient -from typing import Callable, List -import pandas as pd -from serenitas.utils.db import dawn_engine, dbconn -import datetime -import re -from serenitas.analytics.dates import prev_business_day, next_business_day - - -def citco_accrued(s): - if m := re.search("100502500_INNOCAP_ISOSEL.([\d]+)\.", s): - dt = datetime.datetime.strptime(m.group(1), "%Y%m%d%H%M%S") - return prev_business_day(dt) - - -def citco_all(s): - if m := re.search("SPOS4X_INNOCAP_ISOSEL_D_IM.([\d.]+)\.", s): - dt = datetime.datetime.strptime(m.group(1), "%Y%m%d.%H%M%S") - return dt - - -def load_citco_report(fh, kd, date_cols): - df = pd.read_csv(fh, parse_dates=date_cols, infer_datetime_format=True) - df["row"] = df.index - df.columns = df.columns.str.lower() - df.columns = df.columns.str.replace(" ", "_") - df["period_end_date"] = kd.date() - df["knowledge_date"] = next_business_day(kd) - return df - - -class Report: - table: str - ped_func: Callable[[str], datetime.datetime] - _sftp = SftpClient.from_creds("citco", folder="outgoing") - _conn: dbconn = dbconn("dawndb") - date_cols: List[str] = [] - - def __init_subclass__(cls, table, f, fname, date_cols): - cls.table = table - cls.ped_func = f - cls.fname = fname - cls.date_cols = date_cols - - def __init__(self, date): - self.date = date - - @property - def most_recent_report(self): - report_files = [ - filename - for filename in self._sftp.client.listdir() - if self.fname in filename - if type(self).ped_func(filename).date() == self.date - ] - try: - return max(report_files, key=type(self).ped_func) - except ValueError: - raise ValueError(f"Missing data for {self.table}: {self.date}") - - def to_df(self): - with self._sftp.client.open(self.most_recent_report) as fh: - return load_citco_report( - fh, type(self).ped_func(self.most_recent_report), self.date_cols - ) - - def to_db(self): - df = self.to_df() - with self._conn.cursor() as c: - c.execute( - f"DELETE FROM {self.table} WHERE period_end_date= %s", - (self.date,), - ) - self._conn.commit() - if "strategy" in df.columns: - df["strategy"] = df["strategy"].str.replace("/M_|/SER_", "/", regex=True) - df.to_sql(self.table, dawn_engine, if_exists="append", index=False) - - -class AccruedReport( - Report, - table="isosel_accrued", - f=citco_accrued, - fname="100502500_INNOCAP_ISOSEL", - date_cols=[ - "Init Date", - "Init Settle Date", - "Liqd Date", - "Liqd Settle Date", - "Bond Maturity", - "Orig Date", - "Start Date", - "End Date", - ], -): - pass - - -class AllReport( - Report, - table="citco_reports", - f=citco_all, - fname="SPOS4X_INNOCAP_ISOSEL_D_IM", - date_cols=["Maturity Date"], -): - pass diff --git a/python/citco_ops/utils.py b/python/citco_ops/utils.py deleted file mode 100644 index 36839865..00000000 --- a/python/citco_ops/utils.py +++ /dev/null @@ -1,332 +0,0 @@ -from collections import defaultdict -from dataclasses import field, dataclass -import logging -from typing import Literal, ClassVar -import datetime -import csv -from serenitas.ops.trade_dataclasses import Deal -from serenitas.utils.exchange import ExchangeMessage -from serenitas.utils.remote import SftpClient -from exchangelib import HTMLBody -from tabulate import tabulate -from functools import lru_cache -from serenitas.analytics.dates import next_business_day -from decimal import Decimal -import math -import re -from zoneinfo import ZoneInfo - -logger = logging.getLogger(__name__) - - -def next_business_days(date, offset): - for i in range(offset): - date = next_business_day(date) - return date - - -def get_file_status(s): - if m := re.match(r"([^\d]*)(\d*)-(PROCESSED|FAILED)_([^-]*)", s): - orig_name, submit_date, status, process_date = m.groups() - else: - raise ValueError(f"Can't parse status from file {s}") - - zone = ZoneInfo("America/New_York") - submit_date = datetime.datetime.strptime(submit_date, "%Y%m%d%H%M%S").replace( - tzinfo=zone - ) - process_date = datetime.datetime.strptime(process_date, "%Y%m%d%H%M%S").replace( - tzinfo=datetime.timezone.utc - ) - if orig_name == ("innocap_serenitas_trades_"): - file_type = "trade" - elif orig_name == "i.innocap_serenitas.": - file_type = "instrument" - else: - raise ValueError(f"error with {s}") - return file_type, "PROCESSED" in s, submit_date, process_date - - -def instrument_table(instrument_id): - if instrument_id.startswith("IRS"): - return "citco_irs" - elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"): - return "citco_swaption" - elif instrument_id.startswith("CDS_"): - return "citco_tranche" - elif instrument_id.startswith("TRS"): - return "citco_trs" - - -def round_up(n, decimals=0): - multiplier = 10**decimals - return math.ceil(n * multiplier) / multiplier - - -@dataclass -class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission2"): - id: int = field(init=False, metadata={"insert": False}) - identifier_type: Literal["trade", "instrument"] - citco_id: str - serenitas_id: str - submit_date: datetime.datetime - process_date: datetime.date - _sftp: ClassVar = field(metadata={"insert": False}) - - @classmethod - @lru_cache(1280) - def process(cls, fname): - file_type, status, submit_date, process_date = get_file_status(fname) - if status: - if file_type == "trade": - key = "Order" - elif file_type == "instrument": - key = "Security" - with cls._sftp.client.open(fname) as fh: - for row in csv.DictReader(fh): - trade = cls( - file_type, - row[f"Internal_{key}_Id"], - row[f"External_{key}_Id"], - submit_date, - process_date, - ) - trade.stage() - else: - with cls._sftp.client.open(fname) as fh: - next(fh) - for row in csv.reader(fh): - id_or_error = row[2] if len(row) > 2 else row[-1] - trade = cls( - "failed", - row[-1], - id_or_error, - submit_date, - process_date, - ) - trade.stage() - - @classmethod - def update_citco_tables(cls, newvals): - d = defaultdict(list) - for row in newvals: - if row.identifier_type == "instrument": - d[instrument_table(row.serenitas_id)].append((row.serenitas_id,)) - for table, v in d.items(): - sql_str = f"UPDATE {table} SET committed=True, status='Acknowledged' WHERE dealid=%s" - with cls._conn.cursor() as c: - c.executemany(sql_str, v) - cls._conn.commit() - - @classmethod - def commit(cls): - if not cls._insert_queue: - return - with cls._conn.cursor() as c: - c.executemany(cls._sql_insert, cls._insert_queue, returning=True) - newvals = [] - while True: - if val := c.fetchone(): - newvals.append(val) - if not c.nextset(): - break - cls._conn.commit() - if newvals: - cls.update_citco_tables(newvals) - em = ExchangeMessage() - em.send_email( - "(CITCO) UPLOAD REPORT", - cls._format(newvals), - ( - "fyu@lmcg.com", - "ghorel@lmcg.com", - "etsui@lmcg.com", - ), - ) - - @classmethod - def _format(cls, vals): - t = tabulate( - vals, - headers=[ - "upload_type", - "citco_id", - "serenitas_id", - "submit_date", - "process_date", - ], - tablefmt="unsafehtml", - ) - html = HTMLBody( - f""" - <html> - <head> - <style> - table, th, td {{ border: 1px solid black; border-collapse: collapse;}} - th, td {{ padding: 5px; }} - </style> - </head> - <body> - {t} - </body> - </html> - """ - ) - return html - - @classmethod - def init_sftp(cls): - cls._sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications") - - @classmethod - def check_cache(cls): - if cls.process.cache_info().currsize == cls.process.cache_info().maxsize: - if (cls.process.cache_info().misses / cls.process.cache_info().hits) > 0.5: - raise ValueError( - "Too many files in the SFTP compared to cache max size" - ) - - -CitcoSubmission._sql_insert = CitcoSubmission._sql_insert.replace( - "RETURNING *", - "ON CONFLICT (identifier_type, submit_date, process_date, citco_id) DO NOTHING RETURNING *", -) - - -_recipients = { - "ISOSEL": ( - "luke.treacy@innocap.com", - "margincalls@innocapglobal.com", - ), - "BOWDST": ( - "shkumar@sscinc.com", - "mbisoye@sscinc.com", - "hedgemark.lmcg.ops@sscinc.com", - "hm-operations@bnymellon.com", - ), - "SERCGMAST": ( - "SERENITAS.FA@sscinc.com", - "SERENITAS.ops@sscinc.com", - ), - "BAML_FCM": ("footc_margin_csr_amrs@bofa.com",), - "NYOPS": ("nyops@lmcg.com",), -} - - -@dataclass -class Payment: - settle_date: datetime.date - currency: str - amount: float - _insert_queue: ClassVar[list] = [] - - @classmethod - def stage_payment(cls, settlements): - for row in settlements: - cls._insert_queue.append( - cls(row.settle_date, row.currency, row.payment_amount) - ) - - def to_email_format(self): - return f"\t* {self.settle_date}: {self.amount:,.2f} {self.currency}" - - -class PaymentSettlement(Payment): - @classmethod - def email_innocap(cls, date, account_balance): - if not cls._insert_queue: - return - cls.subtract_cash_balance(account_balance) - move_cash = "" - for currency in ("USD", "EUR"): - biggest_deficit = min( - list( - map( - lambda x: int(x.amount) if x.currency == currency else 0, - cls._insert_queue, - ) - ) - ) - if biggest_deficit < 0: - move_cash += f"\n\n***Please move ${round_up(abs(biggest_deficit), -6):,.2f} {currency} to Northern Trust from Scotia and confirm when done.***" - em = ExchangeMessage() - em.send_email( - f"{'*ACTION REQUESTED* ' if move_cash else ''}Payment Settlements Bond/FX NT: ISOSEL {date}", - "Good morning, \n\nProjected Balances at Northern Trust: (Positive Amounts = Positive Balance, Negative Amounts = Negative Balance)\n\n" - + "\n".join( - settlement.to_email_format() for settlement in cls._insert_queue - ) - + move_cash, - to_recipients=_recipients["ISOSEL"], - cc_recipients=("Selene-Ops@lmcg.com",), - ) - cls._insert_queue.clear() - - @classmethod - def stage_payment(cls, settlements, date): - for row in settlements: - cls._insert_queue.append(cls(date, row.currency, row.payment_amount)) - - @classmethod - def subtract_cash_balance(cls, account_balance): - for settlement in cls._insert_queue: - settlement.amount = Decimal(account_balance[settlement.currency]) - ( - -settlement.amount - ) - - -class GFSMonitor(Payment): - @classmethod - def email_globeop(cls, fund): - if not cls._insert_queue: - return - em = ExchangeMessage() - em.send_email( - f"GFS Helper Strategy Issue: {fund}", - "Good morning, \n\nWe noticed some cash in the GFS helper strategy that shouldn't be there:\n\n" - + "\n".join( - settlement.to_email_format() for settlement in cls._insert_queue - ), - to_recipients=_recipients[fund], - cc_recipients=( - "Bowdoin-Ops@LMCG.com" if fund == "BOWDST" else "NYOps@lmcg.com", - ), - ) - - -class BamlFcmNotify: - @classmethod - def email_fcm(cls, date, cash_account, data): - em = ExchangeMessage() - em.send_email( - f"FX Details: {cash_account} {date}", - HTMLBody( - f""" -<html> - <head> - <style> - table, th, td {{ border: 1px solid black; border-collapse: collapse;}} - th, td {{ padding: 5px; }} - </style> - </head> - <body> - Hello,<br><br>Please see below details for an FX Spot Trade we did with the desk today for account {cash_account} Please let me know if you need more information.<br><br>{data} - </body> -</html>""" - ), - to_recipients=_recipients["BAML_FCM"], - cc_recipients=("nyops@lmcg.com",), - ) - - -@dataclass -class EmailOps: - _em = ExchangeMessage() - - @classmethod - def email_boston(cls, date): - cls._em.send_email( - f"Missing Cash Balance for Scotia {date}", - f"Please provide cash balance for Scotia for {date} in Blotter.\n\nThanks!", - to_recipients=_recipients["NYOPS"], - ) diff --git a/python/citco_ops/wires.py b/python/citco_ops/wires.py deleted file mode 100644 index 9f1965fd..00000000 --- a/python/citco_ops/wires.py +++ /dev/null @@ -1,110 +0,0 @@ -from dataclasses import dataclass -import datetime -from serenitas.ops.trade_dataclasses import Deal, Ccy -from typing import ClassVar -from .custodians import NT, BNY -from .misc import get_dir -from dataclasses import field -from csv import DictReader - -_nt_to_currency = {"EURO - EUR": "EUR", "U.S. DOLLARS - USD": "USD"} - - -@dataclass -class Wire(Deal, table_name="custodian_wires", deal_type="custodian_wires"): - date: datetime.date - fund: ClassVar[str] - entry_date: datetime.date - value_date: datetime.date - pay_date: datetime.date - currency: Ccy - amount: float - wire_details: str - unique_ref: str - dtkey: ClassVar = field(metadata={"insert": False, "select": False}) - - def __init_subclass__(cls, fund, dtkey, **kwargs): - cls._sql_insert = ( - cls._sql_insert.removesuffix("RETURNING *") - + "ON CONFLICT (unique_ref) DO NOTHING RETURNING *" - ) - cls.fund = fund - cls.dtkey = dtkey - - def __post_init__(self): - self.amount = self.amount.replace(",", "") - if "(" in self.amount: - self.amount = -float(self.amount[1:-1]) - else: - self.amount = float(self.amount) - - @classmethod - def to_db(cls, fname, date): - cls.download_reports(date) - p = max( - [f for f in get_dir(date).iterdir() if fname in f.name], - key=cls.dtkey_fun(), - default=None, - ) - return p - - @classmethod - def dtkey_fun(cls): - def dtkey_fun(f): - return datetime.datetime.strptime( - f.name.removesuffix(".csv").removesuffix(".xlsx").rsplit("_")[-1], - cls.dtkey, - ) - - return dtkey_fun - - -class BowdstWire(Wire, BNY, fund="BOWDST", dtkey="%Y%m%d%H%M%S"): - @classmethod - def from_report_line(cls, line: dict): - return cls( - date=line["Report Run Date"], - entry_date=line["Cash Entry Date"], - value_date=line["Cash Value Date"], - pay_date=line["Settle / Pay Date"], - currency=line["Local Currency Code"], - amount=line["Local Amount"], - wire_details=line["Transaction Description 1"] - if line["Transaction Type Code"] == "CW" - else line["Transaction Description 2"], - unique_ref=line["Reference Number"], - ) - - @classmethod - def to_db(cls, date): - p = super().to_db("BowdstWires", date) - with open(p) as fh: - reader = DictReader(fh) - for line in reader: - cls.from_report_line(line).stage() - cls.commit() - - -class NTWire(Wire, NT, fund="ISOSEL", dtkey="%Y%m%d%H%M"): - @classmethod - def from_passport_line(cls, line: dict): - return cls( - date=line["Through date"], - entry_date=line["D-GL-POST"], - value_date=line["D-TRAN-EFF"], - pay_date=line["D-TRAN-EFF"], - currency=_nt_to_currency[line["N-GL-AC30"]], - amount=line["Net amount - local"], - wire_details=line["narrative"], - unique_ref=line["C-EXTL-SYS-TRN-DSC-3"], - ) - - @classmethod - def to_db(cls, date): - p = super().to_db("custodian_wires", date) - with open(p) as fh: - reader = DictReader(fh) - for line in reader: - if "sponsor" in line["narrative"].lower(): - cls.from_preport_line(line).stage() - cls.commit() |
