"""Generate and upload IamDeal tickets for the BOWDST fund.

Running this module:

1. reads the latest ``strategy_im`` rows per broker for fund ``BOWDST``,
2. inserts one ``NEW`` ticket per row plus one offsetting ``CSH_CASH``
   ticket per counterparty into ``iam_tickets``,
3. marks today's un-uploaded tickets as uploaded and stamps a maturity on
   still-open tickets from other trade dates,
4. writes all affected tickets to a CSV under ``DAILY_DIR`` and uploads the
   same bytes over SFTP,
5. commits the database transaction.
"""
import csv
import datetime
from collections import defaultdict
from io import StringIO

import pandas as pd

from serenitas.utils.db import dbconn
from process_queue import rename_keys
from serenitas.utils.env import DAILY_DIR
from serenitas.utils.remote import SftpClient

# Header row of the IamDeal CSV; the order here is the column order of every
# emitted row (see ``new_iam_process``).
columns = [
    "DealType",
    "DealId",
    "Action",
    "Client",
    "Fund",
    "Portfolio/Business Unit",
    "Strategy",
    "Custodian",
    "CashAccount",
    "Counterparty",
    "Comments",
    "State",
    "TradeDate",
    "SettlementDate",
    "Reserved",
    "InstrumentType",
    "ExpirationDate",
    "CallNoticeIndicator",
    "TransactionIndicator",
    "StartMoney",
    "Currency",
    "Rate",
    "Commission",
    "DealFunction",
    "FromAccount",
    "ClientReference",
    "Basis",
    "MarginType",
    # NOTE: a missing comma here used to fuse the next two names into a single
    # "ClearingFacilityCcpTradeRef" header via implicit string concatenation.
    "ClearingFacility",
    "CcpTradeRef",
    "BlockId",
    "BlockAmount",
    "ExecutionDateTimeStamp",
    "Collateralized",
    "TradeDateFX",
]

# Maps the broker codes stored in ``strategy_im`` to the counterparty codes
# written on the tickets.
_brokers = {
    "BAML_ISDA": "BOANNY",
    "CS": "CSITLN",
    "GS": "GOLINY",
    "BNP": "BNPBNY",
    "MS": "MSCILN",
    "JPM": "JPCBNY",
}

insert_query = """INSERT INTO iam_tickets(trade_date, action, strategy, counterparty, maturity, start_money, currency, "offset") VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
# Latest rows (rank 1 by date, per broker and fund) on or before the given
# date for fund BOWDST, excluding broker GS_FCM.
sql_query = "SELECT broker, amount, strategy FROM (SELECT *, rank() OVER(PARTITION BY broker,fund ORDER BY date desc) FROM strategy_im si WHERE fund = 'BOWDST' AND date<=%s and broker != 'GS_FCM' ORDER BY date DESC) test WHERE RANK=1;"
# Despite the name this is an UPDATE: it flags the trade date's open,
# not-yet-uploaded tickets as uploaded and returns them.
select_query = "UPDATE iam_tickets set uploaded=True where maturity is null and trade_date =%s and not uploaded returning *"
# Stamps a maturity on every still-open ticket from a different trade date and
# returns the updated rows.
cancel_query = "UPDATE iam_tickets set maturity=%s where trade_date != %s and maturity is null returning *"

conn = dbconn("dawndb")


def new_iam_process(obj, cancel=False):
    """Turn an ``iam_tickets`` row into one CSV row ordered like ``columns``.

    Args:
        obj: Mapping for one ticket row (e.g. ``row._asdict()``). It is
            mutated in place: database column names are renamed to CSV
            header names and the derived/static fields are added.
        cancel: When true the ticket is emitted as an ``UPDATE`` carrying an
            ``ExpirationDate``; otherwise it is emitted as a new ticket with
            a ``24H`` call notice.

    Returns:
        A list with one value per entry in ``columns``; headers that have no
        value in ``obj`` yield ``None``.

    Raises:
        KeyError: If ``obj`` lacks a required field (``start_money``,
            ``strategy``, ``trade_date``, or ``maturity`` when ``cancel`` is
            true).
    """
    rename_keys(
        obj,
        {
            "dealid": "DealId",
            "action": "Action",
            "strategy": "Strategy",
            "counterparty": "Counterparty",
            "currency": "Currency",
            "start_money": "StartMoney",
            "trade_date": "TradeDate",
        },
    )
    if cancel:
        # cancel_query sets ``maturity`` and returns the updated row, so the
        # row itself carries the expiration date. Reading it here removes the
        # previous dependency on a module-level ``trade_date`` global that
        # only exists once the script section below has started running.
        obj["ExpirationDate"] = obj["maturity"]
        obj["Action"] = "UPDATE"
    else:
        obj["CallNoticeIndicator"] = "24H"

    # The sign of the amount picks the direction; the CSV carries the
    # magnitude only.
    obj["TransactionIndicator"] = "DEPOSIT" if obj["StartMoney"] > 0 else "LOAN"
    obj["StartMoney"] = abs(obj["StartMoney"])
    if obj["Strategy"] == "CSH_CASH":
        obj["Strategy"] = "M_CSH_CASH"

    # static values
    obj.update(
        {
            "DealType": "IamDeal",
            "Client": "HEDGEMARK",
            "Fund": "BOS_PAT_BOWDOIN",
            "Custodian": "BNY",
            "CashAccount": 751254,
            "State": "Valid",
            "SettlementDate": obj["TradeDate"],
            "Basis": "ACT/360",
            "MarginType": "Variation",
        }
    )
    return [obj.get(header) for header in columns]


trade_date = datetime.date(2022, 2, 24)

with conn.cursor() as c:
    c.execute(sql_query, (trade_date,))
    data = []
    offsets = defaultdict(int)
    for row in c:
        counterparty = _brokers[row.broker]
        data.append(
            (
                trade_date,
                "NEW",
                row.strategy,
                counterparty,
                None,
                row.amount,
                "USD",
                False,
            )
        )
        offsets[counterparty] += row.amount
    # One offsetting CSH_CASH ticket per counterparty, for the negated total
    # of that counterparty's amounts.
    data.extend(
        (trade_date, "NEW", "CSH_CASH", broker, None, -amount, "USD", True)
        for broker, amount in offsets.items()
    )
    c.executemany(insert_query, data)

    c.execute(select_query, (trade_date,))
    csv_data = [new_iam_process(row._asdict(), False) for row in c]
    c.execute(cancel_query, (trade_date, trade_date))
    csv_data.extend(new_iam_process(row._asdict(), True) for row in c)

text_buf = StringIO()
csvwriter = csv.writer(text_buf)
csvwriter.writerow(columns)
csvwriter.writerows(csv_data)
payload = text_buf.getvalue().encode()

dest = DAILY_DIR / f"Bowdst.ALL.{datetime.datetime.now():%Y%m%d.%H%M%S}.IamDeal.csv"
dest.write_bytes(payload)

sftp = SftpClient.from_creds("hm_globeop")
sftp.client.chdir("incoming")
sftp.put(payload, dest.name)

# Commit last, so the ticket updates are only persisted once the file has
# been written and uploaded without raising.
conn.commit()