from serenitas.utils.db import dbconn from io import StringIO import csv from serenitas.utils.env import DAILY_DIR from serenitas.utils.remote import SftpClient from serenitas.analytics.dates import next_business_day import datetime from trade_dataclasses import CDSDeal, SwaptionDeal from headers import MTM_HEADERS def rename_keys(d, mapping): """rename keys in dictionary according to mapping dict inplace""" for k, v in mapping.items(): if k in d: d[v] = d.pop(k) def tranche_trades(tradeids, conn): trades = [] for tradeid in tradeids: obj = CDSDeal.from_tradeid(tradeid).to_markit() trades.append(obj) return trades def swaption_trades(tradeids, conn): trades = [] for tradeid in tradeids: obj = SwaptionDeal.from_tradeid(tradeid).to_markit() trades.append(obj) return trades def tranche_term_trades(conn): with conn.cursor() as c: trades = [] c.execute( "SELECT terminations.*, cds.fund, cds.cp_code FROM terminations left join cds using (dealid) where termination_date >= %s and dealid LIKE %s", (datetime.date(2022, 3, 1), "SCCDS%"), ) for row in c: obj = row._asdict() rename_keys( obj, { "dealid": "Swap ID", "termination_cp": "Broker Id", "termination_amount": "1st Leg Notional", "termination_fee": "Initial Payment", "termination_date": "Trade Date", "fee_payment_date": "Settle Date", "fund": "Account Abbreviation", "termination_cp": "Broker Id", "cp_code": "Remaining Party", }, ) if obj["Initial Payment"] >= 0: obj["Transaction Code"] = "Receive" else: obj["Initial Payment"] = abs(obj["Initial Payment"]) obj["Transaction Code"] = "Pay" obj["Currency Code"] = "USD" obj["Product Type"] = "TRN" obj["Entity Matrix"] = "Publisher" obj["Definitions Type"] = "ISDA2003Credit" obj["Trade ID"] = obj["Swap ID"] + "-" + str(obj["id"]) obj["Transaction Type"] = "Partial Assignment" obj["Effective Date"] = obj["Trade Date"] + datetime.timedelta(days=1) trades.append(obj) return trades def terminations(tradeid, conn): with conn.cursor() as c: termination_query = ( """SELECT terminations.*, coalesce(cds.cp_code, swaptions.cp_code) AS orig_cp, COALESCE (cds."currency", swaptions."currency") AS currency, """ """COALESCE (cds."swap_type", 'SWAPTION') as swap_type FROM terminations LEFT JOIN cds USING (dealid) LEFT JOIN swaptions USING (dealid) where terminations.id = %s ORDER BY id desc;""" ) c.execute(termination_query, (tradeid,)) for row in c: obj = row._asdict() rename_keys( obj, { "dealid": "Swap ID", "termination_cp": "Broker Id", "termination_amount": "1st Leg Notional", "termination_fee": "Initial Payment", "termination_date": "Trade Date", "fee_payment_date": "Settle Date", "fund": "Account Abbreviation", "termination_cp": "Broker Id", "orig_cp": "Remaining Party", "currency": "Currency Code", }, ) if obj["Initial Payment"] >= 0: obj["Transaction Code"] = "Receive" else: obj["Initial Payment"] = abs(obj["Initial Payment"]) obj["Transaction Code"] = "Pay" match obj["swap_type"]: case "CD_INDEX_TRANCHE": obj["Product Type"] = "TRN" case "SWAPTION": obj["Product Type"] = "CDISW" case _: print("Not a valid termination") obj["Trade ID"] = obj["Swap ID"] + "-" + str(obj["id"]) obj["Transaction Type"] = ( "Termination" if obj["Remaining Party"] == obj["Broker Id"] else "Assignment" ) obj["Effective Date"] = obj["Trade Date"] + datetime.timedelta(days=1) trades.append(obj) def build_line(obj, asset_type): return [obj.get(h, None) for h in MTM_HEADERS[asset_type]] def process_upload(trades, asset_type, upload): buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(MTM_HEADERS[asset_type]) csvwriter.writerows(build_line(trade, asset_type) for trade in trades) buf = buf.getvalue().encode() fname = f"MTM.{datetime.datetime.now():%Y%m%d.%H%M%S}.{asset_type.capitalize()}.csv" sftp = SftpClient.from_creds("mtm") sftp.put(buf, fname) dest = DAILY_DIR / str(datetime.date.today()) / fname dest.write_bytes(buf) def upload_mtm_trades(trade_type, tradeid, conn=None): match trade_type: case "swaption": process_upload( SwaptionDeal.from_tradeid(tradeid).to_markit(), trade_type, True ) case "cds": process_upload(CDSDeal.from_tradeid(tradeid).to_markit(), trade_type, True) case "termination": process_upload(terminations(tradeid, conn), trade_type, True) if __name__ == "__main__": conn = dbconn("dawndb") upload_trades(conn)