aboutsummaryrefslogtreecommitdiffstats
path: root/python/process_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/process_queue.py')
-rw-r--r--python/process_queue.py41
1 files changed, 22 insertions, 19 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index ba92cb01..1bf80adc 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -44,7 +44,7 @@ def get_effective_date(d, swaption_type):
def groupby(p: redis.client.Pipeline, key: str, trade_key: str):
d = defaultdict(list)
for buf in p.lrange(key, 0, -1):
- trade = loads(e)
+ trade = loads(buf)
d[trade[trade_key]].append(trade)
return d
@@ -70,19 +70,20 @@ def process_indicative(
session: blpapi.session.Session,
conn: psycopg2.extensions.connection,
) -> None:
- trade_type, _ = key.split("_")
- process_fun = globals()[f"{trade_type}_trade_process"]
- for trade in get_trades(p, key):
- process_fun(conn, session, p, trade)
+ process_fun = globals().get(
+ f"{trade_type}_trade_process", lambda conn, session, trade: trade
+ )
+ for trade in get_trades(p, trade_type):
+ process_fun(conn, session, trade)
if trade_type == "bond":
continue
else:
- fund = trade_type["fund"]
+ fund = trade["fund"]
if trade.get("upload", True) and (
fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption")
):
p.rpush(f"{trade_type}_upload", dumps(trade))
- p.delete(key)
+ p.delete(trade_type)
def process_upload(
@@ -90,7 +91,8 @@ def process_upload(
trade_type: str,
upload: bool,
) -> None:
- for fund, l in groupby(p, f"{trade_type}_upload", "fund").items():
+ key = f"{trade_type}_upload"
+ for fund, l in groupby(p, key, "fund").items():
buf = StringIO()
csvwriter = csv.writer(buf)
csvwriter.writerow(get_headers(trade_type, fund))
@@ -749,17 +751,18 @@ if __name__ == "__main__":
conn=dawndb,
)
r.transaction(p_list, trade_type)
- p_upload = partial(
- process_upload,
- trade_type=trade_type,
- upload=not args.no_upload,
- )
- r.transaction(p_upload, trade_type)
+ p_upload = partial(
+ process_upload,
+ trade_type=trade_type,
+ upload=not args.no_upload,
+ )
+ r.transaction(p_upload, trade_type)
for trade_type in ("cds", "swaption", "capfloor"):
- key = f"{trade_type}_{args.fund}_termination"
- t_list = partial(
- terminate_list, key=key, upload=not args.no_upload, conn=dawndb
- )
- r.transaction(t_list, key)
+ for fund in ("SERCGMAST", "BOWDST", "BRINKER"):
+ key = f"{trade_type}_{fund}_termination"
+ t_list = partial(
+ terminate_list, key=key, upload=not args.no_upload, conn=dawndb
+ )
+ r.transaction(t_list, key)
dawndb.close()