diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 134 |
1 files changed, 72 insertions, 62 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 7b75a946..79ff271f 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -27,7 +27,7 @@ from serenitas.utils.remote import FtpClient, SftpClient from serenitas.utils import get_redis_queue from functools import partial from pyisda.date import previous_twentieth -from typing import Tuple, Union +from typing import Literal, Tuple, Union from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from tabulate import tabulate @@ -376,21 +376,28 @@ def terminate_list( base_dir: pathlib.Path = DAILY_DIR, ): trade_type, fund, _ = key.split("_") + term_dict = {"T": [], "A": []} for term in p.lrange(key, 0, -1): termination = loads(term) try: - buf, trade_type = build_termination( - base_dir, conn, fund=fund, **termination - ) + build_termination(term_dict, conn, fund=fund, **termination) except TypeError as e: logging.error(e) return - else: - dest = get_filepath(base_dir, trade_type, fund) - if upload: - upload_buf(buf, dest.name, fund, trade_type) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) + + for k, v in term_dict.items(): + if v: + dest = get_filepath(base_dir, (trade_type, k), fund) + buf = StringIO() + csvwriter = csv.DictWriter(buf, get_termination_headers(trade_type, k)) + csvwriter.writeheader() + csvwriter.writerows(v) + buf = buf.getvalue().encode() + + if upload: + upload_buf(buf, dest.name, fund, trade_type) + dest.parent.mkdir(exist_ok=True) + dest.write_bytes(buf) p.delete(key) @@ -676,7 +683,7 @@ def get_bbg_data( isfloater = bbg_data["FLOATER"] == "Y" pay_delay = bbg_data.get("MTG_PAY_DELAY", 0) day_count = bbg_data.get("DAY_CNT_DES") - if m := re.match("[^(\s]+", day_count): + if m := re.match(r"[^(\s]+", day_count): day_count = m.group(0) if isinstance(pay_delay, str): pay_delay = int(pay_delay.split(" ")[0]) @@ -862,7 +869,7 @@ def get_filepath( } trade_tag: str if isinstance(trade_type, tuple): - trade_tag = trade_type[0] + trade_type[1] + trade_tag = d[trade_type[0]] + trade_type[1] else: trade_tag = d[trade_type] @@ -936,33 +943,7 @@ def print_trade(trade): return tabulate((k, v) for k, v in d.items()) -def build_termination( - base_dir: pathlib.Path, - conn, - *, - dealid, - termination_fee: float, - fee_payment_date: datetime.date, - termination_date=datetime.date.today(), - termination_amount=None, - termination_cp=None, - globeopid=None, - partial_termination: bool = False, - is_assignment: bool = False, - ccy: str = "USD", - fund: str = "SERCGMAST", - **kwargs, -) -> (bytes, Tuple[str, str]): - """if termination_amount is None, assume full termination - if termination_cp is None assume termination, otherwise assignment - """ - if dealid.startswith("SWPTN"): - deal_type = "SwaptionDeal" - elif dealid.startswith("SCCDS"): - deal_type = "CreditDefaultSwapDeal" - else: - deal_type = "CapFloorDeal" - +def get_termination_headers(trade_type: str, term_type: Literal["A", "T"]): headers = [ "DealType", "DealId", @@ -978,25 +959,17 @@ def build_termination( "Reserved", "ClientReference", ] - if deal_type == "CreditDefaultSwapDeal": + if trade_type == "cds": headers += ["TradeDate", "EffectiveDate", "FirstCouponDate"] - with conn.cursor() as c: - c.execute( - "SELECT (detach-attach)/(orig_detach-orig_attach) " - "FROM cds WHERE dealid=%s", - (dealid,), - ) - (tranche_factor,) = c.fetchone() - termination_amount *= tranche_factor else: headers += ["Reserved"] * 3 headers += ["FeePaymentDate", "SpecialInstructions"] - if is_assignment: + if term_type == "A": headers += ["AssignedCounterparty"] else: headers += ["Reserved"] - if deal_type == "CreditDefaultSwapDeal" and is_assignment: + if trade_type == "cds" and term_type == "A": headers += [ "AssignmentFee", "AssignedFeeTradeDate", @@ -1009,21 +982,61 @@ def build_termination( else: headers += ["Reserved"] * 7 headers += ["GoTradeId"] - if deal_type == "CreditDefaultSwapDeal": + if trade_type == "cds": headers += ["FeeComments", "ZeroOutInterestCashFlows"] else: headers += ["Reserved"] * 2 headers += ["Reserved"] * 4 - if deal_type == "SwaptionDeal": + if trade_type == "swaption": headers += ["Reserved"] * 2 + ["InMoney", "FeeCurrency"] - elif deal_type == "CreditDefaultSwapDeal": - if is_assignment: + elif trade_type == "cds": + if term_type == "A": headers += ["Reserved"] * 3 else: headers += ["AssignedDealFunction"] + ["Reserved"] * 2 headers += ["InitialMargin", "InitialMarginCurrency"] - if not is_assignment: + if term_type == "T": headers += ["Reserved"] * 4 + ["CreditEventOccured"] + return headers + + +def build_termination( + term_dict: dict, + conn, + *, + dealid, + termination_fee: float, + fee_payment_date: datetime.date, + termination_date=datetime.date.today(), + termination_amount=None, + termination_cp=None, + globeopid=None, + partial_termination: bool = False, + is_assignment: bool = False, + ccy: str = "USD", + fund: str = "SERCGMAST", + **kwargs, +) -> None: + """if termination_amount is None, assume full termination + if termination_cp is None assume termination, otherwise assignment + """ + if dealid.startswith("SWPTN"): + deal_type = "SwaptionDeal" + elif dealid.startswith("SCCDS"): + deal_type = "CreditDefaultSwapDeal" + else: + deal_type = "CapFloorDeal" + + if deal_type == "CreditDefaultSwapDeal": + with conn.cursor() as c: + c.execute( + "SELECT (detach-attach)/(orig_detach-orig_attach) " + "FROM cds WHERE dealid=%s", + (dealid,), + ) + (tranche_factor,) = c.fetchone() + termination_amount *= tranche_factor + d = { "DealType": deal_type, "DealId": dealid if globeopid is None else None, @@ -1038,16 +1051,13 @@ def build_termination( "FeesReceived": termination_fee if termination_fee > 0 else None, "FeePaymentDate": fee_payment_date, } - if "FeeCurrency" in headers: + if deal_type == "SwaptionDeal" or ( + deal_type == "CreditDefaultSwapDeal" and is_assignment + ): d["FeeCurrency"] = ccy if is_assignment: d["AssignedCounterparty"] = termination_cp - buf = StringIO() - csvwriter = csv.DictWriter(buf, headers) - csvwriter.writeheader() - csvwriter.writerow(d) - trade_type = (deal_type, "A") if is_assignment else (deal_type, "T") - return buf.getvalue().encode(), trade_type + term_dict["A" if is_assignment else "T"].append(d) if __name__ == "__main__": |
