import csv
import datetime
from collections import defaultdict
from io import StringIO

import pandas as pd
from psycopg2.errors import UniqueViolation

from process_queue import rename_keys
from serenitas.utils.db import dbconn
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient

# Header row of the generated CSV, in output order. new_iam_process() emits
# one value per entry (None when the ticket has no such key).
columns = [
    "DealType",
    "DealId",
    "Action",
    "Client",
    "Fund",
    "Portfolio/Business Unit",
    "Strategy",
    "Custodian",
    "CashAccount",
    "Counterparty",
    "Comments",
    "State",
    "TradeDate",
    "SettlementDate",
    "Reserved",
    "InstrumentType",
    "ExpirationDate",
    "CallNoticeIndicator",
    "TransactionIndicator",
    "StartMoney",
    "Currency",
    "Rate",
    "Commission",
    "DealFunction",
    "FromAccount",
    "ClientReference",
    "Basis",
    "MarginType",
    # A missing comma here used to merge the next two names into the single
    # header "ClearingFacilityCcpTradeRef" via implicit string concatenation.
    "ClearingFacility",
    "CcpTradeRef",
    "BlockId",
    "BlockAmount",
    "ExecutionDateTimeStamp",
    "Collateralized",
    "TradeDateFX",
]

# Maps the broker value read from strategy_im to the counterparty code stored
# on the ticket.
_brokers = {
    "BAML_ISDA": "BOANNY",
    "CS": "CSITLN",
    "GS": "GOLINY",
    "BNP": "BNPBNY",
    "MS": "MSCILN",
    "JPM": "JPCBNY",
    "GS_FCM": "GOLDNY",
}

insert_query = (
    "INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, "
    'start_money, currency, "offset") VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'
)
# Most recent strategy_im row per (broker, fund) for fund BOWDST on or before
# the given date.
sql_query = (
    "SELECT broker, amount, strategy FROM "
    "(SELECT *, rank() OVER(PARTITION BY broker,fund ORDER BY date desc) "
    "FROM strategy_im si WHERE fund = 'BOWDST' AND date<=%s ORDER BY date DESC) test "
    "WHERE RANK=1;"
)
# Flags the given date's open, not-yet-uploaded NEW tickets as uploaded and
# returns them.
select_query = (
    "UPDATE iam_tickets set uploaded=True where maturity is null "
    "and trade_date =%s and action='NEW' and not uploaded returning *"
)
# Sets a maturity on open NEW tickets from any other trade date and returns
# them.
cancel_query = (
    "UPDATE iam_tickets set maturity=%s where trade_date != %s "
    "and maturity is null and action='NEW' returning *"
)

conn = dbconn("dawndb")


def new_iam_process(obj, action):
    """Convert one iam_tickets row into a CSV record ordered like ``columns``.

    Args:
        obj: dict for one ticket row (the script passes ``row._asdict()``).
            It is modified in place: keys are renamed via ``rename_keys`` and
            the output fields are added to it.
        action: one of ``"NEW"``, ``"UPDATE"`` or ``"CANCEL"``; any other
            value leaves the action-specific fields untouched.

    Returns:
        A list with one entry per name in ``columns``; names absent from
        ``obj`` yield ``None``.
    """
    # rename_keys comes from process_queue; it is relied on to rename the
    # keys of obj in place, since its return value is not used.
    rename_keys(
        obj,
        {
            "dealid": "DealId",
            "action": "Action",
            "strategy": "Strategy",
            "counterparty": "Counterparty",
            "currency": "Currency",
            "start_money": "StartMoney",
            "trade_date": "TradeDate",
        },
    )
    if action == "UPDATE":
        # NOTE: reads the module-level trade_date assigned by the script
        # below, so the function only works once that name exists.
        obj["ExpirationDate"] = trade_date
        obj["Action"] = "UPDATE"
    elif action == "NEW":
        obj["CallNoticeIndicator"] = "24H"
    elif action == "CANCEL":
        obj["Action"] = "CANCEL"

    # The sign of the amount selects the indicator; the amount itself is
    # always emitted as an absolute value.
    if obj["StartMoney"] > 0:
        obj["TransactionIndicator"] = "DEPOSIT"
    else:
        obj["TransactionIndicator"] = "LOAN"
    obj["StartMoney"] = abs(obj["StartMoney"])

    if obj["Strategy"] == "CSH_CASH":
        obj["Strategy"] = "M_CSH_CASH"

    # static values
    obj["DealType"] = "IamDeal"
    obj["Client"] = "HEDGEMARK"
    obj["Fund"] = "BOS_PAT_BOWDOIN"
    obj["Custodian"] = "BNY"
    obj["CashAccount"] = 751254
    obj["State"] = "Valid"
    obj["SettlementDate"] = obj["TradeDate"]
    obj["Basis"] = "ACT/360"
    obj["MarginType"] = "Variation"
    return [obj.get(h, None) for h in columns]


with conn.cursor() as c:
    trade_date = datetime.date(2022, 2, 25)
    c.execute(sql_query, (trade_date,))
    data = []
    # Running total of amounts per counterparty code.
    offsets = defaultdict(int)
    for row in c:
        data.append(
            (
                trade_date,
                "NEW",
                row.strategy,
                _brokers[row.broker],
                None,
                row.amount,
                "USD",
                False,
            )
        )
        offsets[_brokers[row.broker]] += row.amount
    # One offsetting CSH_CASH ticket per counterparty (opposite sign, offset
    # flag set), except for GOLDNY.
    for broker, amount in offsets.items():
        if broker == "GOLDNY":
            continue
        data.append((trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True))

    csv_data = []
    try:
        c.executemany(insert_query, data)
    except UniqueViolation:
        # Tickets for this trade date already exist: delete them, emit a
        # CANCEL record for each deleted row, then insert the fresh set.
        conn.rollback()
        c.execute(
            "DELETE FROM iam_tickets where trade_date=%s returning *", (trade_date,)
        )
        for row in c:
            csv_data.append(new_iam_process(row._asdict(), "CANCEL"))
        c.executemany(insert_query, data)

    c.execute(select_query, (trade_date,))
    for row in c:
        csv_data.append(new_iam_process(row._asdict(), "NEW"))

    c.execute(cancel_query, (trade_date, trade_date))
    for row in c:
        csv_data.append(new_iam_process(row._asdict(), "UPDATE"))

buf = StringIO()
csvwriter = csv.writer(buf)
csvwriter.writerow(columns)
csvwriter.writerows(csv_data)
buf = buf.getvalue().encode()
dest = (
    DAILY_DIR
    / str(datetime.date.today())
    / f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv"
)
dest.write_bytes(buf)
sftp = SftpClient.from_creds("hm_globeop")
sftp.client.chdir("incoming")
sftp.put(buf, dest.name)
# The commit is issued only after the local write and the upload calls have
# returned, so an exception in either leaves the transaction uncommitted.
conn.commit()