aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/process_queue.py27
1 files changed, 12 insertions, 15 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 71037aa2..ba92cb01 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -41,12 +41,16 @@ def get_effective_date(d, swaption_type):
return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days))
-def get_trades(p: redis.client.Pipeline, key: str):
+def groupby(p: redis.client.Pipeline, key: str, trade_key: str):
d = defaultdict(list)
- for e in p.lrange(key, 0, -1):
+ for buf in p.lrange(key, 0, -1):
trade = loads(e)
- d[trade["id"]].append(trade)
- for tradeid, trades in d:
+ d[trade[trade_key]].append(trade)
+ return d
+
+
+def get_trades(p: redis.client.Pipeline, key: str):
+ for tradeid, trades in groupby(p, key, "id").items():
if len(trades) == 1:
yield trades[0]
else:
@@ -86,19 +90,12 @@ def process_upload(
trade_type: str,
upload: bool,
) -> None:
- key = f"{trade_type}_upload"
- output = StringIO()
- d = defaultdict(list)
- for buf in p.lrange(key, 0, -1):
- trade = loads(buf)
- d[trade["fund"]].append(trade)
-
- for fund, l in d.items():
- output = StringIO()
- csvwriter = csv.writer(output)
+ for fund, l in groupby(p, f"{trade_type}_upload", "fund").items():
+ buf = StringIO()
+ csvwriter = csv.writer(buf)
csvwriter.writerow(get_headers(trade_type, fund))
csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l)
- output = buf.getvalue().encode()
+ buf = buf.getvalue().encode()
dest = get_filepath(DAILY_DIR, trade_type, fund)
dest.parent.mkdir(exist_ok=True)
dest.write_bytes(buf)