diff options
Diffstat (limited to 'python/ops/process_queue.py')
| -rw-r--r-- | python/ops/process_queue.py | 36 |
1 files changed, 17 insertions, 19 deletions
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py index 0dbb9e8b..17c81892 100644 --- a/python/ops/process_queue.py +++ b/python/ops/process_queue.py @@ -54,6 +54,7 @@ def process_indicative( process_fun = globals().get( f"{trade_type}_trade_process", lambda conn, session, trade: trade ) + mtm = Fund["MTM"]() for trade in get_trades(p, trade_type): process_fun(conn, session, trade) fund = trade["fund"] @@ -66,9 +67,9 @@ def process_indicative( "CD_INDEX_TRANCHE", "BESPOKE", ): - DealKind[trade_type].from_dict(**trade).mtm_stage() - if Deal := DealKind[trade_type]: - Deal.mtm_upload() + mtm.stage(trade, trade_type=trade_type) + buf, dest = mtm.build_buffer(trade_type) + mtm.upload(buf, dest.name) p.delete(trade_type) @@ -81,7 +82,7 @@ def process_upload( for fund_name, l in groupby(p, key, "fund").items(): fund = Fund[fund_name]() for trade in l: - fund.stage(trade, trade_type=trade_type) + fund.stage(trade, trade_type=trade_type, redis_pipeline=p) buf, dest = fund.build_buffer(trade_type) if upload: fund.upload(buf, dest.name) @@ -96,22 +97,19 @@ def terminate_list( base_dir: pathlib.Path = DAILY_DIR, ): trade_type, fund, _ = key.split("_") - terms = [] + mtm = Fund["MTM"]() + f = Fund[fund]() for term in p.lrange(key, 0, -1): - termination = loads(term) - DealKind["termination"].from_dict(**termination).mtm_stage() - try: - terms.append(termination.to_globeop()) - except TypeError as e: - logging.error(e) - return - DealKind["termination"].mtm_upload() - if upload and terms: - f = Fund[fund]() - f.staging_queue = terms - dest = f.get_filepath(base_dir, (trade_type, "A")) - buf = f.build_buffer("termination") - f.upload(buf, dest) + obj = DealKind["termination"].from_dict(**loads(term)) + mtm.staging_queue.append(obj.to_markit()) + f.staging_queue.append(obj.to_globeop()) + # pretty crappy way, but there should be only one + mtm.product_type = obj.product_type + buf, dest = mtm.build_buffer("mtm") + mtm.upload(buf, dest.name) + if upload and f.staging_queue: + buf, dest = f.build_buffer((trade_type, "A")) + f.upload(buf, dest.name) p.delete(key) |
