diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/process_queue.py | 27 |
1 files changed, 12 insertions, 15 deletions
diff --git a/python/process_queue.py b/python/process_queue.py index 71037aa2..ba92cb01 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -41,12 +41,16 @@ def get_effective_date(d, swaption_type): return pydate_from_qldate(cal.advance(Date.from_datetime(d), 2, Days)) -def get_trades(p: redis.client.Pipeline, key: str): +def groupby(p: redis.client.Pipeline, key: str, trade_key: str): d = defaultdict(list) - for e in p.lrange(key, 0, -1): + for buf in p.lrange(key, 0, -1): trade = loads(e) - d[trade["id"]].append(trade) - for tradeid, trades in d: + d[trade[trade_key]].append(trade) + return d + + +def get_trades(p: redis.client.Pipeline, key: str): + for tradeid, trades in groupby(p, key, "id").items(): if len(trades) == 1: yield trades[0] else: @@ -86,19 +90,12 @@ def process_upload( trade_type: str, upload: bool, ) -> None: - key = f"{trade_type}_upload" - output = StringIO() - d = defaultdict(list) - for buf in p.lrange(key, 0, -1): - trade = loads(buf) - d[trade["fund"]].append(trade) - - for fund, l in d.items(): - output = StringIO() - csvwriter = csv.writer(output) + for fund, l in groupby(p, f"{trade_type}_upload", "fund").items(): + buf = StringIO() + csvwriter = csv.writer(buf) csvwriter.writerow(get_headers(trade_type, fund)) csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l) - output = buf.getvalue().encode() + buf = buf.getvalue().encode() dest = get_filepath(DAILY_DIR, trade_type, fund) dest.parent.mkdir(exist_ok=True) dest.write_bytes(buf) |
