"""Parse swaption quote emails and upload the results to the database.

Optionally downloads new emails first (-d/--download), skips messages that
were already uploaded (tracked in a local pickle file), and aggregates the
parsed option stacks and forward index data before writing to the DB.
"""
import argparse
import datetime
import logging
import pickle
import sys

import pandas as pd

from serenitas.utils import SerenitasRotatingFileHandler
from serenitas.utils.db import serenitas_pool
from serenitas.utils.env import DATA_DIR

from . import logger
from .parse_emails import parse_email, write_todb

# Log to a rotating file (1 MB per file, 5 backups).
fh = SerenitasRotatingFileHandler("emails_parsing.log", 1_000_000, 5)
logger.addHandler(fh)
logger.setLevel(logging.WARNING)

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--download", action="store_true", help="download emails")
args = parser.parse_args()

if args.download:
    from googleapiclient import errors

    from .download_emails import save_emails

    try:
        save_emails()
    except (errors.HttpError, FileNotFoundError) as e:
        # If the incremental download fails, fall back to a full download.
        logger.error(e)
        save_emails(update=False)

# Saved emails live under DATA_DIR/swaptions/YYYY-MM/.
emails = [f for f in (DATA_DIR / "swaptions").glob("????-??/*") if f.is_file()]
swaption_stack = {}
index_data = pd.DataFrame()

# Messages already written to the DB, keyed by message id.  Note the file
# handle is named `pf`, not `fh`, so it doesn't shadow the log handler above.
try:
    with open(".pickle", "rb") as pf:
        already_uploaded = pickle.load(pf)
except FileNotFoundError:
    already_uploaded = {}

conn = serenitas_pool.getconn()
for f in emails:
    # File names look like "<date composed>_<gmail message id>".
    date_composed, msg_id = f.name.split("_")
    date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S")
    if msg_id == "16e4b563f6cff219":  # GS message has IG quotes with a HY header
        continue
    if msg_id in already_uploaded:
        continue
    try:
        key, (option_stack, fwd_index) = parse_email(f, date_composed, conn)
    except RuntimeError as e:
        logger.error(e)
    else:
        if key[0] is None or len(option_stack) == 0:
            logger.error(f"Something wrong with email: {f.name}")
            continue
        swaption_stack[key] = pd.concat(
            option_stack, names=["expiry", "series", "version"]
        )
        fwd_index["msg_id"] = int(msg_id, 16)  # gmail message ids are hex
        # Accumulate forward index rows (DataFrame.append was removed in
        # pandas 2.0, so concatenate instead).
        index_data = pd.concat([index_data, fwd_index])
        already_uploaded[msg_id] = key[0]

# Nothing new was parsed, so there is nothing to upload.
if index_data.empty:
    sys.exit()

# Coerce the quote columns to numeric dtypes.
for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]:
    if col in index_data:
        index_data[col] = pd.to_numeric(index_data[col])
index_data["index"] = index_data["index"].astype("category")

index_names = ["quotedate", "index", "quote_source"]
swaption_stack = pd.concat(swaption_stack, names=index_names, sort=False)

# Drop duplicated index rows, keeping the first occurrence.
dup = swaption_stack.index.duplicated()
if dup.any():
    logger.warning("duplicated data")
    swaption_stack = swaption_stack[~dup]
swaption_stack = swaption_stack.reset_index().set_index(
    ["quotedate", "index", "series", "version", "expiry", "quote_source"]
)
swaption_stack = swaption_stack.sort_index()

index_data = index_data.reset_index()
index_data = index_data.drop_duplicates(
    ["quotedate", "index", "series", "version", "expiry", "quote_source"]
)

write_todb(swaption_stack, index_data, conn)
serenitas_pool.putconn(conn)

# Persist the set of uploaded messages for the next run.
with open(".pickle", "wb") as pf:
    pickle.dump(already_uploaded, pf)