diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 245 |
1 files changed, 188 insertions, 57 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 25480b47..51e4302b 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -4,6 +4,7 @@ import datetime import logging import pathlib import re +import redis import sys import task_server.config as config @@ -16,6 +17,7 @@ from pickle import loads from ftplib import FTP from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from common import get_redis_queue +from functools import partial from pyisda.date import previous_twentieth from utils.db import dbconn from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date @@ -295,26 +297,59 @@ def get_effective_date(d, swaption_type): return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) -def get_trades(q, trade_type="bond", fund="SERCGMAST"): - queue_name = f"{trade_type}_{fund}" - r = q.lrange(queue_name, 0, -1) - df = [loads(e) for e in r] - list_trades = [] +def get_trades(p: redis.client.Pipeline, key: str): + df = [loads(e) for e in p.lrange(key, 0, -1)] if df: for tradeid, v in groupby(df, lambda x: x["id"]): trades = list(v) trades = sorted(trades, key=lambda x: x["lastupdate"]) if len(trades) == 1: - list_trades.append(trades[0]) + yield trades[0] else: if trades[-1]["action"] == "CANCEL": continue if trades[0]["action"] == "NEW": trades[-1]["action"] = "NEW" - list_trades.append(trades[-1]) + yield trades[-1] if trades[-1]["action"] == "UPDATE": - list_trades.append(trades[-1]) - return list_trades + yield trades[-1] + + +def process_list(p: redis.client.Pipeline, key: str, upload: bool) -> None: + trade_type, fund = key.split("_") + trades = get_trades(p, key) + if trade_type in ["bond", "cds", "swaption"]: + process_fun = globals()[f"{trade_type}_trade_process"] + with init_bbg_session(BBG_IP) as session: + trades = (process_fun(dawndb, session, trade) for trade in trades) + if trade_type == "bond" and fund == "SERCGMAST": + trades = (send_email(trade) for trade in trades) + if fund == "SERCGMAST" or trade_type in ("cds", "swaption"): + try: + buf = generate_csv( + (t for t in trades if t.get("upload", True)), trade_type, fund, + ) + file_path = write_buffer(buf, DAILY_DIR, trade_type, fund) + if upload: + upload_file(file_path) + except IOError: + pass + p.delete(key) + + +def terminate_list( + p: redis.client.Pipeline, key: str, upload: bool, base_dir: pathlib.Path +) -> None: + trade_type = key.split("_")[0] + for term in p.lrange(key, 0, -1): + termination = loads(term) + try: + f = build_termination(base_dir, **termination) + except TypeError as e: + logging.error(e) + if upload: + upload_file(f) + p.delete(key) def rename_keys(d, mapping): @@ -632,30 +667,31 @@ def bond_trade_process(conn, session, trade): currentface = trade["faceamount"] * bbg_data["MTG_FACTOR_SET_DT"] accrued_payment = bbg_data["INT_ACC"] * currentface / 100.0 principal_payment = currentface * trade["price"] / 100.0 - with conn: - with conn.cursor() as c: - c.execute( - "UPDATE bonds SET principal_payment = %s, accrued_payment = %s " - "WHERE id = %s", - (principal_payment, accrued_payment, int(trade["id"])), - ) + with conn.cursor() as c: + c.execute( + "UPDATE bonds SET principal_payment = %s, accrued_payment = %s " + "WHERE id = %s", + (principal_payment, accrued_payment, int(trade["id"])), + ) # mark it at buy price if trade["buysell"]: sqlstr = "INSERT INTO marks VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" - with conn: - with conn.cursor() as c: - c.execute( - sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) - ) + with conn.cursor() as c: + c.execute( + sqlstr, (trade["trade_date"], trade["identifier"], trade["price"]) + ) + return trade def send_email(trade): # send out email with trade content - email = GmailMessage() - email.set_content(print_trade(trade)) - email["To"] = "nyops@lmcg.com" - email["Subject"] = email_subject(trade) - email.send() + if trade["upload"]: + email = GmailMessage() + email.set_content(print_trade(trade)) + email["To"] = "nyops@lmcg.com" + email["Subject"] = email_subject(trade) + email.send() + return trade def is_tranche_trade(trade): @@ -671,13 +707,14 @@ def swaption_trade_process(conn, session, trade): with conn.cursor() as c: c.execute(sqlstr, trade) try: - factor, = c.fetchone() + (factor,) = c.fetchone() except TypeError: - return + return trade except ValueError: - return + return trade else: trade["factor"] = factor + return trade def cds_trade_process(conn, session, trade): @@ -688,7 +725,7 @@ def cds_trade_process(conn, session, trade): try: with conn.cursor() as c: c.execute(sqlstr, trade) - factor, = c.fetchone() + (factor,) = c.fetchone() except ValueError: bbg_data = get_bbg_data( conn, @@ -736,10 +773,11 @@ def cds_trade_process(conn, session, trade): def generate_csv(l, trade_type="bond", fund="SERCGMAST"): output = StringIO() csvwriter = csv.writer(output) - csvwriter.writerow(HEADERS[trade_type]) empty = True for trade in l: - empty = False + if empty: + csvwriter.writerow(HEADERS[trade_type]) + empty = False csvwriter.writerow(build_line(trade.copy(), trade_type)) if empty: raise IOError("empty trade queue") @@ -811,6 +849,114 @@ def print_trade(trade): return tabulate((k, v) for k, v in d.items()) +def build_termination( + base_dir: pathlib.Path, + *, + dealid, + termination_fee, + termination_date=datetime.date.today(), + termination_amount=None, + termination_cp=None, + globeopid=None, + partial_termination: bool = False, + ccy: str = "USD", + **kwargs, +): + """ if termination_amount is None, assume full termination + if termination_cp is None assume termination, otherwise assignment + """ + if dealid.startswith("SWPTN"): + deal_type = "SwaptionDeal" + elif dealid.startswith("SCCDS"): + deal_type = "CreditDefaultSwapDeal" + else: + deal_type = "CapFloorDeal" + + headers = [ + "DealType", + "DealId", + "Action", + "Client", + "SubAction", + "PartialTermination", + "TerminationAmount", + "TerminationDate", + "FeesPaid", + "FeesReceived", + "DealFunction", + "Reserved", + "ClientReference", + ] + if deal_type == "CreditDefaultSwapDeal": + headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"] + else: + headers += ["Reserved"] * 3 + headers += ["FeePaymentDate", "SpecialInstructions"] + + if termination_cp is not None: + headers += ["AssignedCounterparty"] + else: + headers += ["Reserved"] + if deal_type == "CreditDefaultSwapDeal" and termination_cp is not None: + headers += [ + "AssignmentFee", + "AssignedFeeTradeDate", + "AssignedFeeValueDate", + "AssignedCustodian", + "AssignedCashAccount", + "Reserved", + "FeeCurrency", + ] + else: + headers += ["Reserved"] * 7 + headers += ["GoTradeId"] + if deal_type == "CreditDefaultSwapDeal": + headers += ["FeeComments", "ZeroOutInterestCashFlows"] + else: + headers += ["Reserved"] * 2 + headers += ["Reserved"] * 4 + if deal_type == "SwaptionDeal": + headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"] + elif deal_type == "CreditDefaultSwapDeal": + if termination_cp is None: + headers += ["Reserved"] * 3 + else: + headers += ["AssignedDealFunction"] + ["Reserved"] * 2 + headers += ["InitialMargin", "InitialMarginCurrency"] + if termination_cp is None: + headers += ["Reserved"] * 4 + ["CreditEventOccured"] + d = { + "DealType": deal_type, + "GoTradeId": int(globeopid[3:9]), + "Action": "Update", + "Client": "Serenitas", + "SubAction": "Termination", + "PartialTermination": "Y" if partial_termination else "N", + "TerminationAmount": termination_amount, + "TerminationDate": termination_date, + "FeesPaid": -termination_fee if termination_fee < 0 else None, + "FeesReceived": termination_fee if termination_fee > 0 else None, + "FeePaymentDate": (termination_date + 3 * bus_day).date(), + } + if "FeeCurrency" in headers: + d["FeeCurrency"] = ccy + if termination_cp is not None: + d["AssignedCounterparty"] = termination_cp + buf = StringIO() + csvwriter = csv.DictWriter(buf, headers) + csvwriter.writeheader() + csvwriter.writerow(d) + timestamp = datetime.datetime.now() + trade_type = f"{deal_type}A" if termination_cp is not None else f"{deal_type}T" + file_path = ( + base_dir + / str(timestamp.date()) + / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv" + ) + file_path.write_bytes(buf.getvalue().encode()) + return file_path + + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( @@ -818,7 +964,7 @@ if __name__ == "__main__": ) parser.add_argument("fund", nargs="?", default="SERCGMAST") args = parser.parse_args() - q = get_redis_queue() + r = get_redis_queue() try: from env import DAILY_DIR except KeyError: @@ -826,28 +972,13 @@ if __name__ == "__main__": dawndb = dbconn("dawndb") for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]: - list_trades = get_trades(q, trade_type, args.fund) - if list_trades: - if trade_type in ["bond", "cds", "swaption"]: - process_fun = globals()[f"{trade_type}_trade_process"] - with init_bbg_session(BBG_IP) as session: - for trade in list_trades: - process_fun(dawndb, session, trade) - if trade_type == "bond" and args.fund == "SERCGMAST": - for trade in list_trades: - if trade["upload"]: - send_email(trade) - if args.fund == "SERCGMAST" or trade_type == "cds": - try: - buf = generate_csv( - filter(lambda t: t.get("upload", True), list_trades), - trade_type, - args.fund, - ) - file_path = write_buffer(buf, DAILY_DIR, trade_type, args.fund) - if not args.no_upload: - upload_file(file_path) - except IOError: - pass - q.delete(f"{trade_type}_{args.fund}") + key = f"{trade_type}_{args.fund}" + p_list = partial(process_list, key=key, upload=not args.no_upload) + r.transaction(p_list, key) + for trade_type in ["cds", "swaption", "capfloor"]: + key = f"{trade_type}_termination" + t_list = partial( + terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR + ) + r.transaction(t_list, key) dawndb.close() |
