diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 824 |
1 files changed, 0 insertions, 824 deletions
diff --git a/python/process_queue.py b/python/process_queue.py deleted file mode 100644 index 3141aa5f..00000000 --- a/python/process_queue.py +++ /dev/null @@ -1,824 +0,0 @@ -import argparse -import blpapi -import csv -import datetime -import logging -import psycopg -import pathlib -import re -import redis -import sys - -from io import StringIO - -from serenitas.analytics.api import CreditIndex - -try: - from serenitas.utils.env import DAILY_DIR -except KeyError: - sys.exit("Please set path of daily directory in 'SERENITAS_DAILY_DIR'") - -from collections import defaultdict -from pickle import dumps, loads -from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data -from serenitas.analytics.dates import bus_day -from serenitas.utils.exchange import ExchangeMessage, FileAttachment -from serenitas.utils.remote import FtpClient, SftpClient -from serenitas.utils import get_redis_queue -from pyisda.date import previous_twentieth -from typing import Tuple, Union -from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date -from tabulate import tabulate -from headers import get_headers -from trade_dataclasses import DealKind - -_client_name = {"SERCGMAST": "Serenitas", "BOWDST": "HEDGEMARK", "BRINKER": "LMCG"} - - -def get_effective_date(d, swaption_type): - if swaption_type == "CD_INDEX_OPTION": - return previous_twentieth(d + datetime.timedelta(days=1)) - else: - cal = UnitedStates() - return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) - - -def groupby(p: redis.client.Pipeline, key: str, trade_key: str): - d = defaultdict(list) - for buf in p.lrange(key, 0, -1): - trade = loads(buf) - d[trade[trade_key]].append(trade) - return d - - -def get_trades(p: redis.client.Pipeline, key: str): - for tradeid, trades in groupby(p, key, "id").items(): - if len(trades) == 1: - yield trades[0] - else: - if trades[-1]["action"] == "CANCEL": - continue - if trades[0]["action"] == "NEW": - trades[-1]["action"] = "NEW" - yield trades[-1] - if trades[-1]["action"] == "UPDATE": - yield trades[-1] - - -def process_indicative( - p: redis.client.Pipeline, - trade_type: str, - upload: bool, - session: blpapi.session.Session, - conn: psycopg.Connection, -) -> None: - process_fun = globals().get( - f"{trade_type}_trade_process", lambda conn, session, trade: trade - ) - for trade in get_trades(p, trade_type): - process_fun(conn, session, trade) - fund = trade["fund"] - if trade.get("upload", True) and ( - fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") - ): - p.rpush(f"{trade_type}_upload", dumps(trade)) - if trade.get("swap_type", None) in ( - "CD_INDEX_OPTION", - "CD_INDEX_TRANCHE", - "BESPOKE", - ): - DealKind[trade_type].from_dict(**trade).mtm_stage() - if Deal := DealKind[trade_type]: - Deal.mtm_upload() - p.delete(trade_type) - - -def process_upload( - p: redis.client.Pipeline, - trade_type: str, - upload: bool, -) -> None: - key = f"{trade_type}_upload" - for fund, l in groupby(p, key, "fund").items(): - buf = StringIO() - csvwriter = csv.writer(buf) - csvwriter.writerow(get_headers(trade_type, fund)) - csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l) - buf = buf.getvalue().encode() - dest = get_filepath(DAILY_DIR, trade_type, fund) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) - if upload: - upload_buf(buf, dest.name, fund, trade_type) - - p.delete(key) - - -def terminate_list( - p: redis.client.Pipeline, - key: str, - upload: bool, - conn: psycopg.connection, - base_dir: pathlib.Path = DAILY_DIR, -): - trade_type, fund, _ = key.split("_") - terms = [] - for term in p.lrange(key, 0, -1): - termination = loads(term) - DealKind["termination"].from_dict(**termination).mtm_stage() - try: - terms.append(trade.to_globeop()) - except TypeError as e: - logging.error(e) - return - DealKind["termination"].mtm_upload() - if upload and terms: - dest = get_filepath(base_dir, (trade_type, "A"), fund) - buf = StringIO() - csvwriter = csv.writer(buf) - headers = get_headers("termination", fund) - csvwriter.writerow(headers) - csvwriter.writerows([trade.get(h) for h in headers] for trade in terms) - buf = buf.getvalue().encode() - upload_buf(buf, dest.name, fund, trade_type) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) - p.delete(key) - - -def rename_keys(d, mapping): - """rename keys in dictionary according to mapping dict inplace""" - for k, v in mapping.items(): - if k in d: - d[v] = d.pop(k) - - -def build_line(obj, trade_type="bond", fund="SERCGMAST"): - obj["Client"] = _client_name[fund] - # Bowdst Globeop has a special short name - obj["fund"] = "BOS_PAT_BOWDOIN" if fund == "BOWDST" else fund - obj["State"] = "Valid" - rename_cols = { - "fund": "Fund", - "action": "Action", - "dealid": "Deal Id", - "folder": "Folder", - "custodian": "Custodian", - "cash_account": "Cash Account", - "cp_code": "Counterparty", - "identifier": "GlopeOp Security Identifier", - "cusip": "CUSIP", - "isin": "ISIN", - "description": "Security Description", - "accrued": "Accrued", - "price": "Price", - "faceamount": "FaceAmount", - "trade_date": "Trade Date", - "settle_date": "Settlement Date", - "effective_date": "EffectiveDate", - "maturity": "MaturityDate", - "currency": "Currency", - "fixed_rate": "FixedRate", - "payment_rolldate": "PaymentRollDateConvention", - "day_count": "DayCount", - "protection": "Protection", - "security_id": "UnderlyingSecurityId", - "security_desc": "UnderlyingSecurityDescription", - "upfront": "UpfrontFee", - "upfront_settle_date": "UpfrontFeePayDate", - "swap_type": "SwapType", - "orig_attach": "AttachmentPoint", - "orig_detach": "ExhaustionPoint", - "clearing_facility": "Clearing Facility", - "isda_definition": "ISDADefinition", - "expiration_date": "ExpirationDate", - "portfolio": "Portfolio", - "settlement_type": "SettlementMode", - "principal_payment": "PrincipalPayment", - "accrued_payment": "AccruedPayment", - "current_face": "CurrentFace", - } - rename_cols[ - "curr_notional" if fund in ("SERCGMAST", "BOWDST") else "notional" - ] = "Notional" - rename_keys(obj, rename_cols) - if trade_type in ("bond", "swaption", "future"): - obj["Transaction Indicator"] = "Buy" if obj["buysell"] else "Sell" - if trade_type == "bond": - obj["Deal Type"] = "MortgageDeal" - obj["Portfolio"] = "MORTGAGES" - obj["Delivery"] = "S" - # zero coupon bond - if obj["CUSIP"] != obj["GlopeOp Security Identifier"]: - obj["CUSIP"] = None - elif trade_type == "swaption": - obj["Deal Type"] = "SwaptionDeal" - obj["ExerciseType"] = "European" - rename_keys( - obj, - { - "Settlement Date": "PremiumSettlementDate", - "notional": "Notional", - "initial_margin_percentage": "InitialMarginPercentage", - }, - ) - obj["PremiumSettlementAmount"] = ( - obj["Price"] * obj["Notional"] * obj.get("factor", 1.0) * 0.01 - ) - obj["PremiumSettlementCurrency"] = obj["Currency"] - obj["RegenerateCashFlow"] = "N" - for direction in ["Pay", "Receive"]: - obj[direction + "MaturityDate"] = obj["MaturityDate"] - obj[direction + "Currency"] = obj["Currency"] - obj[direction + "Notional"] = obj["Notional"] - obj[direction + "EffectiveDate"] = get_effective_date( - obj["ExpirationDate"], obj["SwapType"] - ) - if obj["SwapType"] == "CD_INDEX_OPTION": - for direction in ["Pay", "Receive"]: - obj[direction + "Daycount"] = "ACT/360" - obj[direction + "Frequency"] = "Quarterly" - obj[direction + "PaymentRollConvention"] = "Following" - - for leg_type in ["Receive", "Pay"]: - obj[leg_type + "LegRateType"] = "Fixed" - if obj["option_type"] == "PAYER": - obj["ReceiveFixedRate"] = 0.0 - obj["PayFixedRate"] = obj["FixedRate"] - elif obj["option_type"] == "RECEIVER": - obj["PayFixedRate"] = 0.0 - obj["ReceiveFixedRate"] = obj["FixedRate"] - elif obj["SwapType"] == "SWAPTION": - for direction in ["Pay", "Receive"]: - obj[direction + "PaymentRollConvention"] = "ModifiedFollowing" - if obj["option_type"] == "RECEIVER": - fixed, floating = "Receive", "Pay" - else: - fixed, floating = "Pay", "Receive" - # fixed leg - obj[fixed + "Frequency"] = "Yearly" - obj[fixed + "Daycount"] = "ACT/360" - obj[fixed + "FixedRate"] = obj["strike"] - obj[fixed + "LegRateType"] = "Fixed" - obj[fixed + "InterestCalcMethod"] = "Simple Interest" - # floating leg - obj[floating + "Frequency"] = "Yearly" - obj[floating + "Daycount"] = "ACT/360" - obj[floating + "LegRateType"] = "Float" - obj[floating + "FloatRate"] = "SOFRINDX" - obj[floating + "InterestCalcMethod"] = "Simple Interest" - - else: - raise ValueError( - "'SwapType' needs to be one of 'CD_INDEX_OPTION' or 'SWAPTION'" - ) - - obj["PremiumCurrency"] = obj["Currency"] - if obj["InitialMarginPercentage"]: - obj["InitialMarginCurrency"] = obj["Currency"] - obj["UnderlyingInstrument"] = obj.pop("UnderlyingSecurityId") - if obj["SwapType"] == "CD_INDEX_OPTION": - obj["Strike"] = obj.pop("strike") - - elif trade_type == "cds": - freq = {4: "Quarterly", 12: "Monthly"} - obj["Deal Type"] = "CreditDefaultSwapDeal" - obj["PaymentFrequency"] = freq[obj["frequency"]] - obj["InitialMarginPercentage"] = obj.pop("initial_margin_percentage") - if obj["InitialMarginPercentage"]: - obj["InitialMarginCurrency"] = obj["Currency"] - if obj["Clearing Facility"] is None: - obj["Clearing Facility"] = "NOT CLEARED" - - elif trade_type == "future": - obj["Deal Type"] = "FutureDeal" - rename_keys( - obj, - { - "commission": "Commission", - "quantity": "Quantity", - "swap_type": "Swap Type", - "bbg_ticker": "Bloomberg Ticker", - "Currency": "Trade Currency", - "exchange": "Exchange", - }, - ) - elif trade_type == "wire": - obj["Deal Type"] = "CashFlowDeal" - obj["Transaction Type"] = "Transfer" - obj["Instrument Type"] = "Cashflow" - obj["Settlement Date"] = obj["Trade Date"] - strat_portfolio_map = { - "IGOPTDEL": "OPTIONS", - "COCSH": "OPTIONS", - "IGINX": "TRANCHE", - "BSPK": "TRANCHE", - "TCSH": "TRANCHE", - "SER_ITRXCURVE": "SERG__CURVE", - "XCURVE": "SERG__CURVE", - "M_CSH_CASH": "CASH", - "CVECSH": "SERG__CURVE", - "SER_ITRXCVCSH": "SERG__CURVE", - } - obj["Portfolio"] = strat_portfolio_map.get(obj["Folder"]) - rename_keys(obj, {"amount": "Amount"}) - - elif trade_type == "spot": - standard_settle = (obj["Trade Date"] + 2 * bus_day).date() - if obj["Settlement Date"] > standard_settle: - obj["Deal Type"] = "ForwardDeal" - fx_rate = "Forward Rate" - else: - obj["Deal Type"] = "SpotDeal" - fx_rate = "Spot Rate" - rename_keys( - obj, - { - "commission": "Commission", - "commission_currency": "Commission Currency", - "sell_currency": "Sell Currency", - "sell_amount": "Sell Amount", - "buy_currency": "Buy Currency", - "buy_amount": "Buy Amount", - "spot_rate": fx_rate, - }, - ) - elif trade_type == "fx_swap": - obj["Deal Type"] = "FxSwapDeal" - obj["Action"] = "NEW" - rename_keys( - obj, - { - "near_rate": "Near Side Currency Rate", - "near_settle_date": "Near Side Settlement Date", - "near_buy_currency": "Near Side Buy Currency", - "near_buy_amount": "Near Side Buy Amount", - "near_sell_currency": "Near Side Sell Currency", - "near_sell_amount": "Near Side Sell Amount", - "far_rate": "Far Side Rate", - "far_settle_date": "Far Side Settlement Date", - "far_buy_currency": "Far Side Buy Currency", - "far_buy_amount": "Far Side Buy Amount", - "far_sell_currency": "Far Side Sell Currency", - "far_sell_amount": "Far Side Sell Amount", - }, - ) - elif trade_type == "repo": - obj["Deal Type"] = "RepoDeal" - obj["OpenRepo"] = "Y" if obj["open_repo"] else "N" - rename_keys( - obj, - { - "weighted_amount": "WeightedAmount", - "repo_rate": "RepoRate", - "transaction_indicator": "TransactionIndicator", - }, - ) - elif trade_type == "capfloor": - obj["Deal Type"] = "CapFloorDeal" - obj["PaymentBDC"] = obj["bdc_convention"] - obj["AccrualBDC"] = obj["bdc_convention"] - obj["MaturityBDC"] = "NONE" - obj["TransactionIndicator"] = "Buy" if obj["buysell"] else "Sell" - # missing data : 'Adjusted', 'RollConvention', 'Calendar', 'Arrears', 'Collateralized', 'MaturityBDC' - rename_keys( - obj, - { - "comments": "Comments", - "floating_rate_index_desc": "FloatingRateIndex", - "cap_or_floor": "CapOrFloor", - "amount": "Notional", - "strike": "Strike", - "value_date": "ValueDate", - "expiration_date": "MaturityDate", - "premium_percent": "PremiumPercent", - "pricing_type": "PricingType", - "payment_frequency": "PaymentFrequency", - "fixing_frequency": "FixingFrequency", - "day_count_convention": "Basis", - "bdc_convention": "PaymentBDC", - "payment_mode": "PaymentMode", - "payment_at_beginning_or_end": "PaymentAtBeginningOrEnd", - "initial_margin_percentage": "InitialMarginPercentage", - "intial_margin_currency": "InitialMarginCurrency", - "reset_lag": "ResetLag", - "swap_type": "SwapType", - }, - ) - - return [obj.get(h, None) for h in get_headers(trade_type, fund)] - - -def get_bbg_data( - conn, - session, - identifier, - cusip=None, - isin=None, - settle_date=None, - asset_class=None, - **kwargs, -): - fields = ["MTG_FACTOR_SET_DT", "INT_ACC", "ISSUER"] - fields_dict = { - "Mtge": ["MTG_FACE_AMT", "START_ACC_DT"], - "Corp": ["AMT_ISSUED", "PREV_CPN_DT"], - } - with conn.cursor() as c: - c.execute( - "SELECT identifier FROM securities WHERE identifier=%s", (identifier,) - ) - if not c.fetchone(): - fields += [ - "MATURITY", - "CRNCY", - "NAME", - "FLOATER", - "FLT_SPREAD", - "CPN", - "CPN_TYP", - "CPN_FREQ", - "FIRST_CPN_DT", - "MTG_PAY_DELAY", - "DAY_CNT_DES", - "NOMINAL_PAYMENT_DAY", - "ISSUE_DT", - "RESET_IDX", - "ID_BB_GLOBAL", - ] - - cusip_or_isin = cusip or isin - for bbg_type in ["Mtge", "Corp"]: - bbg_id = cusip_or_isin + " " + bbg_type - data = retrieve_data( - session, - [bbg_id], - fields + fields_dict[bbg_type], - overrides={"SETTLE_DT": settle_date} if settle_date else None, - ) - if data[bbg_id]: - break - else: - logging.error(f"{cusip_or_isin} not in bloomberg") - return - - bbg_data = data[bbg_id] - if bbg_data.get("MTG_FACTOR_SET_DT", 0) == 0: - bbg_data["MTG_FACTOR_SET_DT"] = 1 - bbg_data["INT_ACC"] = 0 - if len(fields) > 3: # we don't have the data in the securities table - sql_fields = [ - "identifier", - "cusip", - "isin", - "description", - "face_amount", - "maturity", - "floater", - "spread", - "coupon", - "frequency", - "day_count", - "first_coupon_date", - "pay_delay", - "currency", - "bbg_type", - "asset_class", - "start_accrued_date", - "issuer", - "reset_index", - "coupon_type", - "payment_day", - "issue_date", - "figi", - ] - placeholders = ",".join(["%s"] * len(sql_fields)) - columns = ",".join(sql_fields) - - sqlstr = ( - f"INSERT INTO securities({columns}) VALUES({placeholders}) " - "ON CONFLICT (identifier) DO NOTHING" - ) - isfloater = bbg_data["FLOATER"] == "Y" - pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) - day_count = bbg_data.get("DAY_CNT_DES") - if m := re.match(r"[^(\s]+", day_count): - day_count = m.group(0) - if isinstance(pay_delay, str): - pay_delay = int(pay_delay.split(" ")[0]) - with conn.cursor() as c: - c.execute( - sqlstr, - ( - identifier, - cusip, - isin, - bbg_data["NAME"], - bbg_data.get("MTG_FACE_AMT") or bbg_data.get("AMT_ISSUED"), - bbg_data.get("MATURITY"), - isfloater, - bbg_data.get("FLT_SPREAD") if isfloater else None, - bbg_data.get("CPN") if not isfloater else None, - bbg_data.get("CPN_FREQ"), - day_count, - bbg_data.get("FIRST_CPN_DT"), - pay_delay, - bbg_data.get("CRNCY"), - bbg_type, - asset_class, - bbg_data.get("START_ACC_DT") or bbg_data.get("PREV_CPN_DT"), - bbg_data["ISSUER"], - bbg_data.get("RESET_IDX"), - bbg_data["CPN_TYP"], - bbg_data["NOMINAL_PAYMENT_DAY"], - bbg_data["ISSUE_DT"], - bbg_data["ID_BB_GLOBAL"], - ), - ) - conn.commit() - return bbg_data - - -def bond_trade_process(conn, session, trade): - bbg_data = get_bbg_data(conn, session, **trade) - currentface = trade["CurrentFace"] = ( - trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] - ) - accrued_payment = trade["AccruedPayment"] = ( - bbg_data["INT_ACC"] * currentface / 100.0 - ) - principal_payment = trade["PrincipalPayment"] = currentface * trade["price"] / 100.0 - if trade["accrued"] is None: - trade["accrued"] = bbg_data["INT_ACC"] - else: - if trade["accrued"] != bbg_data["INT_ACC"]: - logging.error( - f"{trade['accrued']} does not match bbg amount of {bbg_data['INT_ACC']}" - ) - - with conn.cursor() as c: - c.execute( - "UPDATE bonds SET principal_payment = %s, accrued_payment = %s, accrued=%s " - "WHERE id = %s", - (principal_payment, accrued_payment, trade["accrued"], int(trade["id"])), - ) - # mark it at buy price - if trade["buysell"]: - sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" - with conn.cursor() as c: - c.execute( - sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) - ) - conn.commit() - return trade - - -def send_email(trade): - # send out email with trade content - if trade["upload"]: - email = ExchangeMessage() - email.send_email(email_subject(trade), print_trade(trade), ("nyops@lmcg.com",)) - return trade - - -def is_tranche_trade(trade): - return trade["swap_type"] in ("CD_INDEX_TRANCHE", "BESPOKE") - - -def swaption_trade_process(conn, session, trade): - sqlstr = ( - "SELECT indexfactor/100 FROM index_version " - "WHERE redindexcode=%(security_id)s" - ) - try: - with conn.cursor() as c: - c.execute(sqlstr, trade) - (factor,) = c.fetchone() - except ValueError as e: - logging.error(e) - return trade - except TypeError: - # factor missing, probably IR swaption - pass - else: - trade["factor"] = factor - finally: - if trade["option_type"] == "RECEIVER": - trade["OptionType"] = "Call" - elif trade["option_type"] == "PAYER": - trade["OptionType"] = "Put" - return trade - - -def cds_trade_process(conn, session, trade): - sqlstr = ( - "SELECT indexfactor/100 FROM index_version " - "WHERE redindexcode=%(security_id)s" - ) - try: - with conn.cursor() as c: - c.execute(sqlstr, trade) - (factor,) = c.fetchone() - except ValueError: - bbg_data = get_bbg_data( - conn, - session, - trade["security_id"], - isin=trade["security_id"], - asset_class="Subprime", - ) - - factor = bbg_data["MTG_FACTOR_SET_DT"] - if is_tranche_trade(trade): - tranche_factor = (trade["attach"] - trade["detach"]) / ( - trade["orig_attach"] - trade["orig_detach"] - ) - trade["curr_notional"] = trade["notional"] * tranche_factor - trade["Factor"] = tranche_factor - else: - trade["curr_notional"] = trade["notional"] * factor - trade["Factor"] = factor - if trade["upfront"]: - return trade - index = CreditIndex( - redcode=trade["security_id"], - maturity=trade["maturity"], - notional=trade["notional"], - value_date=trade["trade_date"], - ) - index.direction = trade["protection"] - with conn.cursor() as c: - if trade["traded_level"]: - if not is_tranche_trade(trade): - index.ref = float(trade["traded_level"]) - trade["upfront"] = -index.pv - else: - accrued = index._accrued * trade["fixed_rate"] - match index.index_type: - case "HY": - dirty_price = float(trade["traded_level"]) + accrued - trade["upfront"] = ( - -(100 - dirty_price) - * index.notional - * trade["Factor"] - * 0.01 - ) - case "EU" | "XO" if trade["orig_attach"] in (6, 12, 35): - if trade["orig_attach"] == 6: - index.recovery = 0.0 - index.spread = float(trade["traded_level"]) - trade["upfront"] = ( - -index._pv * trade["notional"] * trade["Factor"] - ) - case _: - dirty_protection = float(trade["traded_level"]) - accrued - trade["upfront"] = ( - -dirty_protection * index.notional * trade["Factor"] * 0.01 - ) - c.execute( - "UPDATE cds SET upfront=%s WHERE dealid=%s", - (trade["upfront"], trade["dealid"]), - ) - - else: - index.pv = -trade["upfront"] - trade["traded_level"] = index.ref - c.execute( - "UPDATE cds SET traded_level=%s WHERE dealid=%s", - (trade["traded_level"], trade["dealid"]), - ) - conn.commit() - return trade - - -def get_filepath( - base_dir: pathlib.Path, - trade_type: Union[str, Tuple[str, str]], - fund: str = "SERCGMAST", -) -> pathlib.Path: - d = { - "bond": "Mortgages", - "cds": "CreditDefaultSwapDeal", - "swaption": "SwaptionDeal", - "future": "Future", - "wire": "CashFlowDeal", - "spot": "SpotDeal", - "fx_swap": "FxSwapDeal", - "capfloor": "CapFloor", - "repo": "RepoDeal", - } - trade_tag: str - if isinstance(trade_type, tuple): - trade_tag = d[trade_type[0]] + trade_type[1] - else: - trade_tag = d[trade_type] - - timestamp = datetime.datetime.now() - if fund == "BRINKER": - return ( - base_dir - / str(timestamp.date()) - / f"LMCG_BBH_SWAP_TRADES_P.{timestamp:%Y%m%d%H%M%S}.csv" - ) - elif fund == "SERCGMAST": - return ( - base_dir - / str(timestamp.date()) - / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" - ) - elif fund == "BOWDST": - return ( - base_dir - / str(timestamp.date()) - / f"Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" - ) - - -def upload_buf( - buf: bytes, dest: str, fund: str = "SERCGMAST", trade_type="bond" -) -> None: - if fund == "BRINKER": - sftp = SftpClient.from_creds("bbh") - sftp.put(buf, dest) - elif fund == "SERCGMAST": - ftp = FtpClient.from_creds("globeop") - ftp.client.cwd("incoming") - ftp.put(buf, dest) - elif fund == "BOWDST": - sftp = SftpClient.from_creds("hm_globeop") - sftp.client.chdir("incoming") - sftp.put(buf, dest) - em = ExchangeMessage() - recipients = ("hm-operations@bnymellon.com",) - em.send_email( - "Trade file", - "", - to_recipients=recipients, - cc_recipients=("bowdoin-ops@lmcg.com",), - attach=(FileAttachment(name=dest, content=buf),), - ) - else: - raise ValueError(f"unknow fund name: {fund}") - - -def email_subject(trade): - return "[{0}] {1} {2} {3}".format( - trade["asset_class"], - trade["action"], - "Buy" if trade["buysell"] else "Sell", - trade["description"], - ) - - -def print_trade(trade): - d = trade.copy() - d["buysell"] = "Buy" if d["buysell"] else "Sell" - return tabulate((k, v) for k, v in d.items()) - - -if __name__ == "__main__": - import os - - os.environ["SERENITAS_APP_NAME"] = "process_queue" - from functools import partial - from serenitas.utils.pool import dawn_pool - - parser = argparse.ArgumentParser() - parser.add_argument( - "-n", "--no-upload", action="store_true", help="do not upload to Globeop" - ) - args = parser.parse_args() - r = get_redis_queue() - with dawn_pool.connection() as conn, init_bbg_session() as session: - for trade_type in [ - "cds", - "swaption", - "repo", - "future", - "wire", - "spot", - "fx_swap", - "capfloor", - ]: - p_list = partial( - process_indicative, - trade_type=trade_type, - upload=not args.no_upload, - session=session, - conn=conn, - ) - r.transaction(p_list, trade_type) - p_upload = partial( - process_upload, - trade_type=trade_type, - upload=not args.no_upload, - ) - r.transaction(p_upload, trade_type) - - for trade_type in ("cds", "swaption", "capfloor"): - for fund in ("SERCGMAST", "BOWDST", "BRINKER"): - key = f"{trade_type}_{fund}_termination" - t_list = partial( - terminate_list, key=key, upload=not args.no_upload, conn=conn - ) - r.transaction(t_list, key) |
