aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/process_queue.py96
1 files changed, 51 insertions, 45 deletions
diff --git a/python/process_queue.py b/python/process_queue.py
index 66bbdb08..71037aa2 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -19,7 +19,7 @@ except KeyError:
sys.exit("Please set path of daily directory in 'DAILY_DIR'")
from collections import defaultdict
-from pickle import loads
+from pickle import dumps, loads
from serenitas.analytics.bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from serenitas.utils.db import dbconn
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
@@ -59,36 +59,52 @@ def get_trades(p: redis.client.Pipeline, key: str):
yield trades[-1]
-def process_list(
+def process_indicative(
p: redis.client.Pipeline,
- key: str,
+ trade_type: str,
upload: bool,
session: blpapi.session.Session,
conn: psycopg2.extensions.connection,
) -> None:
- trade_type, fund = key.split("_")
- trades = get_trades(p, key)
- if trade_type in ["bond", "cds", "swaption"]:
- process_fun = globals()[f"{trade_type}_trade_process"]
- trades = [process_fun(dawndb, session, trade) for trade in trades]
- if trade_type == "bond" and fund == "SERCGMAST":
- trades = [send_email(trade) for trade in trades]
- if fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption"):
- try:
- buf = generate_csv(
- (t for t in trades if t.get("upload", True)),
- trade_type,
- fund,
- )
- except IOError as e:
- logging.info(e)
- pass
+ trade_type, _ = key.split("_")
+ process_fun = globals()[f"{trade_type}_trade_process"]
+ for trade in get_trades(p, key):
+ process_fun(conn, session, p, trade)
+ if trade_type == "bond":
+ continue
else:
- dest = get_filepath(DAILY_DIR, trade_type, fund)
- if upload:
- upload_buf(buf, dest.name, fund, trade_type)
- dest.parent.mkdir(exist_ok=True)
- dest.write_bytes(buf)
+ fund = trade_type["fund"]
+ if trade.get("upload", True) and (
+ fund in ("SERCGMAST", "BOWDST") or trade_type in ("cds", "swaption")
+ ):
+ p.rpush(f"{trade_type}_upload", dumps(trade))
+ p.delete(key)
+
+
+def process_upload(
+ p: redis.client.Pipeline,
+ trade_type: str,
+ upload: bool,
+) -> None:
+ key = f"{trade_type}_upload"
+ output = StringIO()
+ d = defaultdict(list)
+ for buf in p.lrange(key, 0, -1):
+ trade = loads(buf)
+ d[trade["fund"]].append(trade)
+
+ for fund, l in d.items():
+ output = StringIO()
+ csvwriter = csv.writer(output)
+ csvwriter.writerow(get_headers(trade_type, fund))
+ csvwriter.writerows(build_line(trade, trade_type, fund) for trade in l)
+ output = buf.getvalue().encode()
+ dest = get_filepath(DAILY_DIR, trade_type, fund)
+ dest.parent.mkdir(exist_ok=True)
+ dest.write_bytes(buf)
+ if upload:
+ upload_buf(buf, dest.name, fund, trade_type)
+
p.delete(key)
@@ -560,21 +576,6 @@ def cds_trade_process(conn, session, trade):
return trade
-def generate_csv(l, trade_type="bond", fund="SERCGMAST"):
- output = StringIO()
- csvwriter = csv.writer(output)
- empty = True
- for trade in l:
- if empty:
- csvwriter.writerow(get_headers(trade_type, fund))
- empty = False
- csvwriter.writerow(build_line(trade.copy(), trade_type, fund))
- if empty:
- raise IOError("empty trade queue")
- else:
- return output.getvalue().encode()
-
-
def get_filepath(
base_dir: pathlib.Path,
trade_type: Union[str, Tuple[str, str]],
@@ -730,7 +731,6 @@ if __name__ == "__main__":
parser.add_argument(
"-n", "--no-upload", action="store_true", help="do not upload to Globeop"
)
- parser.add_argument("fund", nargs="?", default="SERCGMAST")
args = parser.parse_args()
r = get_redis_queue()
dawndb = dbconn("dawndb")
@@ -744,15 +744,21 @@ if __name__ == "__main__":
"spot",
"capfloor",
]:
- key = f"{trade_type}_{args.fund}"
p_list = partial(
- process_list,
- key=key,
+ process_indicative,
+ trade_type=trade_type,
upload=not args.no_upload,
session=session,
conn=dawndb,
)
- r.transaction(p_list, key)
+ r.transaction(p_list, trade_type)
+ p_upload = partial(
+ process_upload,
+ trade_type=trade_type,
+ upload=not args.no_upload,
+ )
+ r.transaction(p_upload, trade_type)
+
for trade_type in ("cds", "swaption", "capfloor"):
key = f"{trade_type}_{args.fund}_termination"
t_list = partial(