diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index c4c1ff9e..66bbdb08 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -18,7 +18,7 @@ try: except KeyError: sys.exit("Please set path of daily directory in 'DAILY_DIR'") -from itertools import groupby +from collections import defaultdict from pickle import loads from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data, BBG_IP from serenitas.utils.db import dbconn @@ -42,21 +42,21 @@ def get_effective_date(d, swaption_type): def get_trades(p: redis.client.Pipeline, key: str): - df = [loads(e) for e in p.lrange(key, 0, -1)] - if df: - for tradeid, v in groupby(df, lambda x: x["id"]): - trades = list(v) - trades = sorted(trades, key=lambda x: x["lastupdate"]) - if len(trades) == 1: - yield trades[0] - else: - if trades[-1]["action"] == "CANCEL": - continue - if trades[0]["action"] == "NEW": - trades[-1]["action"] = "NEW" - yield trades[-1] - if trades[-1]["action"] == "UPDATE": - yield trades[-1] + d = defaultdict(list) + for e in p.lrange(key, 0, -1): + trade = loads(e) + d[trade["id"]].append(trade) + for tradeid, trades in d: + if len(trades) == 1: + yield trades[0] + else: + if trades[-1]["action"] == "CANCEL": + continue + if trades[0]["action"] == "NEW": + trades[-1]["action"] = "NEW" + yield trades[-1] + if trades[-1]["action"] == "UPDATE": + yield trades[-1] def process_list( |
