diff options
Diffstat (limited to 'python/process_queue.py')
| -rw-r--r-- | python/process_queue.py | 41 |
1 files changed, 22 insertions, 19 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index ba92cb01..1bf80adc 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -44,7 +44,7 @@ def get_effective_date(d, swaption_type): def groupby(p: redis.client.Pipeline, key: str, trade_key: str): d = defaultdict(list) for buf in p.lrange(key, 0, -1): - trade = loads(e) + trade = loads(buf) d[trade[trade_key]].append(trade) return d @@ -70,19 +70,20 @@ def process_indicative( session: blpapi.session.Session, conn: psycopg2.extensions.connection, ) -> None: - trade_type, _ = key.split("_") - process_fun = globals()[f"{trade_type}_trade_process"] - for trade in get_trades(p, key): - process_fun(conn, session, p, trade) + process_fun = globals().get( + f"{trade_type}_trade_process", lambda conn, session, trade: trade + ) + for trade in get_trades(p, trade_type): + process_fun(conn, session, trade) if trade_type == "bond": continue else: - fund = trade_type["fund"] + fund = trade["fund"] if trade.get("upload", True) and ( fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption") ): p.rpush(f"{trade_type}_upload", dumps(trade)) - p.delete(key) + p.delete(trade_type) def process_upload( @@ -90,7 +91,8 @@ def process_upload( trade_type: str, upload: bool, ) -> None: - for fund, l in groupby(p, f"{trade_type}_upload", "fund").items(): + key = f"{trade_type}_upload" + for fund, l in groupby(p, key, "fund").items(): buf = StringIO() csvwriter = csv.writer(buf) csvwriter.writerow(get_headers(trade_type, fund)) @@ -749,17 +751,18 @@ if __name__ == "__main__": conn=dawndb, ) r.transaction(p_list, trade_type) - p_upload = partial( - process_upload, - trade_type=trade_type, - upload=not args.no_upload, - ) - r.transaction(p_upload, trade_type) + p_upload = partial( + process_upload, + trade_type=trade_type, + upload=not args.no_upload, + ) + r.transaction(p_upload, trade_type) for trade_type in ("cds", "swaption", "capfloor"): - key = f"{trade_type}_{args.fund}_termination" - t_list = partial( - terminate_list, key=key, upload=not args.no_upload, conn=dawndb - ) - r.transaction(t_list, key) + for fund in ("SERCGMAST", "BOWDST", "BRINKER"): + key = f"{trade_type}_{fund}_termination" + t_list = partial( + terminate_list, key=key, upload=not args.no_upload, conn=dawndb + ) + r.transaction(t_list, key) dawndb.close() |
