diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 96 |
1 files changed, 51 insertions, 45 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 66bbdb08..71037aa2 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -19,7 +19,7 @@ except KeyError: sys.exit("Please set path of daily directory in 'DAILY_DIR'") from collections import defaultdict -from pickle import loads +from pickle import dumps, loads from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from serenitas.utils.db import dbconn from serenitas.utils.exchange import ExchangeMessage, FileAttachment @@ -59,36 +59,52 @@ def get_trades(p: redis.client.Pipeline, key: str): yield trades[-1] -def process_list( +def process_indicative( p: redis.client.Pipeline, - key: str, + trade_type: str, upload: bool, session: blpapi.session.Session, conn: psycopg2.extensions.connection, ) -> None: - trade_type, fund = key.split("_") - trades = get_trades(p, key) - if trade_type in ["bond", "cds", "swaption"]: - process_fun = globals()[f"{trade_type}_trade_process"] - trades = [process_fun(dawndb, session, trade) for trade in trades] - if trade_type == "bond" and fund == "SERCGMAST": - trades = [send_email(trade) for trade in trades] - if fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption"): - try: - buf = generate_csv( - (t for t in trades if t.get("upload", True)), - trade_type, - fund, - ) - except IOError as e: - logging.info(e) - pass + trade_type, _ = key.split("_") + process_fun = globals()[f"{trade_type}_trade_process"] + for trade in get_trades(p, key): + process_fun(conn, session, p, trade) + if trade_type == "bond": + continue else: - dest = get_filepath(DAILY_DIR, trade_type, fund) - if upload: - upload_buf(buf, dest.name, fund, trade_type) - dest.parent.mkdir(exist_ok=True) - dest.write_bytes(buf) + fund = trade_type["fund"] + if trade.get("upload", True) and ( + fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") + ): + p.rpush(f"{trade_type}_upload", dumps(trade)) + p.delete(key) + + +def process_upload( + p: redis.client.Pipeline, + trade_type: str, + upload: bool, +) -> None: + key = f"{trade_type}_upload" + output = StringIO() + d = defaultdict(list) + for buf in p.lrange(key, 0, -1): + trade = loads(buf) + d[trade["fund"]].append(trade) + + for fund, l in d.items(): + output = StringIO() + csvwriter = csv.writer(output) + csvwriter.writerow(get_headers(trade_type, fund)) + csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l) + output = buf.getvalue().encode() + dest = get_filepath(DAILY_DIR, trade_type, fund) + dest.parent.mkdir(exist_ok=True) + dest.write_bytes(buf) + if upload: + upload_buf(buf, dest.name, fund, trade_type) + p.delete(key) @@ -560,21 +576,6 @@ def cds_trade_process(conn, session, trade): return trade -def generate_csv(l, trade_type="bond", fund="SERCGMAST"): - output = StringIO() - csvwriter = csv.writer(output) - empty = True - for trade in l: - if empty: - csvwriter.writerow(get_headers(trade_type, fund)) - empty = False - csvwriter.writerow(build_line(trade.copy(), trade_type, fund)) - if empty: - raise IOError("empty trade queue") - else: - return output.getvalue().encode() - - def get_filepath( base_dir: pathlib.Path, trade_type: Union[str, Tuple[str, str]], @@ -730,7 +731,6 @@ if __name__ == "__main__": parser.add_argument( "-n", "--no-upload", action="store_true", help="do not upload to Globeop" ) - parser.add_argument("fund", nargs="?", default="SERCGMAST") args = parser.parse_args() r = get_redis_queue() dawndb = dbconn("dawndb") @@ -744,15 +744,21 @@ if __name__ == "__main__": "spot", "capfloor", ]: - key = f"{trade_type}_{args.fund}" p_list = partial( - process_list, - key=key, + process_indicative, + trade_type=trade_type, upload=not args.no_upload, session=session, conn=dawndb, ) - r.transaction(p_list, key) + r.transaction(p_list, trade_type) + p_upload = partial( + process_upload, + trade_type=trade_type, + upload=not args.no_upload, + ) + r.transaction(p_upload, trade_type) + for trade_type in ("cds", "swaption", "capfloor"): key = f"{trade_type}_{args.fund}_termination" t_list = partial( |
