import pandas as pd from process_queue import rename_keys import datetime from collections import defaultdict from io import StringIO import csv from serenitas.utils.env import DAILY_DIR from serenitas.utils.remote import SftpClient from psycopg2.errors import UniqueViolation from serenitas.analytics.dates import bus_day columns = [ "DealType", "DealId", "Action", "Client", "Fund", "Portfolio/Business Unit", "Strategy", "Custodian", "CashAccount", "Counterparty", "Comments", "State", "TradeDate", "SettlementDate", "Reserved", "InstrumentType", "ExpirationDate", "CallNoticeIndicator", "TransactionIndicator", "StartMoney", "Currency", "Rate", "Commission", "DealFunction", "FromAccount", "ClientReference", "Basis", "MarginType", "ClearingFacility" "CcpTradeRef", "BlockId", "BlockAmount", "ExecutionDateTimeStamp", "Collateralized", "TradeDateFX", ] _brokers = { "BAML_ISDA": "BOANNY", "CS": "CSITLN", "GS": "GOLINY", "BNP": "BNPBNY", "MS": "MSCILN", "JPM": "JPCBNY", "GS_FCM": "GOLDNY", } def iam_process(obj, action, trade_date): rename_keys( obj, { "dealid": "DealId", "action": "Action", "strategy": "Strategy", "counterparty": "Counterparty", "currency": "Currency", "start_money": "StartMoney", "trade_date": "TradeDate", }, ) if action == "UPDATE": obj["ExpirationDate"] = trade_date obj["Action"] = "UPDATE" elif action == "NEW": obj["CallNoticeIndicator"] = "24H" elif action == "CANCEL": obj["Action"] = "CANCEL" if obj["StartMoney"] > 0: obj["TransactionIndicator"] = "DEPOSIT" else: obj["TransactionIndicator"] = "LOAN" obj["StartMoney"] = abs(obj["StartMoney"]) if obj["Strategy"] == "CSH_CASH": obj["Strategy"] = "M_CSH_CASH" # static values obj["DealType"] = "IamDeal" obj["Client"] = "HEDGEMARK" obj["Fund"] = "BOS_PAT_BOWDOIN" obj["Custodian"] = "GS" if obj["Counterparty"] == "GOLDNY" else "BNY" obj["CashAccount"] = ( "057363418ICE-CDS" if obj["Counterparty"] == "GOLDNY" else 751254 ) obj["State"] = "Valid" obj["SettlementDate"] = obj["TradeDate"] obj["Basis"] = "ACT/360" obj["MarginType"] = "Variation" return [obj.get(h, None) for h in columns] def insert_iam_sql(conn, trade_date): data = [] totals = defaultdict(int) with conn.cursor() as c: strategy_allocation = ( "SELECT broker, amount, currency, strategy FROM (SELECT *, rank() " "OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si " "WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test WHERE RANK=1 and abs(amount) >= .01;" ) c.execute(strategy_allocation, (trade_date,)) for row in c: data.append( ( trade_date, "NEW", row.strategy, _brokers[row.broker], None, row.amount, row.currency, False, ) ) totals[_brokers[row.broker]] += row.amount for broker, amount in totals.items(): if broker == "GOLDNY": # We don't need to offset Goldman FCM because BONY doesn't book them continue data.append( (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True) ) cancel_trades = [] insert_query = ( """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") """ """VALUES (%s, %s, %s, %s, %s, %s, %s, %s);""" ) try: c.executemany(insert_query, data) except UniqueViolation: # We already uploaded the IAM tickets today in that case, we need to update and cancel the old uploads conn.rollback() c.execute( "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,) ) for row in c: cancel_trades.append(iam_process(row._asdict(), "CANCEL", trade_date)) c.executemany(insert_query, data) return cancel_trades def process_upload(conn, trade_date, cancel_trades, upload=True): csv_lines = cancel_trades actions = { "NEW": ( "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and trade_date =%s " "and action='NEW' and not uploaded returning *" ), "UPDATE": ( "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null and action='NEW' returning *" ), } for action, query in actions.items(): with conn.cursor() as c: c.execute(query, (trade_date, trade_date)) for row in c: csv_lines.append(iam_process(row._asdict(), action, trade_date)) buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(columns) csvwriter.writerows(csv_lines) buf = buf.getvalue().encode() dest = ( DAILY_DIR / str(datetime.date.today()) / f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv" ) dest.write_bytes(buf) if upload: sftp = SftpClient.from_creds("hm_globeop") sftp.client.chdir("incoming") sftp.put(buf, dest.name) if __name__ == "__main__": import argparse from serenitas.utils.db import dbconn conn = dbconn("dawndb") parser = argparse.ArgumentParser(description="Generate IAM file for globeop") parser.add_argument( "date", nargs="?", type=datetime.date.fromisoformat, default=(datetime.date.today() - bus_day).date(), ) args = parser.parse_args() cancel_trades = insert_iam_sql(conn, args.date) process_upload(conn, args.date, cancel_trades, True) conn.commit()