import argparse import csv import datetime import logging import pathlib import re import sys import task_server.config as config from io import StringIO from analytics import CreditIndex from analytics.utils import bus_day from itertools import groupby from pickle import loads from ftplib import FTP from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from common import get_redis_queue from pyisda.date import previous_twentieth from utils.db import dbconn from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from gmail_helpers import GmailMessage from tabulate import tabulate HEADERS_PRE = [ "Deal Type", "Deal Id", "Action", "Client", "Fund", "Portfolio", "Folder", "Custodian", "Cash Account", "Counterparty", "Comments", "State", "Trade Date", ] HEADERS = { "bond": HEADERS_PRE + [ "Settlement Date", "Reserved", "GlopeOp Security Identifier", "CUSIP", "ISIN", "Reserved", "Reserved", "Reserved", "Security Description", "Transaction Indicator", "SubTransaction Indicator", "Accrued", "Price", "BlockId", "BlockAmount", "Reserved", "Reserved", "Reserved", "Reserved", "ClientReference", "ClearingMode", "FaceAmount", "Pool Factor", "FactorAsOfDate", "Delivery", ], "cds": HEADERS_PRE + [ "Reserved", "Reserved", "EffectiveDate", "MaturityDate", "Currency", "Notional", "FixedRate", "PaymentRollDateConvention", "DayCount", "PaymentFrequency", "FirstCouponRate", "FirstCouponDate", "ResetLag", "Liquidation", "LiquidationDate", "Protection", "UnderlyingSecurityId", "UnderlyingSecurityDescription", "CreditSpreadCurve", "CreditEvents", "RecoveryRate", "Settlement", "InitialMargin", "InitialMarginPercentage", "InitialMarginCurrency", "DiscountCurve", "ClientReference", "UpfrontFee", "UpfrontFeePayDate", "RegenerateCashFlow", "UpfrontFeeComment", "Executing Broker", "SwapType", "OnPrice", "OffPrice", "AttachmentPoint", "ExhaustionPoint", "Fees", "Fee Payment Dates", "Fee Comments", "Credit Event Occurred", "Calendar", "Clearing Facility", "Adjusted", "CcpTradeRef", "BlockId", "BlockAmount", "NettingId", "AnnouncementDate", "ExecTS", "DefaultProbability", "ClientMargin", "Factor", "ISDADefinition", ], "swaption": HEADERS_PRE + [ "Reserved", "Reserved", "Reserved", "Notional", "PremiumSettlementDate", "ExpirationDate", "PremiumCurrency", "PercentageOfPremium", "ExerciseType", "Reserved", "SettlementMode", "SettlementRate", "Transaction Indicator", "InitialMargin", "InitialMarginPercentage", "InitialMarginCurrency", "ReceiveLegRateType", "ReceiveFloatRate", "ReceiveFirstCouponDate", "ReceiveFirstCouponRate", "ReceiveFixedRate", "ReceiveDaycount", "ReceiveFrequency", "ReceivePaymentRollConvention", "ReceiveEffectiveDate", "ReceiveMaturityDate", "ReceiveNotional", "ReceiveArrears", "ReceiveAdjusted", "ReceiveCompound", "ReceiveCurrency", "PayLegRateType", "PayFloatRate", "PayFirstCouponDate", "PayFirstCouponRate", "PayFixedRate", "PayDaycount", "PayFrequency", "PayPaymentRollConvention", "PayEffectiveDate", "PayMaturityDate", "PayNotional", "PayArrears", "PayAdjusted", "PayCompound", "PayCurrency", "RegenerateCashFlow", "GiveUpBroker", "ClientReference", "ReceiveDiscountCurve", "ReceiveForwardCurve", "PayDiscountCurve", "PayForwardCurve", "ReceiveFixingFrequency", "ReceiveInterestCalcMethod", "ReceiveCompoundAverageFrequency", "PayFixingFrequency", "PayInterestCalcMethod", "PayCompoundAverageFrequency", "SwapType", "AttachmentPoint", "ExhaustionPoint", "UnderlyingInstrument", "AssociatedDealType", "AssociatedDealId", "CounterpartyReference", "PremiumSettlementCurrency", "PremiumSettlementAmount", "ReceiveIMM Period", "PayIMMPeriod", "Reserved", "ClearingFacility", "Strike", "CcpTradeRef", "BreakClauseFrequency", "BlockId", "BlockAmount", "Cross Currency Premium Payment", "Premium Payment Amount", "Netting Id", "BreakClauseDate", ], "future": HEADERS_PRE + [ "Settlement Date", "Reserved", "GlopeOp Security Identifier", "Reserved", "Reserved", "Reserved", "Bloomberg Ticker", "RIC", "Security Description", "Transaction Indicator", "SubTransaction Indicator", "Quantity", "Price", "Commission", "Tax", "VAT", "Trade Currency", "Reserved", "Reserved", "Broker Short Name", "MaturityDate", "Exchange", "Client Reference", "Swap Type", "Initial Margin", "Initial Margin Currency", "Future Event", "Commission Entries", "BlockId", "Block Amount", ], "wire": HEADERS_PRE + [ "Settlement Date", "Reserved", "Reserved", "Currency", "Amount", "Associated Deal Type", "Associated Deal Id", "Transaction Type", "Instrument Type", "Yield", "Client Reference", "ClearingFacility", "Deal Function", "Reset Price", "Reset Date", "Ccp Trade Ref", "Margin Type", "Block Id", "Block Amount", ], "spot": HEADERS_PRE + [ "Settlement Date", "Dealt Currency", "Spot Rate", "Reserved", "Buy Currency", "Buy Amount", "Sell Currency", "Sell Amount", "ClearingFees", "BlockId", "BlockAmount", "Commission Currency", "Commission", "Reserved", "AssociatedDealType", "AssociatedDealId", "BrokerShortName", "ClientReference", ], } def get_effective_date(d, swaption_type): if swaption_type == "CD_INDEX_OPTION": return previous_twentieth(d + datetime.timedelta(days=1)) else: cal = UnitedStates() return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) def get_trades(q, trade_type="bond", fund="SERCGMAST"): queue_name = f"{trade_type}_{fund}" r = q.lrange(queue_name, 0, -1) df = [loads(e) for e in r] list_trades = [] if df: for tradeid, v in groupby(df, lambda x: x["id"]): trades = list(v) trades = sorted(trades, key=lambda x: x["lastupdate"]) if len(trades) == 1: list_trades.append(trades[0]) else: if trades[-1]["action"] == "CANCEL": continue if trades[0]["action"] == "NEW": trades[-1]["action"] = "NEW" list_trades.append(trades[-1]) if trades[-1]["action"] == "UPDATE": list_trades.append(trades[-1]) return list_trades def rename_keys(d, mapping): """ rename keys in dictionary according to mapping dict inplace""" for k, v in mapping.items(): if k in d: d[v] = d.pop(k) def build_termination( base_dir, dawndb, dealid, fee, *, termination_date=datetime.date.today(), termination_amount=None, termination_cp=None, deal_type="CreditDefaultSwapDeal", ): """ if termination_amount is None, assume full termination if termination_cp is None assume termination, otherwise assignment """ if deal_type == "CreditDefaultSwapDeal": table = "cds" elif deal_type == "SwaptionDeal": table = "swaptions" else: raise ValueError("Unkown deal_type: {deal_type}") with dawndb.cursor() as c: c.execute( "SELECT dealid, cp_code, notional, termination_amount, globeop_id " f"FROM {table} where id=%s", (dealid,), ) dealid, cp_code, notional, partial_amounts, globeopid = c.fetchone() if partial_amounts is not None: remaining_notional = notional - sum(partial_amounts) else: remaining_notional = notional termination_amount = termination_amount or remaining_notional if deal_type == "CreditDefaultSwapDeal": c.execute( f"UPDATE {table} " "SET termination_amount=termination_amount||%s::float8, " "termination_cp=termination_cp||%s::text, " "termination_fee=termination_fee||%s::float8, " "termination_date=termination_date||%s::date " "WHERE dealid=%s", ( termination_amount, termination_cp or cp_code, fee, termination_date, dealid, ), ) else: c.execute( f"UPDATE {table} " "SET termination_amount=%s::float8, " "termination_cp=%s::text, " "termination_fee=%s::float8, " "termination_date=%s::date " "WHERE dealid=%s", ( termination_amount, termination_cp or cp_code, fee, termination_date, dealid, ), ) dawndb.commit() headers = [ "DealType", "DealId", "Action", "Client", "SubAction", "PartialTermination", "TerminationAmount", "TerminationDate", "FeesPaid", "FeesReceived", "DealFunction", "Reserved", "ClientReference", ] if deal_type == "CreditDefaultSwapDeal": headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"] else: headers += ["Reserved"] * 3 headers += ["FeePaymentDate", "SpecialInstructions"] if termination_cp is not None: headers += ["AssignedCounterparty"] else: headers += ["Reserved"] if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None: headers += [ "AssignmentFee", "AssignedFeeTradeDate", "AssignedFeeValueDate", "AssignedCustodian", "AssignedCashAccount", "Reserved", "FeeCurrency", ] else: headers += ["Reserved"] * 7 headers += ["GoTradeId"] if deal_type == "CreditDefaultSwapDeal": headers += ["FeeComments", "ZeroOutInterestCashFlows"] else: headers += ["Reserved"] * 2 headers += ["Reserved"] * 4 if deal_type == "SwaptionDeal": headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"] elif deal_type == "CreditDefaultSwapDeal": if termination_cp is None: headers += ["Reserved"] * 3 else: headers += ["AssignedDealFunction"] + ["Reserved"] * 2 headers += ["InitialMargin", "InitialMarginCurrency"] if termination_cp is None: headers += ["Reserved"] * 4 + ["CreditEventOccured"] d = { "DealType": deal_type, "GoTradeId": int(globeopid[3:9]), "Action": "Update", "Client": "Serenitas", "SubAction": "Termination", "PartialTermination": "Y" if remaining_notional - termination_amount > 0 else "N", "TerminationAmount": termination_amount, "TerminationDate": termination_date, "FeesPaid": -fee if fee < 0 else None, "FeesReceived": fee if fee > 0 else None, "FeePaymentDate": (termination_date + 3 * bus_day).date(), } if "FeeCurrency" in headers: d["FeeCurrency"] = "USD" if termination_cp is not None: d["AssignedCounterparty"] = termination_cp buf = StringIO() csvwriter = csv.DictWriter(buf, headers) csvwriter.writeheader() csvwriter.writerow(d) timestamp = datetime.datetime.now() trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T" file_path = ( base_dir / str(timestamp.date()) / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv" ) file_path.write_bytes(buf.getvalue().encode()) return file_path def build_line(obj, trade_type="bond"): obj["Client"] = "Serenitas" obj["State"] = "Valid" rename_cols = { "fund": "Fund", "action": "Action", "dealid": "Deal Id", "folder": "Folder", "custodian": "Custodian", "cashaccount": "Cash Account", "cp_code": "Counterparty", "identifier": "GlopeOp Security Identifier", "cusip": "CUSIP", "isin": "ISIN", "description": "Security Description", "accrued": "Accrued", "price": "Price", "faceamount": "FaceAmount", "trade_date": "Trade Date", "settle_date": "Settlement Date", "effective_date": "EffectiveDate", "maturity": "MaturityDate", "currency": "Currency", "curr_notional": "Notional", "fixed_rate": "FixedRate", "payment_rolldate": "PaymentRollDateConvention", "day_count": "DayCount", "protection": "Protection", "security_id": "UnderlyingSecurityId", "security_desc": "UnderlyingSecurityDescription", "upfront": "UpfrontFee", "upfront_settle_date": "UpfrontFeePayDate", "swap_type": "SwapType", "orig_attach": "AttachmentPoint", "orig_detach": "ExhaustionPoint", "clearing_facility": "Clearing Facility", "isda_definition": "ISDADefinition", "expiration_date": "ExpirationDate", "portfolio": "Portfolio", "settlement_type": "SettlementMode", } rename_keys(obj, rename_cols) if trade_type in ["bond", "swaption", "future"]: obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell" if trade_type == "bond": obj["Deal Type"] = "MortgageDeal" obj["Portfolio"] = "MORTGAGES" obj["Delivery"] = "S" # zero coupon bond if obj["CUSIP"] != obj["GlopeOp Security Identifier"]: obj["CUSIP"] = None elif trade_type == "swaption": obj["Deal Type"] = "SwaptionDeal" obj["ExerciseType"] = "European" rename_keys( obj, { "Settlement Date": "PremiumSettlementDate", "notional": "Notional", "initial_margin_percentage": "InitialMarginPercentage", }, ) obj["PremiumSettlementAmount"] = ( obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01 ) obj["PremiumSettlementCurrency"] = obj["Currency"] obj["RegenerateCashFlow"] = "N" for direction in ["Pay", "Receive"]: obj[direction + "MaturityDate"] = obj["MaturityDate"] obj[direction + "Currency"] = obj["Currency"] obj[direction + "Notional"] = obj["Notional"] obj[direction + "EffectiveDate"] = get_effective_date( obj["Trade Date"], obj["SwapType"] ) if obj["SwapType"] == "CD_INDEX_OPTION": for direction in ["Pay", "Receive"]: obj[direction + "Daycount"] = "ACT/360" obj[direction + "Frequency"] = "Quarterly" obj[direction + "PaymentRollConvention"] = "Following" for leg_type in ["Receive", "Pay"]: obj[leg_type + "LegRateType"] = "Fixed" if obj["option_type"] == "PAYER": obj["ReceiveFixedRate"] = 0.0 obj["PayFixedRate"] = obj["FixedRate"] elif obj["option_type"] == "RECEIVER": obj["PayFixedRate"] = 0.0 obj["ReceiveFixedRate"] = obj["FixedRate"] elif obj["SwapType"] == "SWAPTION": for direction in ["Pay", "Receive"]: obj[direction + "PaymentRollConvention"] = "ModifiedFollowing" if (obj["buysell"] and obj["option_type"] == "RECEIVER") or ( not obj["buysell"] and obj["option_type"] == "PAYER" ): obj["ReceiveFrequency"] = "Half-Yearly" obj["ReceiveDaycount"] = "30/360" obj["PayFrequency"] = "Quarterly" obj["PayDaycount"] = "ACT/360" obj["ReceiveFixedRate"] = obj["strike"] obj["ReceiveLegRateType"] = "Fixed" obj["PayLegRateType"] = "Float" obj["PayFloatRate"] = "US0003M" else: obj["ReceiveFrequency"] = "Quarterly" obj["ReceiveDaycount"] = "ACT/360" obj["PayFrequency"] = "Half-Yearly" obj["PayDaycount"] = "30/360" obj["ReceiveFloatRate"] = "US0003M" obj["ReceiveLegRateType"] = "Float" obj["PayLegRateType"] = "Fixed" obj["PayFixedRate"] = obj["strike"] else: raise ValueError( "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'" ) obj["PremiumCurrency"] = obj["Currency"] if obj["InitialMarginPercentage"]: obj["InitialMarginCurrency"] = obj["Currency"] obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId") if obj["SwapType"] == "CD_INDEX_OPTION": obj["Strike"] = obj.pop("strike") elif trade_type == "cds": freq = {4: "Quarterly", 12: "Monthly"} obj["Deal Type"] = "CreditDefaultSwapDeal" obj["PaymentFrequency"] = freq[obj["frequency"]] obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage") if obj["InitialMarginPercentage"]: obj["InitialMarginCurrency"] = obj["Currency"] if obj.get("AttachmentPoint") is None: obj["Executing Broker"] = obj["Counterparty"] if obj["account_code"] == "BAML": obj["Counterparty"] = "BAMSNY" obj["Custodian"] = "BOMLCM" elif obj["account_code"] == "WF": obj["Counterparty"] = "WELFEI" obj["Custodian"] = "WELLSFCM" if obj["Clearing Facility"] is None: obj["Clearing Facility"] = "NOT CLEARED" elif trade_type == "future": obj["Deal Type"] = "FutureDeal" rename_keys( obj, { "currency": "Trade Currency", "commission": "Commission", "quantity": "Quantity", "swap_type": "Swap Type", "bbg_ticker": "Bloomberg Ticker", "Currency": "Trade Currency", "exchange": "Exchange", }, ) elif trade_type == "wire": obj["Deal Type"] = "CashFlowDeal" obj["Transaction Type"] = "Transfer" obj["Instrument Type"] = "Cashflow" obj["Settlement Date"] = obj["Trade Date"] rename_keys(obj, {"amount": "Amount"}) elif trade_type == "spot": obj["Deal Type"] = "SpotDeal" rename_keys( obj, { "commission": "Commission", "commission_currency": "Commission Currency", "sell_currency": "Sell Currency", "sell_amount": "Sell Amount", "buy_currency": "Buy Currency", "buy_amount": "Buy Amount", "spot_rate": "Spot Rate", }, ) return [obj.get(h, None) for h in HEADERS[trade_type]] def get_bbg_data( conn, session, identifier, cusip=None, isin=None, settle_date=None, asset_class=None, **kwargs, ): fields = ["MTG_FACTOR_SET_DT", "INT_ACC"] fields_dict = { "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], } with conn.cursor() as c: c.execute( "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) ) if not c.fetchone(): fields += [ "MATURITY", "CRNCY", "NAME", "FLOATER", "FLT_SPREAD", "CPN", "CPN_FREQ", "FIRST_CPN_DT", "MTG_PAY_DELAY", "DAY_CNT_DES", ] cusip_or_isin = cusip or isin for bbg_type in ["Mtge", "Corp"]: bbg_id = cusip_or_isin + " " + bbg_type data = retrieve_data( session, [bbg_id], fields + fields_dict[bbg_type], overrides={"SETTLE_DT": settle_date} if settle_date else None, ) if data[bbg_id]: break else: logging.error(f"{cusip_or_isin} not in bloomberg") return bbg_data = data[bbg_id] if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: bbg_data["MTG_FACTOR_SET_DT"] = 1 bbg_data["INT_ACC"] = 0 if len(fields) > 2: # we don't have the data in the securities table sql_fields = [ "identifier", "cusip", "isin", "description", "face_amount", "maturity", "floater", "spread", "coupon", "frequency", "day_count", "first_coupon_date", "pay_delay", "currency", "bbg_type", "asset_class", "start_accrued_date", ] placeholders = ",".join(["%s"] * len(sql_fields)) columns = ",".join(sql_fields) sqlstr = f"INSERT INTO securities({columns}) VALUES({placeholders})" isfloater = bbg_data["FLOATER"] == "Y" pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) day_count = bbg_data.get("DAY_CNT_DES") m = re.match("[^(\s]+", day_count) if m: day_count = m.group(0) if isinstance(pay_delay, str): pay_delay = int(pay_delay.split(" ")[0]) with conn.cursor() as c: c.execute( sqlstr, ( identifier, cusip, isin, bbg_data["NAME"], bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), bbg_data.get("MATURITY"), isfloater, bbg_data.get("FLT_SPREAD") if isfloater else None, bbg_data.get("CPN") if not isfloater else None, bbg_data.get("CPN_FREQ"), day_count, bbg_data.get("FIRST_CPN_DT"), pay_delay, bbg_data.get("CRNCY"), bbg_type, asset_class, bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), ), ) conn.commit() return bbg_data def bond_trade_process(conn, session, trade): bbg_data = get_bbg_data(conn, session, **trade) currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0 principal_payment = currentface * trade["price"] / 100.0 with conn: with conn.cursor() as c: c.execute( "UPDATE bonds SET principal_payment = %s, accrued_payment = %s " "WHERE id = %s", (principal_payment, accrued_payment, int(trade["id"])), ) # mark it at buy price if trade["buysell"]: sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" with conn: with conn.cursor() as c: c.execute( sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) ) def send_email(trade): # send out email with trade content email = GmailMessage() email.set_content(print_trade(trade)) email["To"] = "nyops@lmcg.com" email["Subject"] = email_subject(trade) email.send() def is_tranche_trade(trade): return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") def swaption_trade_process(conn, session, trade): sqlstr = ( "SELECT indexfactor/100 FROM index_version " "WHERE redindexcode=%(security_id)s" ) try: with conn.cursor() as c: c.execute(sqlstr, trade) try: factor, = c.fetchone() except TypeError: return except ValueError: return else: trade["factor"] = factor def cds_trade_process(conn, session, trade): sqlstr = ( "SELECT indexfactor/100 FROM index_version " "WHERE redindexcode=%(security_id)s" ) try: with conn.cursor() as c: c.execute(sqlstr, trade) factor, = c.fetchone() except ValueError: bbg_data = get_bbg_data( conn, session, trade["security_id"], isin=trade["security_id"], asset_class="Subprime", ) factor = bbg_data["MTG_FACTOR_SET_DT"] if is_tranche_trade(trade): tranche_factor = (trade["attach"] - trade["detach"]) / ( trade["orig_attach"] - trade["orig_detach"] ) trade["curr_notional"] = trade["notional"] * tranche_factor else: trade["curr_notional"] = trade["notional"] * factor if not is_tranche_trade(trade): index = CreditIndex( redcode=trade["security_id"], maturity=trade["maturity"], notional=trade["notional"], value_date=trade["trade_date"], ) index.direction = trade["protection"] with conn.cursor() as c: if trade["ref"]: index.ref = trade["ref"] trade["upfront"] = -index.pv c.execute( "UPDATE cds SET upfront=%s WHERE dealid=%s", (trade["upfront"], trade["dealid"]), ) else: index.pv = trade["upfront"] trade["ref"] = index.ref c.execute( "UPDATE cds SET ref=%s WHERE dealid=%s", (trade["ref"], trade["dealid"]), ) conn.commit() return trade def generate_csv(l, trade_type="bond", fund="SERCGMAST"): output = StringIO() csvwriter = csv.writer(output) csvwriter.writerow(HEADERS[trade_type]) empty = True for trade in l: empty = False csvwriter.writerow(build_line(trade.copy(), trade_type)) if empty: raise IOError("empty trade queue") else: return output.getvalue().encode() def get_filepath( base_dir: pathlib.Path, trade_type: str, fund: str = "SERCGMAST" ) -> pathlib.Path: d = { "bond": "Mortgages", "cds": "CreditDefaultSwapDeal", "swaption": "SwaptionDeal", "future": "Future", "wire": "CashFlowDeal", "spot": "SpotDeal", "capfloor": "TODO", } timestamp = datetime.datetime.now() if fund == "BRINKER": return ( base_dir / str(timestamp.date()) / f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv" ) else: return ( base_dir / str(timestamp.date()) / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv" ) def upload_file(file_path: pathlib.Path) -> None: if "BBH" in file_path.name: return ftp = FTP("ftp.globeop.com") ftp.login("srntsftp", config.ftp_password) ftp.cwd("incoming") cmd = f"STOR {file_path.name}" with file_path.open("rb") as fh: ftp.storbinary(cmd, fh) def write_buffer( buf: bytes, base_dir: pathlib.Path, trade_type: str = "bond", fund: str = "SERCGMAST", ): file_path = get_filepath(base_dir, trade_type, fund) file_path.write_bytes(buf) return file_path def email_subject(trade): return "[{0}] {1} {2} {3}".format( trade["asset_class"], trade["action"], "Buy" if trade["buysell"] else "Sell", trade["description"], ) def print_trade(trade): d = trade.copy() d["buysell"] = "Buy" if d["buysell"] else "Sell" return tabulate((k, v) for k, v in d.items()) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-n", "--no-upload", action="store_true", help="do not upload to Globeop" ) parser.add_argument("fund", nargs="?", default="SERCGMAST") args = parser.parse_args() q = get_redis_queue() try: from env import DAILY_DIR except KeyError: sys.exit("Please set path of daily directory in 'DAILY_DIR'") dawndb = dbconn("dawndb") for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]: list_trades = get_trades(q, trade_type, args.fund) if list_trades: if trade_type in ["bond", "cds", "swaption"]: process_fun = globals()[f"{trade_type}_trade_process"] with init_bbg_session(BBG_IP) as session: for trade in list_trades: process_fun(dawndb, session, trade) if trade_type == "bond" and args.fund == "SERCGMAST": for trade in list_trades: if trade["upload"]: send_email(trade) if args.fund == "SERCGMAST" or trade_type == "cds": try: buf = generate_csv( filter(lambda t: t.get("upload", True), list_trades), trade_type, args.fund, ) file_path = write_buffer(buf, DAILY_DIR, trade_type, args.fund) if not args.no_upload: upload_file(file_path) except IOError: pass q.delete(f"{trade_type}_{args.fund}") dawndb.close()