Diffstat (limited to 'python/quote_parsing/__main__.py')
-rw-r--r--  python/quote_parsing/__main__.py | 101
1 file changed, 50 insertions(+), 51 deletions(-)
diff --git a/python/quote_parsing/__main__.py b/python/quote_parsing/__main__.py
index 557edd45..30f607de 100644
--- a/python/quote_parsing/__main__.py
+++ b/python/quote_parsing/__main__.py
@@ -9,7 +9,7 @@ from serenitas.utils.env import DATA_DIR
 from serenitas.utils import SerenitasRotatingFileHandler
 from . import logger
 from .parse_emails import parse_email, write_todb
-from serenitas.utils.db import serenitas_pool
+from serenitas.utils.db2 import serenitas_pool
 
 fh = SerenitasRotatingFileHandler("emails_parsing.log", 1_000_000, 5)
 logger.addHandler(fh)
@@ -40,60 +40,59 @@ try:
 except FileNotFoundError:
     already_uploaded = {}
 
-conn = serenitas_pool.getconn()
-for f in emails:
-    date_composed, msg_id = f.name.split("_")
-    date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S")
-    if msg_id == "16e4b563f6cff219":
-        # GS message has IG quotes with a HY header
-        continue
-    if msg_id == "17b40531791bb7c2":
-        # There is a % sign with no space, breaking it
-        continue
-    if msg_id in already_uploaded:
-        continue
-    else:
-        try:
-            key, (option_stack, fwd_index) = parse_email(f, date_composed, conn)
-        except RuntimeError as e:
-            logger.error(e)
-        except ValueError as e:
-            raise ValueError(f"{f.name}") from e
+with serenitas_pool.connection() as conn:
+    for f in emails:
+        date_composed, msg_id = f.name.split("_")
+        date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S")
+        if msg_id == "16e4b563f6cff219":
+            # GS message has IG quotes with a HY header
+            continue
+        if msg_id == "17b40531791bb7c2":
+            # There is a % sign with no space, breaking it
+            continue
+        if msg_id in already_uploaded:
+            continue
         else:
-            if key[0] is None or len(option_stack) == 0:
-                logger.error(f"Something wrong with email: {f.name}")
-                continue
-            swaption_stack[key] = pd.concat(
-                option_stack, names=["expiry", "series", "version"]
-            )
-            fwd_index["msg_id"] = int(msg_id, 16)
-            index_data = index_data.append(fwd_index)
-            already_uploaded[msg_id] = key[0]
-if index_data.empty:
-    sys.exit()
-for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]:
-    if col in index_data:
-        index_data[col] = pd.to_numeric(index_data[col])
-index_data["index"] = index_data["index"].astype("category")
+            try:
+                key, (option_stack, fwd_index) = parse_email(f, date_composed, conn)
+            except RuntimeError as e:
+                logger.error(e)
+            except ValueError as e:
+                raise ValueError(f"{f.name}") from e
+            else:
+                if key[0] is None or len(option_stack) == 0:
+                    logger.error(f"Something wrong with email: {f.name}")
+                    continue
+                swaption_stack[key] = pd.concat(
+                    option_stack, names=["expiry", "series", "version"]
+                )
+                fwd_index["msg_id"] = int(msg_id, 16)
+                index_data = index_data.append(fwd_index)
+                already_uploaded[msg_id] = key[0]
+    if index_data.empty:
+        sys.exit()
+    for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]:
+        if col in index_data:
+            index_data[col] = pd.to_numeric(index_data[col])
+    index_data["index"] = index_data["index"].astype("category")
 
-index_names = ["quotedate", "index", "quote_source"]
-swaption_stack = pd.concat(swaption_stack, names=index_names, sort=False)
-dup = swaption_stack.index.duplicated()
-if dup.any():
-    logger.warning("duplicated data")
-    swaption_stack = swaption_stack[~dup]
+    index_names = ["quotedate", "index", "quote_source"]
+    swaption_stack = pd.concat(swaption_stack, names=index_names, sort=False)
+    dup = swaption_stack.index.duplicated()
+    if dup.any():
+        logger.warning("duplicated data")
+        swaption_stack = swaption_stack[~dup]
 
-swaption_stack = swaption_stack.reset_index().set_index(
-    ["quotedate", "index", "series", "version", "expiry", "quote_source"]
-)
-swaption_stack = swaption_stack.sort_index()
-index_data = index_data.reset_index()
-index_data = index_data.drop_duplicates(
-    ["quotedate", "index", "series", "version", "expiry", "quote_source"]
-)
+    swaption_stack = swaption_stack.reset_index().set_index(
+        ["quotedate", "index", "series", "version", "expiry", "quote_source"]
+    )
+    swaption_stack = swaption_stack.sort_index()
+    index_data = index_data.reset_index()
+    index_data = index_data.drop_duplicates(
+        ["quotedate", "index", "series", "version", "expiry", "quote_source"]
+    )
 
-write_todb(swaption_stack, index_data, conn)
-serenitas_pool.putconn(conn)
+    write_todb(swaption_stack, index_data, conn)
 
 with open(".pickle", "wb") as fh:
     pickle.dump(already_uploaded, fh)
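Note on the change: the commit swaps serenitas.utils.db for serenitas.utils.db2 and replaces the manual checkout pair serenitas_pool.getconn() / serenitas_pool.putconn(conn) with a single with serenitas_pool.connection() block, re-indenting the whole parsing loop under it so the connection is returned to the pool even if parse_email raises. A minimal sketch of the two patterns, assuming the new pool behaves like psycopg_pool.ConnectionPool (the DSN, pool sizes, and query below are placeholders, not taken from this repository):

from psycopg_pool import ConnectionPool

# Placeholder pool; serenitas.utils.db2 presumably builds its own.
pool = ConnectionPool("dbname=quotes", min_size=1, max_size=4)

# Old pattern (removed by this commit): checkout and return are separate
# calls, so an exception raised in between leaks the connection unless
# every caller remembers the try/finally.
conn = pool.getconn()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
finally:
    pool.putconn(conn)

# New pattern (adopted by this commit): the context manager commits on a
# clean exit, rolls back on an exception, and always returns the
# connection to the pool.
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")

If the pool is indeed psycopg_pool-backed, the context-manager form also ends the open transaction (commit or rollback) before handing the connection back, which the removed getconn/putconn sequence did not guarantee on its own.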
