import pandas as pd from serenitas.utils.misc import rename_keys import datetime from collections import defaultdict from io import StringIO import csv from serenitas.utils.env import DAILY_DIR from serenitas.utils.remote import SftpClient from psycopg2.errors import UniqueViolation from serenitas.analytics.dates import bus_day from csv_headers.globeop_upload import HEADERS columns = HEADERS["iam"] _brokers = { "BAML_ISDA": "BOANNY", "CS": "CSITLN", "GS": "GOLINY", "BNP": "BNPBNY", "MS": "MSCILN", "JPM": "JPCBNY", "GS_FCM": "GOLDNY", } def iam_process(obj, action, trade_date): rename_keys( obj, { "dealid": "DealId", "action": "Action", "strategy": "Strategy", "counterparty": "Counterparty", "currency": "Currency", "start_money": "StartMoney", "trade_date": "TradeDate", }, ) if action == "UPDATE": obj["ExpirationDate"] = trade_date obj["Action"] = "UPDATE" elif action == "NEW": obj["CallNoticeIndicator"] = "24H" elif action == "CANCEL": obj["Action"] = "CANCEL" if obj["StartMoney"] > 0: obj["TransactionIndicator"] = "DEPOSIT" else: obj["TransactionIndicator"] = "LOAN" obj["StartMoney"] = abs(obj["StartMoney"]) if obj["Strategy"] == "CSH_CASH": obj["Strategy"] = "M_CSH_CASH" # static values obj["DealType"] = "IamDeal" obj["Client"] = "HEDGEMARK" obj["Fund"] = "BOS_PAT_BOWDOIN" obj["Custodian"] = "GS" if obj["Counterparty"] == "GOLDNY" else "BNY" obj["CashAccount"] = ( "057363418ICE-CDS" if obj["Counterparty"] == "GOLDNY" else 751254 ) obj["State"] = "Valid" obj["SettlementDate"] = obj["TradeDate"] obj["Basis"] = "ACT/360" obj["MarginType"] = "Variation" obj["DealFunction"] = "OTC" return [obj.get(h, None) for h in columns] def insert_iam_sql(conn, trade_date): data = [] totals = defaultdict(int) with conn.cursor() as c: strategy_allocation = ( "SELECT broker, amount, currency, strategy FROM (SELECT *, rank() " "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si " "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1 and abs(amount) >= .01;" ) c.execute(strategy_allocation, (trade_date,)) for row in c: data.append( ( trade_date, "NEW", row.strategy, _brokers[row.broker], None, row.amount, row.currency, False, ) ) totals[_brokers[row.broker]] += row.amount for broker, amount in totals.items(): if broker == "GOLDNY": # We don't need to offset Goldman FCM because BONY doesn't book them continue data.append( (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True) ) cancel_trades = [] insert_query = ( """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") """ """VALUES (%s, %s, %s, %s, %s, %s, %s, %s);""" ) try: c.executemany(insert_query, data) except UniqueViolation: # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads conn.rollback() c.execute( "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,) ) for row in c: cancel_trades.append(iam_process(row._asdict(), "CANCEL", trade_date)) c.executemany(insert_query, data) return cancel_trades def process_upload(conn, trade_date, cancel_trades, upload=True): csv_lines = cancel_trades actions = { "NEW": ( "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and trade_date =%s " "and action='NEW' and not uploaded returning *" ), "UPDATE": ( "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *" ), } for action, query in actions.items(): with conn.cursor() as c: c.execute(query, (trade_date, trade_date)) for row in c: csv_lines.append(iam_process(row._asdict(), action, trade_date)) buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(columns) csvwriter.writerows(csv_lines) buf = buf.getvalue().encode() dest = ( DAILY_DIR / str(datetime.date.today()) / f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv" ) dest.write_bytes(buf) if upload: sftp = SftpClient.from_creds("hm_globeop") sftp.client.chdir("incoming") sftp.put(buf, dest.name) if __name__ == "__main__": import argparse from serenitas.utils.db import dbconn conn = dbconn("dawndb") parser = argparse.ArgumentParser(description="Generate IAM file for globeop") parser.add_argument( "date", nargs="?", type=datetime.date.fromisoformat, default=(datetime.date.today() - bus_day).date(), ) args = parser.parse_args() cancel_trades = insert_iam_sql(conn, args.date) process_upload(conn, args.date, cancel_trades, True) conn.commit()