diff options
| -rw-r--r-- | python/collateral/__main__.py | 1 | ||||
| -rw-r--r-- | python/collateral/cs.py | 2 | ||||
| -rw-r--r-- | python/process_queue.py | 33 |
3 files changed, 30 insertions, 6 deletions
diff --git a/python/collateral/__main__.py b/python/collateral/__main__.py index 36d424ba..2e33c6bc 100644 --- a/python/collateral/__main__.py +++ b/python/collateral/__main__.py @@ -42,6 +42,7 @@ dawn_trades = get_dawn_trades(args.workdate, dawn_engine) df = {} mapping = {"baml_fcm": "BAML", "wells": "WF"} args.workdate -= BDay() +counterparties.pop() # remove cs for cp in counterparties: cp_mod = import_module("." + cp, "collateral") if cp in ["baml_fcm", "wells"]: diff --git a/python/collateral/cs.py b/python/collateral/cs.py index b2d5c959..ebb06a27 100644 --- a/python/collateral/cs.py +++ b/python/collateral/cs.py @@ -7,7 +7,7 @@ def download_files(count=20): em = ExchangeMessage() emails = em.get_msgs( - path=["NYops", "Margin calls CS"], count=count, subject__contains="Margin" + path=["NYops", "Margin Calls CS"], count=count, subject__contains="Margin" ) DATA_DIR = DAILY_DIR / "CS_reports" for msg in emails: diff --git a/python/process_queue.py b/python/process_queue.py index c03a8607..87722e02 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -1,7 +1,9 @@ import argparse +import blpapi import csv import datetime import logging +import psycopg2 import pathlib import re import redis @@ -315,7 +317,13 @@ def get_trades(p: redis.client.Pipeline, key: str): yield trades[-1] -def process_list(p: redis.client.Pipeline, key: str, upload: bool) -> None: +def process_list( + p: redis.client.Pipeline, + key: str, + upload: bool, + session: blpapi.session.Session, + conn: psycopg2.extensions.connection, +) -> None: trade_type, fund = key.split("_") trades = get_trades(p, key) if trade_type in ["bond", "cds", "swaption"]: @@ -971,10 +979,25 @@ if __name__ == "__main__": sys.exit("Please set path of daily directory in 'DAILY_DIR'") dawndb = dbconn("dawndb") - for trade_type in ["bond", "cds", "swaption", "future", "wire", "spot", "capfloor"]: - key = f"{trade_type}_{args.fund}" - p_list = partial(process_list, key=key, upload=not args.no_upload) - r.transaction(p_list, key) + with init_bbg_session(BBG_IP) as session: + for trade_type in [ + "bond", + "cds", + "swaption", + "future", + "wire", + "spot", + "capfloor", + ]: + key = f"{trade_type}_{args.fund}" + p_list = partial( + process_list, + key=key, + upload=not args.no_upload, + session=session, + conn=dawndb, + ) + r.transaction(p_list, key) for trade_type in ["cds", "swaption", "capfloor"]: key = f"{trade_type}_termination" t_list = partial( |
