Diffstat (limited to 'python')
-rw-r--r--  python/quote_parsing/__init__.py                                                3
-rw-r--r--  python/quote_parsing/__main__.py                                               75
-rw-r--r--  python/quote_parsing/download_emails.py (renamed from python/download_emails.py)  14
-rw-r--r--  python/quote_parsing/parse_emails.py (renamed from python/parse_emails.py)        84
4 files changed, 91 insertions, 85 deletions
diff --git a/python/quote_parsing/__init__.py b/python/quote_parsing/__init__.py
new file mode 100644
index 00000000..d34f83d6
--- /dev/null
+++ b/python/quote_parsing/__init__.py
@@ -0,0 +1,3 @@
+import logging
+from ..utils import SerenitasFileHandler
+logger = logging.getLogger(__name__)
diff --git a/python/quote_parsing/__main__.py b/python/quote_parsing/__main__.py
new file mode 100644
index 00000000..ae75cd92
--- /dev/null
+++ b/python/quote_parsing/__main__.py
@@ -0,0 +1,75 @@
+import datetime
+import logging
+import os
+import pickle
+import sys
+
+import pandas as pd
+
+from ..utils import SerenitasFileHandler
+from . import logger
+from .download_emails import save_emails, errors
+from .parse_emails import parse_email, write_todb
+
+fh = SerenitasFileHandler("emails_parsing.log")
+logger.addHandler(fh)
+logger.setLevel(logging.WARNING)
+
+try:
+    save_emails()
+except (errors.HttpError, FileNotFoundError) as e:
+    logger.error(e)
+    save_emails(update=False)
+
+data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
+emails = [f for f in os.scandir(data_dir) if f.is_file()]
+swaption_stack = {}
+index_data = pd.DataFrame()
+
+try:
+    with open(".pickle", "rb") as fh:
+        already_uploaded = pickle.load(fh)
+except FileNotFoundError:
+    already_uploaded = {}
+
+for f in emails:
+    date_composed, msg_id = f.name.split("_")
+    date_composed = datetime.datetime.strptime(date_composed,
+                                               "%Y-%m-%d %H-%M-%S")
+    if msg_id in already_uploaded:
+        continue
+    else:
+        try:
+            key, (option_stack, fwd_index) = parse_email(f, date_composed)
+        except RuntimeError as e:
+            logger.error(e)
+        else:
+            if key[0] is None or len(option_stack) == 0:
+                logger.error(f"Something wrong with email: {f.name}")
+                continue
+            swaption_stack[key] = pd.concat(option_stack,
+                                            names=['expiry', 'strike'])
+            index_data = index_data.append(fwd_index)
+            already_uploaded[msg_id] = key[0]
+if index_data.empty:
+    sys.exit()
+for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
+    if col in index_data:
+        index_data[col] = index_data[col].astype('float')
+index_data['index'] = index_data['index'].astype('category')
+
+swaption_stack = pd.concat(swaption_stack,
+                           names=['quotedate', 'index', 'series'])
+swaption_stack = swaption_stack.reset_index()
+swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
+                                                 'expiry', 'strike'])
+swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
+index_data = index_data.reset_index()
+index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
+from ..utils.db import serenitas_pool
+conn = serenitas_pool.getconn()
+write_todb(swaption_stack, index_data, conn)
+serenitas_pool.putconn(conn)
+
+with open(".pickle", "wb") as fh:
+    pickle.dump(already_uploaded, fh)
diff --git a/python/download_emails.py b/python/quote_parsing/download_emails.py
index cf738e9e..24d87601 100644
--- a/python/download_emails.py
+++ b/python/quote_parsing/download_emails.py
@@ -1,10 +1,10 @@
 import base64
 import json
-import logging
 import os
 import sys
 import unicodedata
 
+from . import logger
 from apiclient import errors
 from bs4 import BeautifulSoup, NavigableString, Tag
 from pathlib import Path
@@ -12,7 +12,6 @@ from pytz import timezone
 from gmail_helpers import GmailMessage
 from email.utils import parsedate_to_datetime
 
-
 def print_citi_html(email):
     soup = BeautifulSoup(email.get_content(), features="lxml")
     p = soup.find('p')
@@ -49,7 +48,7 @@ def save_emails(update=True):
             continue
         try:
             message = gm.from_id(msg['id'])
-            logging.info(message.history_id)
+            logger.info(message.history_id)
             subject = message['subject']
             date = parsedate_to_datetime(message['date'])
             if date.tzinfo is None:
@@ -61,7 +60,7 @@ def save_emails(update=True):
             else:
                 content = body.get_content()
         except (KeyError, UnicodeDecodeError, AttributeError) as e:
-            logging.error("error decoding " + msg['id'])
+            logger.error("error decoding " + msg['id'])
             continue
         else:
             email = (DATA_DIR / "swaptions" /
@@ -74,10 +73,3 @@ def save_emails(update=True):
         (DATA_DIR / ".lastHistoryId").write_text(message.history_id)
     except UnboundLocalError:
         pass
-
-if __name__ == '__main__':
-    try:
-        save_emails()
-    except errors.HttpError as e:
-        logging.error(e)
-        save_emails(update=False)
diff --git a/python/parse_emails.py b/python/quote_parsing/parse_emails.py
index 9baac888..2b1e2cc1 100644
--- a/python/parse_emails.py
+++ b/python/quote_parsing/parse_emails.py
@@ -1,20 +1,14 @@
 import pandas as pd
 import re
-import os
 import psycopg2.sql as sql
-from download_emails import save_emails, errors
 import datetime
-import logging
 import pickle
 import sys
 from quantlib.time.imm import next_date
 from quantlib.time.api import Date, pydate_from_qldate
+from . import logger
+from .download_emails import save_emails, errors
 
-logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"),
-                                          'emails_parsing.log'),
-                    level=logging.WARNING,
-                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
 
 def list_imm_dates(date):
     d = Date.from_datetime(date)
@@ -54,8 +48,8 @@ def makedf(r, indextype, quote_source):
         try:
             df[k] = pd.to_numeric(df[k])
         except ValueError as e:
-            logging.info(e)
-            logging.error("couldn't convert column")
+            logger.info(e)
+            logger.error("couldn't convert column")
             df[k] = pd.to_numeric(df[k].str.replace("n", ""))
             breakpoint()
     df.set_index('strike', inplace=True)
@@ -313,7 +307,7 @@ def parse_ms(fh, indextype, *args):
         expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
         block = parse_ms_block(fh, indextype)
         if block is None or block.empty:
-            logging.warning("MS: block is empty for {expiry} expiry")
+            logger.warning(f"MS: block is empty for {expiry} expiry")
         else:
             option_stack[expiry] = block
     return option_stack
@@ -368,7 +362,7 @@ def parse_gs(fh, indextype, series, quotedate, ref):
                 fwd_index.append(d.copy())
                 option_stack[expiry], line = parse_gs_block(fh, indextype)
             else:
-                logging.error("Can't parse expiry line:", line)
+                logger.error("Can't parse expiry line: %s", line)
         elif line.startswith("Assumes"):
             break
         else:
@@ -401,7 +395,7 @@ def parse_citi(fh, indextype, series, quotedate):
             fwd_index.append(d.copy())
             option_stack[expiry] = parse_citi_block(fh, indextype)
         else:
-            logging.error("Cant't parse expiry line:", line)
+            logger.error("Can't parse expiry line: %s", line)
 
     fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
     fwd_index['quote_source'] = 'CITI'
@@ -431,7 +425,7 @@ def parse_email(email, date_received):
     try:
         quotedate = parse_quotedate(fh, date_received)
     except RuntimeError:
-        logging.warning("couldn't find received date in message: "
+        logger.warning("couldn't find received date in message: "
                        f"{email.name}, using {date_received}")
         quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
     fh.seek(cur_pos)
@@ -478,8 +472,8 @@ def write_todb(swaption_stack, index_data, conn):
         try:
             df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
         except KeyError as e:
-            logging.warning("missing key in swaption_stack: "
-                            f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
+            logger.warning("missing key in swaption_stack: "
+                           f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
             continue
         except IndexingError:
             breakpoint()
@@ -510,61 +504,3 @@ def pickle_drop_date(date):
     newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
     with open(".pickle", "wb") as fh:
         pickle.dump(newdict, fh)
-
-
-if __name__ == "__main__":
-    try:
-        save_emails()
-    except (errors.HttpError, FileNotFoundError) as e:
-        logging.error(e)
-        save_emails(update=False)
-    data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
-    emails = [f for f in os.scandir(data_dir) if f.is_file()]
-    swaption_stack = {}
-    index_data = pd.DataFrame()
-    try:
-        with open(".pickle", "rb") as fh:
-            already_uploaded = pickle.load(fh)
-    except FileNotFoundError:
-        already_uploaded = {}
-    for f in emails:
-        date_composed, msg_id = f.name.split("_")
-        date_composed = datetime.datetime.strptime(date_composed,
-                                                   "%Y-%m-%d %H-%M-%S")
-        if msg_id in already_uploaded:
-            continue
-        else:
-            try:
-                key, (option_stack, fwd_index) = parse_email(f, date_composed)
-            except RuntimeError as e:
-                logging.error(e)
-            else:
-                if key[0] is None or len(option_stack) == 0:
-                    logging.error(f"Something wrong with email: {f.name}")
-                    continue
-                swaption_stack[key] = pd.concat(option_stack,
-                                                names=['expiry', 'strike'])
-                index_data = index_data.append(fwd_index)
-                already_uploaded[msg_id] = key[0]
-    if index_data.empty:
-        sys.exit()
-    for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
-        if col in index_data:
-            index_data[col] = index_data[col].astype('float')
-    index_data['index'] = index_data['index'].astype('category')
-
-    swaption_stack = pd.concat(swaption_stack,
-                               names=['quotedate', 'index', 'series'])
-    swaption_stack = swaption_stack.reset_index()
-    swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
-                                                     'expiry', 'strike'])
-    swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
-    index_data = index_data.reset_index()
-    index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
-    from db import serenitas_pool
-    conn = serenitas_pool.getconn()
-    write_todb(swaption_stack, index_data, conn)
-    serenitas_pool.putconn(conn)
-
-    with open(".pickle", "wb") as fh:
-        pickle.dump(already_uploaded, fh)
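
Note: with the two scripts' if __name__ == '__main__' blocks consolidated into __main__.py, the pipeline is launched as a package rather than as standalone scripts, e.g. python -m quote_parsing. The relative imports (from ..utils import ...) assume quote_parsing sits inside a parent package that also provides utils, so the exact module path on the command line depends on that parent package's name, which this diff does not show.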
