import argparse import datetime import logging import pandas as pd from serenitas.utils.env import DAILY_DIR from serenitas.analytics.dates import prev_business_day from serenitas.utils.db import dawn_engine from serenitas.utils.db import dbconn from collateral.baml_isda import load_excel logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) FX_REPORT_COLUMNS = [ "cpty_id", "trade_date", "settle_date", "buy_currency", "buy_amount", "sell_currency", "sell_amount", ] _fund_enums = {"Serenitas": "SERCGMAST", "BowdSt": "BOWDST", "Selene": "ISOSEL"} def get_tag(fund): if fund == "Serenitas": return "TSLP" elif fund == "BowdSt": return "TLLC" elif fund == "Selene": return "INC" else: return "" def read_BAMSNY(fund, workdate): REPORTS_DIR = DAILY_DIR / fund / "BoA_reports" fname = next( REPORTS_DIR.glob(f"301__LMCG_INVESTMEN{get_tag(fund)}_CSA_{workdate:%m%d%Y}_*") ) df = load_excel(fname) df = df[df["ProductID"] == "FX_Fwd"] df = df[ [ "Back Office Number", "Trade Date", "Maturity Date", "Ccy1", "Notional 1", "Ccy2", "Notional 2", ] ] df.columns = FX_REPORT_COLUMNS for date_column in ( "trade_date", "settle_date", ): df[date_column] = pd.to_datetime(df[date_column], infer_datetime_format=True) df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply( pd.to_numeric, errors="coerce" ) yield from df.itertuples() def read_MSCSNY(fund, workdate): df = pd.read_excel( DAILY_DIR / fund / "MS_reports" / f"Trade_Detail_FX_{prev_business_day(workdate):%Y%m%d}.xls" ) df = df[ [ "trade_id", "trade_date", "settlement_date", "buy_ccy", "amt_buy_ccy", "sell_ccy", "amt_sell_ccy", ] ] df.columns = FX_REPORT_COLUMNS for date_column in ( "trade_date", "settle_date", ): df[date_column] = pd.to_datetime(df[date_column], infer_datetime_format=True) df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply( pd.to_numeric, errors="coerce" ) yield from df.itertuples() def get_forwards(cob): df = pd.read_sql_query("SELECT * FROM forward_trades", con=dawn_engine) df[["buy_amount", "sell_amount"]] = df[["buy_amount", "sell_amount"]].apply( pd.to_numeric, errors="coerce" ) return df def main(workdate): conn = dbconn("dawndb") dawn_trades = get_forwards(workdate) for fund, fund_code in _fund_enums.items(): for counterparty in ("MSCSNY", "BAMSNY"): read_fun = globals()[f"read_{counterparty}"] for row in read_fun(fund, workdate): if row.cpty_id not in dawn_trades.cpty_id.values: matching_candidates = dawn_trades.loc[ (dawn_trades["cp_code"] == counterparty) & (dawn_trades["fund"] == fund_code) ] filters = { "trade_date": pd.Timestamp(row.trade_date), "settle_date": pd.Timestamp(row.settle_date), "buy_currency": row.buy_currency, "sell_currency": row.sell_currency, "buy_amount": row.buy_amount, "sell_amount": row.sell_amount, } matching_candidates = matching_candidates.loc[ ( matching_candidates[list(filters.keys())] == pd.Series(filters) ).all(axis=1) ] if not matching_candidates.empty: matched_candidate = matching_candidates.iloc[0, :] with conn.cursor() as c: if matched_candidate.fx_type == "SPOT": table = "spots" column = "cpty_id" elif matched_candidate.fx_type in ( "NEAR", "FAR", ): table = "fx_swaps" column = f"{matched_candidate.fx_type.lower()}_cpty_id" c.execute( f"UPDATE {table} SET {column}=%s WHERE id=%s", ( row.cpty_id, matched_candidate.id, ), ) logger.info("UPDATED %", matched_candidate.id) conn.commit() else: raise ValueError( f"This is an unknown trade from us. {row.cpty_id}" ) else: pass # Will later start verifying that our trades are matched right if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "workdate", type=datetime.date, nargs="?", default=datetime.date.today(), help="Date to process (YYYY-MM-DD)", ) args = parser.parse_args() main(args.workdate)