aboutsummaryrefslogtreecommitdiffstats
path: root/python/ops/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/ops/process_queue.py')
-rw-r--r--python/ops/process_queue.py36
1 files changed, 17 insertions, 19 deletions
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py
index 0dbb9e8b..17c81892 100644
--- a/python/ops/process_queue.py
+++ b/python/ops/process_queue.py
@@ -54,6 +54,7 @@ def process_indicative(
process_fun = globals().get(
f"{trade_type}_trade_process", lambda conn, session, trade: trade
)
+ mtm = Fund["MTM"]()
for trade in get_trades(p, trade_type):
process_fun(conn, session, trade)
fund = trade["fund"]
@@ -66,9 +67,9 @@ def process_indicative(
"CD_INDEX_TRANCHE",
"BESPOKE",
):
- DealKind[trade_type].from_dict(**trade).mtm_stage()
- if Deal := DealKind[trade_type]:
- Deal.mtm_upload()
+ mtm.stage(trade, trade_type=trade_type)
+ buf, dest = mtm.build_buffer(trade_type)
+ mtm.upload(buf, dest.name)
p.delete(trade_type)
@@ -81,7 +82,7 @@ def process_upload(
for fund_name, l in groupby(p, key, "fund").items():
fund = Fund[fund_name]()
for trade in l:
- fund.stage(trade, trade_type=trade_type)
+ fund.stage(trade, trade_type=trade_type, redis_pipeline=p)
buf, dest = fund.build_buffer(trade_type)
if upload:
fund.upload(buf, dest.name)
@@ -96,22 +97,19 @@ def terminate_list(
base_dir: pathlib.Path = DAILY_DIR,
):
trade_type, fund, _ = key.split("_")
- terms = []
+ mtm = Fund["MTM"]()
+ f = Fund[fund]()
for term in p.lrange(key, 0, -1):
- termination = loads(term)
- DealKind["termination"].from_dict(**termination).mtm_stage()
- try:
- terms.append(termination.to_globeop())
- except TypeError as e:
- logging.error(e)
- return
- DealKind["termination"].mtm_upload()
- if upload and terms:
- f = Fund[fund]()
- f.staging_queue = terms
- dest = f.get_filepath(base_dir, (trade_type, "A"))
- buf = f.build_buffer("termination")
- f.upload(buf, dest)
+ obj = DealKind["termination"].from_dict(**loads(term))
+ mtm.staging_queue.append(obj.to_markit())
+ f.staging_queue.append(obj.to_globeop())
+ # pretty crappy way, but there should be only one
+ mtm.product_type = obj.product_type
+ buf, dest = mtm.build_buffer("mtm")
+ mtm.upload(buf, dest.name)
+ if upload and f.staging_queue:
+ buf, dest = f.build_buffer((trade_type, "A"))
+ f.upload(buf, dest.name)
p.delete(key)