diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 12026fa8..1a9269b2 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -21,6 +21,7 @@ from common import get_redis_queue from functools import partial from pyisda.date import previous_twentieth from remote import FtpClient, SftpClient +from typing import Tuple, Union from utils.db import dbconn from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date from tabulate import tabulate @@ -360,13 +361,15 @@ def terminate_list( for term in p.lrange(key, 0, -1): termination = loads(term) try: - f = build_termination(base_dir, **termination) + buf, trade_type = build_termination(base_dir, **termination) except TypeError as e: logging.error(e) return else: + dest = get_filepath(base_dir, trade_type) if upload: - upload_file(f) + upload_buf(buf, dest.name) + dest.write_bytes(buf) p.delete(key) @@ -813,7 +816,9 @@ def generate_csv(l, trade_type="bond", fund="SERCGMAST"): def get_filepath( - base_dir: pathlib.Path, trade_type: str, fund: str = "SERCGMAST" + base_dir: pathlib.Path, + trade_type: Union[str, Tuple[str, str]], + fund: str = "SERCGMAST", ) -> pathlib.Path: d = { "bond": "Mortgages", @@ -824,6 +829,12 @@ def get_filepath( "spot": "SpotDeal", "capfloor": "TODO", } + trade_tag: str + if isinstance(trade_type, tuple): + trade_tag = trade_type[0] + trade_type[1] + else: + trade_tag = d[trade_type] + timestamp = datetime.datetime.now() if fund == "BRINKER": return ( @@ -835,17 +846,17 @@ def get_filepath( return ( base_dir / str(timestamp.date()) - / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv" + / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" ) elif fund == "BOWDST": return ( base_dir / str(timestamp.date()) - / f"LMC01CFE.Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv" + / f"LMC01CFE.Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv" ) -def upload_buf(buf: bytes, dest: str, fund: str) -> None: +def upload_buf(buf: bytes, dest: str, fund: str = "SERCGMAST") -> None: if fund == "BRINKER": sftp = SftpClient.from_creds("bbh") sftp.put(buf, dest) @@ -900,7 +911,7 @@ def build_termination( is_assignment: bool = False, ccy: str = "USD", **kwargs, -): +) -> (bytes, Tuple[str, str]): """ if termination_amount is None, assume full termination if termination_cp is None assume termination, otherwise assignment """ @@ -986,14 +997,8 @@ def build_termination( csvwriter.writeheader() csvwriter.writerow(d) timestamp = datetime.datetime.now() - trade_type = f"{deal_type}A" if is_assignment else f"{deal_type}T" - file_path = ( - base_dir - / str(timestamp.date()) - / f"Serenitas.ALL.{timestamp:%Y%m%d.%H%M%S}.{trade_type}.csv" - ) - file_path.write_bytes(buf.getvalue().encode()) - return file_path + trade_type = (deal_type, "A") if is_assignment else (deal_type, "T") + return buf.getvalue().encode(), trade_type if __name__ == "__main__": @@ -1029,7 +1034,7 @@ if __name__ == "__main__": conn=dawndb, ) r.transaction(p_list, key) - for trade_type in ["cds", "swaption", "capfloor"]: + for trade_type in ("cds", "swaption", "capfloor"): key = f"{trade_type}_termination" t_list = partial( terminate_list, key=key, upload=not args.no_upload, base_dir=DAILY_DIR |
