aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py32
1 files changed, 16 insertions, 16 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index c4c1ff9e..66bbdb08 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -18,7 +18,7 @@ try:
except KeyError:
sys.exit("Please set path of daily directory in 'DAILY_DIR'")
-from itertools import groupby
+from collections import defaultdict
from pickle import loads
from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from serenitas.utils.db import dbconn
@@ -42,21 +42,21 @@ def get_effective_date(d, swaption_type):
def get_trades(p: redis.client.Pipeline, key: str):
- df = [loads(e) for e in p.lrange(key, 0, -1)]
- if df:
- for tradeid, v in groupby(df, lambda x: x["id"]):
- trades = list(v)
- trades = sorted(trades, key=lambda x: x["lastupdate"])
- if len(trades) == 1:
- yield trades[0]
- else:
- if trades[-1]["action"] == "CANCEL":
- continue
- if trades[0]["action"] == "NEW":
- trades[-1]["action"] = "NEW"
- yield trades[-1]
- if trades[-1]["action"] == "UPDATE":
- yield trades[-1]
+ d = defaultdict(list)
+ for e in p.lrange(key, 0, -1):
+ trade = loads(e)
+ d[trade["id"]].append(trade)
+ for tradeid, trades in d:
+ if len(trades) == 1:
+ yield trades[0]
+ else:
+ if trades[-1]["action"] == "CANCEL":
+ continue
+ if trades[0]["action"] == "NEW":
+ trades[-1]["action"] = "NEW"
+ yield trades[-1]
+ if trades[-1]["action"] == "UPDATE":
+ yield trades[-1]
def process_list(