"""Match counterparty FX trade reports against the trades stored in dawn.

For every fund and counterparty the daily report is read, and each reported
trade whose counterparty id is not yet present in ``forward_trades`` is
matched on its economics (dates, currencies, amounts).  The counterparty id
is then written back to the ``spots`` or ``fx_swaps`` table.
"""
import argparse
import datetime
import logging
import sys

import pandas as pd

from serenitas.utils.env import DAILY_DIR
from serenitas.analytics.dates import prev_business_day
from serenitas.utils.db import dawn_engine
from serenitas.utils.db import dbconn
from collateral.baml_isda import load_excel as load_excel_baml
from collateral.gs import load_file as load_excel_gs
from report_ops.enums import FundEnum

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Common column layout every ``read_<counterparty>`` generator normalises to.
FX_REPORT_COLUMNS = [
    "cpty_id",
    "trade_date",
    "settle_date",
    "buy_currency",
    "buy_amount",
    "sell_currency",
    "sell_amount",
]

# Fund name -> tag embedded in the BoA report file name (see read_BAMSNY).
_FUND_TAGS = {
    "Serenitas": "TSLP",
    "BowdSt": "TLLC",
    "Selene": "INC",
}

# Counterparties processed by main(); each needs a matching read_<code>().
_COUNTERPARTIES = ("MSCSNY", "BAMSNY", "GOLDNY")


def get_tag(fund):
    """Return the BoA file-name tag for *fund*, or ``""`` if the fund is unknown."""
    return _FUND_TAGS.get(fund, "")


def read_BAMSNY(fund, workdate):
    """Yield the FX forward rows of the BoA report for *fund* on *workdate*.

    Rows are named tuples with the ``FX_REPORT_COLUMNS`` fields.

    Raises:
        FileNotFoundError: if no report file matches the expected name
            pattern (raised when the generator is first iterated).
    """
    reports_dir = DAILY_DIR / fund / "BoA_reports"
    pattern = f"301__LMCG_INVESTMEN{get_tag(fund)}_CSA_{workdate:%m%d%Y}_*"
    try:
        fname = next(reports_dir.glob(pattern))
    except StopIteration:
        # The empty glob is the whole story; don't chain the StopIteration.
        raise FileNotFoundError(
            f"No BoA report matching {pattern} in {reports_dir}"
        ) from None
    df = load_excel_baml(fname)
    df = df.loc[
        df["ProductID"] == "FX_Fwd",
        [
            "Back Office Number",
            "Trade Date",
            "Maturity Date",
            "Ccy1",
            "Notional 1",
            "Ccy2",
            "Notional 2",
        ],
    ].copy()  # copy so the column assignments below don't hit a view
    df.columns = FX_REPORT_COLUMNS
    # NOTE(review): the other readers convert dates to ``datetime.date``;
    # here they stay as pandas Timestamps.  Confirm which type the
    # ``forward_trades`` date columns carry, since process_rows compares
    # them with ``==``.
    for date_column in ("trade_date", "settle_date"):
        df[date_column] = pd.to_datetime(df[date_column])
    df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply(
        pd.to_numeric, errors="coerce"
    )
    yield from df.itertuples()


def read_MSCSNY(fund, workdate):
    """Yield the rows of the MS FX trade detail report for *fund*.

    The file read is the one dated ``prev_business_day(workdate)``.  Rows are
    named tuples with the ``FX_REPORT_COLUMNS`` fields; dates are
    ``datetime.date`` objects.
    """
    df = pd.read_excel(
        DAILY_DIR
        / fund
        / "MS_reports"
        / f"Trade_Detail_FX_{prev_business_day(workdate):%Y%m%d}.xls"
    )
    df = df[
        [
            "trade_id",
            "trade_date",
            "settlement_date",
            "buy_ccy",
            "amt_buy_ccy",
            "sell_ccy",
            "amt_sell_ccy",
        ]
    ].copy()  # copy so the column assignments below don't hit a view
    df.columns = FX_REPORT_COLUMNS
    for date_column in ("trade_date", "settle_date"):
        df[date_column] = pd.to_datetime(df[date_column]).dt.date
    df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply(
        pd.to_numeric, errors="coerce"
    )
    yield from df.itertuples()


def extract_amounts_and_currencies(row):
    """Convert one GS trade-detail row into a dict keyed like ``FX_REPORT_COLUMNS``.

    Dates are parsed from ``%d-%b-%Y`` strings.  The sign of ``Notional(1)``
    decides which leg is reported as the buy side; amounts are returned as
    absolute values.
    """
    leg_1 = (abs(row["Notional(1)"]), row["Not1Ccy"])
    leg_2 = (abs(row["Notional(2)"]), row["Not2Ccy"])
    if row["Notional(1)"] < 0:  # From perspective of GSIL
        buy, sell = leg_1, leg_2
    else:
        buy, sell = leg_2, leg_1
    return {
        "cpty_id": row["Trade Id"],
        "trade_date": datetime.datetime.strptime(
            row["Trade Date"], "%d-%b-%Y"
        ).date(),
        "settle_date": datetime.datetime.strptime(
            row["Maturity Date"], "%d-%b-%Y"
        ).date(),
        "buy_amount": buy[0],
        "buy_currency": buy[1],
        "sell_amount": sell[0],
        "sell_currency": sell[1],
    }


def read_GOLDNY(fund, workdate):
    """Yield the FX rows of the GS trade detail report for *fund*.

    The report loaded is the one for ``prev_business_day(workdate)``.  Rows
    are named tuples carrying the keys produced by
    ``extract_amounts_and_currencies``.
    """
    df = load_excel_gs(prev_business_day(workdate), fund, "Trade_Detail")
    df = df[df["Transaction Type"] == "FX"]
    if df.empty:
        # Nothing to report; avoids running DataFrame.apply on an empty frame.
        return
    result = df.apply(extract_amounts_and_currencies, axis=1)
    yield from pd.DataFrame.from_records(result).itertuples()


def get_forwards(cob):
    """Load every row of ``forward_trades`` with numeric buy/sell amounts.

    Args:
        cob: currently unused; kept so existing callers keep working.
    """
    df = pd.read_sql_query("SELECT * FROM forward_trades", con=dawn_engine)
    df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply(
        pd.to_numeric, errors="coerce"
    )
    return df


def _cpty_id_target(fx_type):
    """Return the ``(table, column)`` holding the counterparty id for *fx_type*.

    Raises:
        ValueError: if *fx_type* is not ``SPOT``, ``NEAR`` or ``FAR``.
    """
    if fx_type == "SPOT":
        return "spots", "cpty_id"
    if fx_type in ("NEAR", "FAR"):
        return "fx_swaps", f"{fx_type.lower()}_cpty_id"
    raise ValueError(f"Unsupported fx_type {fx_type!r}")


def process_rows(fund, counterparty, workdate, dawn_trades, conn):
    """Attach counterparty ids from one report to the matching dawn trades.

    Args:
        fund: ``FundEnum`` member; ``fund.name`` selects the report and
            ``fund.value`` filters ``dawn_trades``.
        counterparty: counterparty code; ``read_<counterparty>`` must exist
            in this module.
        workdate: date passed through to the reader.
        dawn_trades: frame returned by ``get_forwards``.
        conn: database connection used for the updates; committed after
            every matched row.

    Raises:
        FileNotFoundError: propagated from the reader when the report is
            missing.
        ValueError: if a reported trade has no match in ``dawn_trades``, or
            if the matched trade has an unsupported ``fx_type``.
    """
    read_fun = globals()[f"read_{counterparty}"]
    # Loop-invariant pieces, computed once instead of per report row.
    known_cpty_ids = dawn_trades.cpty_id.values
    candidates = dawn_trades.loc[
        (dawn_trades["cp_code"] == counterparty)
        & (dawn_trades["fund"] == fund.value)
    ]
    for row in read_fun(fund.name, workdate):
        if row.cpty_id in known_cpty_ids:
            continue  # Will later start verifying that our trades are matched right
        filters = {
            "trade_date": row.trade_date,
            "settle_date": row.settle_date,
            "buy_currency": row.buy_currency,
            "sell_currency": row.sell_currency,
            "buy_amount": row.buy_amount,
            "sell_amount": row.sell_amount,
        }
        matches = candidates.loc[
            (candidates[list(filters)] == pd.Series(filters)).all(axis=1)
        ]
        if matches.empty:
            raise ValueError(f"This is an unknown trade from us. {row.cpty_id}")
        # NOTE(review): only the first match is used and ``dawn_trades`` is
        # not refreshed, so two report rows with identical economics would
        # both update the same trade -- confirm this is acceptable.
        matched_candidate = matches.iloc[0, :]
        # Resolve the target before touching the database so that an
        # unexpected fx_type can never reuse a stale table/column from a
        # previous iteration.
        table, column = _cpty_id_target(matched_candidate.fx_type)
        with conn.cursor() as c:
            # table/column come from the fixed whitelist in _cpty_id_target;
            # the values themselves are passed as bound parameters.
            c.execute(
                f"UPDATE {table} SET {column}=%s WHERE id=%s",
                (
                    row.cpty_id,
                    matched_candidate.id,
                ),
            )
        logger.info(f"MATCHED {matched_candidate.id}: {table} to {row.cpty_id}")
        conn.commit()


def main(workdate):
    """Run the matching for every fund and counterparty on *workdate*.

    A missing report is logged as a warning and skipped; any other error
    propagates.  The database connection is closed on exit.
    """
    conn = dbconn("dawndb")
    try:
        dawn_trades = get_forwards(workdate)
        for fund in FundEnum:
            for counterparty in _COUNTERPARTIES:
                try:
                    process_rows(fund, counterparty, workdate, dawn_trades, conn)
                except FileNotFoundError:
                    logger.warning(
                        f"FileNotFound for {fund}: {counterparty} on {workdate}"
                    )
    finally:
        conn.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workdate",
        type=datetime.date.fromisoformat,
        nargs="?",
        default=datetime.date.today(),
        help="Workdate (YYYY-MM-DD)",
    )
    args = parser.parse_args()
    main(args.workdate)