diff options
| -rw-r--r-- | python/Dawn/views.py | 27 | ||||
| -rwxr-xr-x | python/build_termination.py | 213 | ||||
| -rw-r--r-- | python/process_queue.py | 245 | ||||
| l--------- | python/terminate_swaption | 1 | ||||
| l--------- | python/terminate_tranche | 1 |
5 files changed, 201 insertions, 286 deletions
diff --git a/python/Dawn/views.py b/python/Dawn/views.py index 897cef58..abd9ce83 100644 --- a/python/Dawn/views.py +++ b/python/Dawn/views.py @@ -385,23 +385,22 @@ def terminate(dealid, kind): termination = Termination() form = TerminationForm(dealid=dealid) form.termination_cp.choices = form.termination_cp.choices + list(cp_choices(kind)) + table = kind if kind.endswith("s") else kind + "s" if form.validate_on_submit(): form.populate_obj(termination) session = form.get_session() - if not termination.partial_termination or termination.termination_cp is None: - rec = db.session.execute( - "SELECT cp_code, notional, coalesce(terminated_amount, 0.) " - "FROM swaptions " - "LEFT JOIN " - "(SELECT dealid, sum(termination_amount) AS terminated_amount " - " FROM terminations group by dealid) term USING (dealid) " - "WHERE dealid = :dealid", - {"dealid": dealid}, - ) - cp_code, notional, terminated_amount = next(rec) + rec = db.session.execute( + "SELECT notional, coalesce(terminated_amount, 0.), currency, globeop_id " + f"FROM {table} " + "LEFT JOIN " + "(SELECT dealid, sum(termination_amount) AS terminated_amount " + " FROM terminations group by dealid) term USING (dealid) " + "WHERE dealid = :dealid", + {"dealid": dealid}, + ) + notional, terminated_amount, currency, globeop_id = next(rec) + if not termination.partial_termination: termination.termination_amount = notional - terminated_amount - if termination.termination_cp is None: - termination.termination_cp = cp_code session.add(termination) try: session.commit() @@ -409,7 +408,7 @@ def terminate(dealid, kind): app.logger.error(e) session.rollback() else: - buf = simple_serialize(termination) + buf = simple_serialize(termination, ccy=currency, globeopid=globeop_id) q = get_queue() q.rpush(f"{kind}_termination", buf) return redirect(url_for("list_trades", kind=kind)) diff --git a/python/build_termination.py b/python/build_termination.py deleted file mode 100755 index ba36ac41..00000000 --- a/python/build_termination.py +++ /dev/null @@ -1,213 +0,0 @@ -#! /usr/bin/python -import argparse -import csv -import datetime -from process_queue import build_termination, upload_file -from env import DAILY_DIR -from utils.db import dbconn -from io import StringIO - - -def build_termination( - base_dir, - dawndb, - dealid, - fee, - *, - termination_date=datetime.date.today(), - termination_amount=None, - termination_cp=None, - deal_type="CreditDefaultSwapDeal", -): - """ if termination_amount is None, assume full termination - if termination_cp is None assume termination, otherwise assignment - """ - if deal_type == "CreditDefaultSwapDeal": - table = "cds" - elif deal_type == "SwaptionDeal": - table = "swaptions" - else: - raise ValueError("Unkown deal_type: {deal_type}") - - with dawndb.cursor() as c: - c.execute( - "SELECT dealid, cp_code, notional, termination_amount, " - "globeop_id, currency " - f"FROM {table} where id=%s", - (dealid,), - ) - dealid, cp_code, notional, partial_amounts, globeopid, ccy = c.fetchone() - if partial_amounts is not None: - remaining_notional = notional - sum(partial_amounts) - else: - remaining_notional = notional - termination_amount = termination_amount or remaining_notional - if deal_type == "CreditDefaultSwapDeal": - c.execute( - f"UPDATE {table} " - "SET termination_amount=termination_amount||%s::float8, " - "termination_cp=termination_cp||%s::text, " - "termination_fee=termination_fee||%s::float8, " - "termination_date=termination_date||%s::date " - "WHERE dealid=%s", - ( - termination_amount, - termination_cp or cp_code, - fee, - termination_date, - dealid, - ), - ) - else: - c.execute( - f"UPDATE {table} " - "SET termination_amount=%s::float8, " - "termination_cp=%s::text, " - "termination_fee=%s::float8, " - "termination_date=%s::date " - "WHERE dealid=%s", - ( - termination_amount, - termination_cp or cp_code, - fee, - termination_date, - dealid, - ), - ) - dawndb.commit() - headers = [ - "DealType", - "DealId", - "Action", - "Client", - "SubAction", - "PartialTermination", - "TerminationAmount", - "TerminationDate", - "FeesPaid", - "FeesReceived", - "DealFunction", - "Reserved", - "ClientReference", - ] - if deal_type == "CreditDefaultSwapDeal": - headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"] - else: - headers += ["Reserved"] * 3 - headers += ["FeePaymentDate", "SpecialInstructions"] - - if termination_cp is not None: - headers += ["AssignedCounterparty"] - else: - headers += ["Reserved"] - if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None: - headers += [ - "AssignmentFee", - "AssignedFeeTradeDate", - "AssignedFeeValueDate", - "AssignedCustodian", - "AssignedCashAccount", - "Reserved", - "FeeCurrency", - ] - else: - headers += ["Reserved"] * 7 - headers += ["GoTradeId"] - if deal_type == "CreditDefaultSwapDeal": - headers += ["FeeComments", "ZeroOutInterestCashFlows"] - else: - headers += ["Reserved"] * 2 - headers += ["Reserved"] * 4 - if deal_type == "SwaptionDeal": - headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"] - elif deal_type == "CreditDefaultSwapDeal": - if termination_cp is None: - headers += ["Reserved"] * 3 - else: - headers += ["AssignedDealFunction"] + ["Reserved"] * 2 - headers += ["InitialMargin", "InitialMarginCurrency"] - if termination_cp is None: - headers += ["Reserved"] * 4 + ["CreditEventOccured"] - d = { - "DealType": deal_type, - "GoTradeId": int(globeopid[3:9]), - "Action": "Update", - "Client": "Serenitas", - "SubAction": "Termination", - "PartialTermination": "Y" - if remaining_notional - termination_amount > 0 - else "N", - "TerminationAmount": termination_amount, - "TerminationDate": termination_date, - "FeesPaid": -fee if fee < 0 else None, - "FeesReceived": fee if fee > 0 else None, - "FeePaymentDate": (termination_date + 3 * bus_day).date(), - } - if "FeeCurrency" in headers: - d["FeeCurrency"] = ccy - if termination_cp is not None: - d["AssignedCounterparty"] = termination_cp - buf = StringIO() - csvwriter = csv.DictWriter(buf, headers) - csvwriter.writeheader() - csvwriter.writerow(d) - timestamp = datetime.datetime.now() - trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T" - file_path = ( - base_dir - / str(timestamp.date()) - / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv" - ) - file_path.write_bytes(buf.getvalue().encode()) - return file_path - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="helper script to terminate or assign a trade" - ) - parser.add_argument("dealid", help="Serenitas id", type=int) - parser.add_argument( - "fee", help="termination fee ( >0 we receive money)", type=float - ) - parser.add_argument( - "--date", - "-d", - help="termination date, default to today's date", - type=datetime.date.fromisoformat, - default=datetime.date.today(), - ) - parser.add_argument( - "--amount", - "-a", - help="termination amount, if missing full termination", - default=None, - required=False, - ) - parser.add_argument( - "--counterparty", - "-c", - help="termination counterparty, if missing, original counterparty is used (termination)", - default=None, - required=False, - ) - args = parser.parse_args() - dawndb = dbconn("dawndb") - if "tranche" in __file__: - deal_type = "CreditDefaultSwapDeal" - elif "swaption" in __file__: - deal_type = "SwaptionDeal" - else: - raise RuntimeError("Incorrect deal type") - - p = build_termination( - DAILY_DIR, - dawndb, - args.dealid, - args.fee, - termination_date=args.date, - termination_cp=args.counterparty, - termination_amount=args.amount, - deal_type=deal_type, - ) - upload_file(p) diff --git a/python/process_queue.py b/python/process_queue.py index 25480b47..51e4302b 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -4,6 +4,7 @@ import datetime import logging import pathlib import re +import redis import sys import task_server.config as config @@ -16,6 +17,7 @@ from pickle import loads from ftplib import FTP from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from common import get_redis_queue +from functools import partial from pyisda.date import previous_twentieth from utils.db import dbconn from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date @@ -295,26 +297,59 @@ def get_effective_date(d, swaption_type): return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) -def get_trades(q, trade_type="bond", fund="SERCGMAST"): - queue_name = f"{trade_type}_{fund}" - r = q.lrange(queue_name, 0, -1) - df = [loads(e) for e in r] - list_trades = [] +def get_trades(p: redis.client.Pipeline, key: str): + df = [loads(e) for e in p.lrange(key, 0, -1)] if df: for tradeid, v in groupby(df, lambda x: x["id"]): trades = list(v) trades = sorted(trades, key=lambda x: x["lastupdate"]) if len(trades) == 1: - list_trades.append(trades[0]) + yield trades[0] else: if trades[-1]["action"] == "CANCEL": continue if trades[0]["action"] == "NEW": trades[-1]["action"] = "NEW" - list_trades.append(trades[-1]) + yield trades[-1] if trades[-1]["action"] == "UPDATE": - list_trades.append(trades[-1]) - return list_trades + yield trades[-1] + + +def process_list(p: redis.client.Pipeline, key: str, upload: bool) -> None: + trade_type, fund = key.split("_") + trades = get_trades(p, key) + if trade_type in ["bond", "cds", "swaption"]: + process_fun = globals()[f"{trade_type}_trade_process"] + with init_bbg_session(BBG_IP) as session: + trades = (process_fun(dawndb, session, trade) for trade in trades) + if trade_type == "bond" and fund == "SERCGMAST": + trades = (send_email(trade) for trade in trades) + if fund == "SERCGMAST" or trade_type in ("cds", "swaption"): + try: + buf = generate_csv( + (t for t in trades if t.get("upload", True)), trade_type, fund, + ) + file_path = write_buffer(buf, DAILY_DIR, trade_type, fund) + if upload: + upload_file(file_path) + except IOError: + pass + p.delete(key) + + +def terminate_list( + p: redis.client.Pipeline, key: str, upload: bool, base_dir: pathlib.Path +) -> None: + trade_type = key.split("_")[0] + for term in p.lrange(key, 0, -1): + termination = loads(term) + try: + f = build_termination(base_dir, **termination) + except TypeError as e: + logging.error(e) + if upload: + upload_file(f) + p.delete(key) def rename_keys(d, mapping): @@ -632,30 +667,31 @@ def bond_trade_process(conn, session, trade): currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0 principal_payment = currentface * trade["price"] / 100.0 - with conn: - with conn.cursor() as c: - c.execute( - "UPDATE bonds SET principal_payment = %s, accrued_payment = %s " - "WHERE id = %s", - (principal_payment, accrued_payment, int(trade["id"])), - ) + with conn.cursor() as c: + c.execute( + "UPDATE bonds SET principal_payment = %s, accrued_payment = %s " + "WHERE id = %s", + (principal_payment, accrued_payment, int(trade["id"])), + ) # mark it at buy price if trade["buysell"]: sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" - with conn: - with conn.cursor() as c: - c.execute( - sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) - ) + with conn.cursor() as c: + c.execute( + sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) + ) + return trade def send_email(trade): # send out email with trade content - email = GmailMessage() - email.set_content(print_trade(trade)) - email["To"] = "nyops@lmcg.com" - email["Subject"] = email_subject(trade) - email.send() + if trade["upload"]: + email = GmailMessage() + email.set_content(print_trade(trade)) + email["To"] = "nyops@lmcg.com" + email["Subject"] = email_subject(trade) + email.send() + return trade def is_tranche_trade(trade): @@ -671,13 +707,14 @@ def swaption_trade_process(conn, session, trade): with conn.cursor() as c: c.execute(sqlstr, trade) try: - factor, = c.fetchone() + (factor,) = c.fetchone() except TypeError: - return + return trade except ValueError: - return + return trade else: trade["factor"] = factor + return trade def cds_trade_process(conn, session, trade): @@ -688,7 +725,7 @@ def cds_trade_process(conn, session, trade): try: with conn.cursor() as c: c.execute(sqlstr, trade) - factor, = c.fetchone() + (factor,) = c.fetchone() except ValueError: bbg_data = get_bbg_data( conn, @@ -736,10 +773,11 @@ def cds_trade_process(conn, session, trade): def generate_csv(l, trade_type="bond", fund="SERCGMAST"): output = StringIO() csvwriter = csv.writer(output) - csvwriter.writerow(HEADERS[trade_type]) empty = True for trade in l: - empty = False + if empty: + csvwriter.writerow(HEADERS[trade_type]) + empty = False csvwriter.writerow(build_line(trade.copy(), trade_type)) if empty: raise IOError("empty trade queue") @@ -811,6 +849,114 @@ def print_trade(trade): return tabulate((k, v) for k, v in d.items()) +def build_termination( + base_dir: pathlib.Path, + *, + dealid, + termination_fee, + termination_date=datetime.date.today(), + termination_amount=None, + termination_cp=None, + globeopid=None, + partial_termination: bool = False, + ccy: str = "USD", + **kwargs, +): + """ if termination_amount is None, assume full termination + if termination_cp is None assume termination, otherwise assignment + """ + if dealid.startswith("SWPTN"): + deal_type = "SwaptionDeal" + elif dealid.startswith("SCCDS"): + deal_type = "CreditDefaultSwapDeal" + else: + deal_type = "CapFloorDeal" + + headers = [ + "DealType", + "DealId", + "Action", + "Client", + "SubAction", + "PartialTermination", + "TerminationAmount", + "TerminationDate", + "FeesPaid", + "FeesReceived", + "DealFunction", + "Reserved", + "ClientReference", + ] + if deal_type == "CreditDefaultSwapDeal": + headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"] + else: + headers += ["Reserved"] * 3 + headers += ["FeePaymentDate", "SpecialInstructions"] + + if termination_cp is not None: + headers += ["AssignedCounterparty"] + else: + headers += ["Reserved"] + if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None: + headers += [ + "AssignmentFee", + "AssignedFeeTradeDate", + "AssignedFeeValueDate", + "AssignedCustodian", + "AssignedCashAccount", + "Reserved", + "FeeCurrency", + ] + else: + headers += ["Reserved"] * 7 + headers += ["GoTradeId"] + if deal_type == "CreditDefaultSwapDeal": + headers += ["FeeComments", "ZeroOutInterestCashFlows"] + else: + headers += ["Reserved"] * 2 + headers += ["Reserved"] * 4 + if deal_type == "SwaptionDeal": + headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"] + elif deal_type == "CreditDefaultSwapDeal": + if termination_cp is None: + headers += ["Reserved"] * 3 + else: + headers += ["AssignedDealFunction"] + ["Reserved"] * 2 + headers += ["InitialMargin", "InitialMarginCurrency"] + if termination_cp is None: + headers += ["Reserved"] * 4 + ["CreditEventOccured"] + d = { + "DealType": deal_type, + "GoTradeId": int(globeopid[3:9]), + "Action": "Update", + "Client": "Serenitas", + "SubAction": "Termination", + "PartialTermination": "Y" if partial_termination else "N", + "TerminationAmount": termination_amount, + "TerminationDate": termination_date, + "FeesPaid": -termination_fee if termination_fee < 0 else None, + "FeesReceived": termination_fee if termination_fee > 0 else None, + "FeePaymentDate": (termination_date + 3 * bus_day).date(), + } + if "FeeCurrency" in headers: + d["FeeCurrency"] = ccy + if termination_cp is not None: + d["AssignedCounterparty"] = termination_cp + buf = StringIO() + csvwriter = csv.DictWriter(buf, headers) + csvwriter.writeheader() + csvwriter.writerow(d) + timestamp = datetime.datetime.now() + trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T" + file_path = ( + base_dir + / str(timestamp.date()) + / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv" + ) + file_path.write_bytes(buf.getvalue().encode()) + return file_path + + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( @@ -818,7 +964,7 @@ if __name__ == "__main__": ) parser.add_argument("fund", nargs="?", default="SERCGMAST") args = parser.parse_args() - q = get_redis_queue() + r = get_redis_queue() try: from env import DAILY_DIR except KeyError: @@ -826,28 +972,13 @@ if __name__ == "__main__": dawndb = dbconn("dawndb") for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]: - list_trades = get_trades(q, trade_type, args.fund) - if list_trades: - if trade_type in ["bond", "cds", "swaption"]: - process_fun = globals()[f"{trade_type}_trade_process"] - with init_bbg_session(BBG_IP) as session: - for trade in list_trades: - process_fun(dawndb, session, trade) - if trade_type == "bond" and args.fund == "SERCGMAST": - for trade in list_trades: - if trade["upload"]: - send_email(trade) - if args.fund == "SERCGMAST" or trade_type == "cds": - try: - buf = generate_csv( - filter(lambda t: t.get("upload", True), list_trades), - trade_type, - args.fund, - ) - file_path = write_buffer(buf, DAILY_DIR, trade_type, args.fund) - if not args.no_upload: - upload_file(file_path) - except IOError: - pass - q.delete(f"{trade_type}_{args.fund}") + key = f"{trade_type}_{args.fund}" + p_list = partial(process_list, key=key, upload=not args.no_upload) + r.transaction(p_list, key) + for trade_type in ["cds", "swaption", "capfloor"]: + key = f"{trade_type}_termination" + t_list = partial( + terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR + ) + r.transaction(t_list, key) dawndb.close() diff --git a/python/terminate_swaption b/python/terminate_swaption deleted file mode 120000 index 27d7aacd..00000000 --- a/python/terminate_swaption +++ /dev/null @@ -1 +0,0 @@ -build_termination.py
\ No newline at end of file diff --git a/python/terminate_tranche b/python/terminate_tranche deleted file mode 120000 index 27d7aacd..00000000 --- a/python/terminate_tranche +++ /dev/null @@ -1 +0,0 @@ -build_termination.py
\ No newline at end of file |
