import csv from serenitas.utils.db import dbconn, dawn_engine import datetime from serenitas.utils.misc import rename_keys import pandas as pd from sqlalchemy.exc import IntegrityError from io import StringIO from serenitas.utils.env import DAILY_DIR from serenitas.utils.remote import SftpClient from csv_headers.bond_upload import BBH_BONDS as headers def _include_headers_only(obj, headers): new_obj = {} for header in headers: new_obj[header] = obj.get(header, None) new_obj["tradeid"] = obj.get("tradeid") return new_obj def _serialize(obj): rename_keys( obj, { "dealid": "Client Reference Number", "identifier": "Security ID", "accrued_payment": "Interest Amount", "dtc_number": "Trading Broker Type/ID", "principal_payment": "Principal Amount", "faceamount": "Unit / Original Face Amount", "current_face": "Current Face/Amortize Value", "price": "Unit Price Amount", "net_amount": "Net Amount", }, ) trade_details = { "Trade Date": obj["trade_date"].strftime("%m/%d/%Y"), "Settlement Date": obj["settle_date"].strftime("%m/%d/%Y"), "Place of Settlement/Country": "DTCYUS33", "Transaction Type": "RVP" if obj["buysell"] else "DVP", "Function of Instruction": "NEWM", "Account Number": "4023461", "Currency": "USD", "Clearing Broker ID / Type": obj["Trading Broker Type/ID"], "Other Fees Amount": 0, "Commission Amount": 0, "SEC Fees Amount": 0, } obj.update(trade_details) return _include_headers_only(obj, headers) def process_upload(obj): buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(headers) csvwriter.writerow([obj.get(header, None) for header in headers]) buf = buf.getvalue().encode() fname = f'LMCG_BBH_TRADES_P.{obj["Client Reference Number"].replace("_", "")}.csv' dest = DAILY_DIR / str(datetime.date.today()) / fname sftp = SftpClient.from_creds("bbh") sftp.put(buf, fname) dest.write_bytes(buf) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Upload trades to BBH") parser.add_argument( "date", nargs="?", type=datetime.date.fromisoformat, default=(datetime.date.today() - datetime.timedelta(days=7)), ) args = parser.parse_args() conn = dbconn("dawndb") with conn.cursor() as c: sql_query = "SELECT bond_trades.*, counterparties.dtc_number FROM bond_trades LEFT JOIN counterparties ON cp_code=code WHERE cash_counterparty AND trade_date >= '2022-04-05' AND trade_date >=%s AND fund='BRINKER' and faceamount is not null and faceamount >0;" c.execute( sql_query, (args.date,) ) # We don't want to upload trades before 2022-04-05 so we're filtering on the trade_date twice for row in c: obj = row._asdict() obj = _serialize(obj) df = pd.DataFrame( obj, index=[ "i", ], ) try: df.to_sql( "bbh_bond_upload", dawn_engine, index=False, if_exists="append" ) except IntegrityError: conn.rollback() else: process_upload(obj)