Diffstat (limited to 'python/quote_parsing')
| -rw-r--r-- | python/quote_parsing/__init__.py | 2 |
| -rw-r--r-- | python/quote_parsing/__main__.py | 66 |
| -rw-r--r-- | python/quote_parsing/download_emails.py | 75 |
| -rw-r--r-- | python/quote_parsing/parse_emails.py | 505 |
4 files changed, 648 insertions, 0 deletions
diff --git a/python/quote_parsing/__init__.py b/python/quote_parsing/__init__.py
new file mode 100644
index 00000000..d34f83d6
--- /dev/null
+++ b/python/quote_parsing/__init__.py
@@ -0,0 +1,2 @@
+import logging
+logger = logging.getLogger(__name__)
diff --git a/python/quote_parsing/__main__.py b/python/quote_parsing/__main__.py
new file mode 100644
index 00000000..ae75cd92
--- /dev/null
+++ b/python/quote_parsing/__main__.py
@@ -0,0 +1,66 @@
+import datetime
+import logging
+import os
+import pickle
+import sys
+
+import pandas as pd
+from apiclient import errors
+
+from ..utils import SerenitasFileHandler
+from . import logger
+from .download_emails import save_emails
+from .parse_emails import parse_email, write_todb
+
+fh = SerenitasFileHandler("emails_parsing.log")
+logger.addHandler(fh)
+logger.setLevel(logging.WARNING)
+
+try:
+    save_emails()
+except (errors.HttpError, FileNotFoundError) as e:
+    logger.error(e)
+    save_emails(update=False)
+
+data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
+emails = [f for f in os.scandir(data_dir) if f.is_file()]
+swaption_stack = {}
+index_data = pd.DataFrame()
+
+try:
+    with open(".pickle", "rb") as fh:
+        already_uploaded = pickle.load(fh)
+except FileNotFoundError:
+    already_uploaded = {}
+
+for f in emails:
+    date_composed, msg_id = f.name.split("_")
+    date_composed = datetime.datetime.strptime(date_composed,
+                                               "%Y-%m-%d %H-%M-%S")
+    if msg_id in already_uploaded:
+        continue
+    try:
+        key, (option_stack, fwd_index) = parse_email(f, date_composed)
+    except RuntimeError as e:
+        logger.error(e)
+    else:
+        if key[0] is None or len(option_stack) == 0:
+            logger.error(f"Something wrong with email: {f.name}")
+            continue
+        swaption_stack[key] = pd.concat(option_stack,
+                                        names=['expiry', 'strike'])
+        index_data = index_data.append(fwd_index)
+        already_uploaded[msg_id] = key[0]
+
+if index_data.empty:
+    sys.exit()
+for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
+    if col in index_data:
+        index_data[col] = index_data[col].astype('float')
+index_data['index'] = index_data['index'].astype('category')
+
+swaption_stack = pd.concat(swaption_stack,
+                           names=['quotedate', 'index', 'series'])
+swaption_stack = swaption_stack.reset_index()
+swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series',
+                                                 'expiry', 'strike'])
+swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
+index_data = index_data.reset_index()
+index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
+
+from ..utils.db import serenitas_pool
+conn = serenitas_pool.getconn()
+write_todb(swaption_stack, index_data, conn)
+serenitas_pool.putconn(conn)
+
+with open(".pickle", "wb") as fh:
+    pickle.dump(already_uploaded, fh)
diff --git a/python/quote_parsing/download_emails.py b/python/quote_parsing/download_emails.py
new file mode 100644
index 00000000..24d87601
--- /dev/null
+++ b/python/quote_parsing/download_emails.py
@@ -0,0 +1,75 @@
+import base64
+import json
+import os
+import sys
+import unicodedata
+
+from email.utils import parsedate_to_datetime
+from pathlib import Path
+
+from apiclient import errors
+from bs4 import BeautifulSoup, NavigableString, Tag
+from gmail_helpers import GmailMessage
+from pytz import timezone
+
+from . import logger
+
+
+def print_citi_html(email):
+    soup = BeautifulSoup(email.get_content(), features="lxml")
+    p = soup.find('p')
+    s = p.next
+    if isinstance(s, NavigableString):
+        l = [unicodedata.normalize("NFKD", s)]
+    else:
+        raise ValueError("weird email")
+    for br in p.findAll('br'):
+        s = br.next
+        if isinstance(s, NavigableString):
+            l.append(unicodedata.normalize("NFKD", s))
+        elif isinstance(s, Tag) and s.name == 'br':
+            l.append('\n')
+        else:
+            raise ValueError("weird email")
+    return "\n".join(l)
+
+
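+# Incremental vs. full download (a sketch inferred from the code below):
+# with update=True we resume from the Gmail historyId checkpoint stored in
+# $DATA_DIR/.lastHistoryId; with update=False we list everything under the
+# 'swaptions' label and skip message ids already saved on disk.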
+def save_emails(update=True):
+    """Download new emails that were labeled swaptions."""
+    DATA_DIR = Path(os.getenv("DATA_DIR"))
+
+    if update:
+        last_history_id = int((DATA_DIR / ".lastHistoryId").read_text())
+        existing_msgs = []
+    else:
+        p = DATA_DIR / "swaptions"
+        existing_msgs = set(x.name.split("_")[1] for x in p.iterdir() if x.is_file())
+        last_history_id = None
+
+    gm = GmailMessage()
+    for msg in gm.list_msg_ids('swaptions', last_history_id):
+        if msg['id'] in existing_msgs:
+            continue
+        try:
+            message = gm.from_id(msg['id'])
+            logger.info(message.history_id)
+            subject = message['subject']
+            date = parsedate_to_datetime(message['date'])
+            if date.tzinfo is None:
+                date = date.replace(tzinfo=timezone('UTC'))
+            date = date.astimezone(timezone('America/New_York'))
+            body = message.get_body('plain')
+            if body is None:
+                content = print_citi_html(message.get_body('html'))
+            else:
+                content = body.get_content()
+        except (KeyError, UnicodeDecodeError, AttributeError):
+            logger.error("error decoding " + msg['id'])
+            continue
+        else:
+            email = (DATA_DIR / "swaptions" /
+                     f"{date:%Y-%m-%d %H-%M-%S}_{msg['id']}")
+            with email.open("w") as fh:
+                fh.write(subject + "\r\n")
+                fh.write(content)
+    try:
+        new_history_id = message.history_id
+        (DATA_DIR / ".lastHistoryId").write_text(str(new_history_id))
+    except UnboundLocalError:
+        pass
diff --git a/python/quote_parsing/parse_emails.py b/python/quote_parsing/parse_emails.py
new file mode 100644
index 00000000..2b1e2cc1
--- /dev/null
+++ b/python/quote_parsing/parse_emails.py
@@ -0,0 +1,505 @@
+import datetime
+import pickle
+import re
+import sys
+
+import pandas as pd
+import psycopg2.sql as sql
+
+from pandas.core.indexing import IndexingError
+from quantlib.time.imm import next_date
+from quantlib.time.api import Date, pydate_from_qldate
+
+from . import logger
+from .download_emails import save_emails, errors
+
+
+def list_imm_dates(date):
+    d = Date.from_datetime(date)
+    r = []
+    for i in range(10):
+        d = next_date(d, False)
+        r.append(pydate_from_qldate(d))
+    return r
+
+
+def makedf(r, indextype, quote_source):
+    if indextype == 'IG':
+        cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
+                'pay_offer', 'delta_pay', 'vol']
+    else:
+        cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
+                'pay_offer', 'delta_pay', 'vol', 'price_vol']
+    if quote_source == "BAML":
+        cols.append('gamma')
+    if quote_source == "GS":
+        cols.append("tail")
+    df = pd.DataFrame.from_records(r, columns=cols)
+    for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
+        if col in df:
+            try:
+                df[col] = df[col].str.rstrip("%").astype('float') / 100
+            except ValueError:  # typo in one email
+                df[col] = (df[col].str.rstrip("%").
+                           str.replace("n", "").astype("float") / 100)
+    if quote_source == "GS":
+        for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
+            df[col] = df[col].str.strip('-')
+        df['delta_pay'] *= -1
+    for k in df:
+        if df.dtypes[k] == 'object':
+            df[k] = df[k].str.replace(",", "")
+            try:
+                df[k] = pd.to_numeric(df[k])
+            except ValueError as e:
+                logger.info(e)
+                logger.error("couldn't convert column")
+                df[k] = pd.to_numeric(df[k].str.replace("n", ""))
+                breakpoint()
+    df.set_index('strike', inplace=True)
+    return df
+
+
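+# The quote timestamp is read off the forwarded message's "At:"/"Sent:"
+# line by trying each dealer's date format in turn; e.g. a line like
+# "Sent: 01/05/18 10:32:11" (illustrative) matches the first format.
+# Year-less formats parse to 1900 and are patched with the received year.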
+def parse_quotedate(fh, date_received):
+    for line in fh:
+        line = line.rstrip()
+        if "At:" in line or "Sent:" in line:
+            for p in ["%m/%d/%y %H:%M:%S", "%b %d %Y %H:%M:%S", "%m/%d %H:%M:%S",
+                      "%B %d, %Y %I:%M %p"]:
+                try:
+                    quotedate = pd.to_datetime(line, format=p, exact=False)
+                except ValueError:
+                    continue
+                else:
+                    if quotedate.year == 1900:  # p='%m/%d %H:%M:%S'
+                        quotedate = quotedate.replace(year=date_received.year)
+                    quotedate = quotedate.tz_localize("America/New_York")
+                    break
+            else:
+                raise RuntimeError(f"can't parse date from {line}")
+            return quotedate
+    else:
+        raise RuntimeError("no date received in the email")
+
+
+def parse_refline(line):
+    regex = r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?" \
+            r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)" \
+            r"\s+Expiry:(?P<expiry>\S+)"
+    m = re.match(regex, line)
+    try:
+        d = m.groupdict()
+        d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y')
+    except AttributeError:
+        raise RuntimeError(f"can't parse refline {line}")
+    return d
+
+
+def parse_baml(fh, indextype, series, quotedate, *args):
+    option_stack = {}
+    fwd_index = []
+    line = ""
+    while True:
+        if line == "":
+            try:
+                line = next(fh)
+            except StopIteration:
+                break
+        if line.startswith("Ref"):
+            d = parse_refline(line)
+            d.update({'quotedate': quotedate, 'index': indextype, 'series': series})
+            df, line = parse_baml_block(fh, indextype)
+            option_stack[d['expiry']] = df
+            fwd_index.append(d)
+        else:
+            line = ""
+    if option_stack:
+        fwd_index = pd.DataFrame.from_records(fwd_index,
+                                              index='quotedate')
+        fwd_index['quote_source'] = 'BAML'
+        return option_stack, fwd_index
+    else:
+        raise RuntimeError("empty email: " + fh.name)
+
+
+def parse_baml_block(fh, indextype):
+    next(fh)  # skip header
+    r = []
+    line = ""
+    for line in fh:
+        line = line.strip()
+        if line.startswith("Ref") or line == "":
+            break
+        line = re.sub("[/|]", " ", line)
+        vals = re.sub(" +", " ", line).rstrip().split(" ")
+        if len(vals) < 3:  # something went wrong
+            line = ""
+            break
+        r.append(vals)
+    return makedf(r, indextype, "BAML"), line
+
+
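+# MS rows are pipe-delimited: strike | payer bid offer delta | receiver
+# bid offer delta | vol (layout inferred from the splits below); when a
+# dealer quotes a single mid instead of bid/offer, the fallback branch
+# uses the mid for both sides.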
+def parse_ms_block(fh, indextype):
+    line = next(fh)  # skip header
+    if line.strip() == "":  # empty block
+        return None
+    r = []
+    for line in fh:
+        line = line.rstrip()
+        if line == "":
+            break
+        strike, payer, receiver, vol = line.split("|")
+        strike = strike.strip()
+        if indextype == "HY":
+            strike = strike.split()[0]
+        try:
+            pay_bid, pay_offer, pay_delta = payer.strip().split()
+            rec_bid, rec_offer, rec_delta = receiver.strip().split()
+        except ValueError:
+            try:
+                pay_mid, pay_delta = payer.strip().split()
+                rec_mid, rec_delta = receiver.strip().split()
+                pay_bid, pay_offer = pay_mid, pay_mid
+                rec_bid, rec_offer = rec_mid, rec_mid
+            except ValueError:
+                raise RuntimeError(f"Couldn't parse line: {line}")
+
+        vals = [strike, rec_bid, rec_offer, rec_delta,
+                pay_bid, pay_offer, pay_delta]
+        vol = vol.strip()
+        if indextype == "HY":
+            try:
+                price_vol, vol = vol.replace("[", "").replace("]", "").split()
+            except ValueError:
+                price_vol, vol, vol_change, be = (vol.replace("[", "").
+                                                  replace("]", "").split())
+            vals += [vol, price_vol]
+        else:
+            if " " in vol:
+                vol, vol_change, be = vol.split()
+            vals += [vol]
+        r.append(vals)
+    return makedf(r, indextype, "MS")
+
+
+def parse_nomura_block(fh, indextype):
+    next(fh)  # skip header
+    r = []
+    for line in fh:
+        line = line.rstrip()
+        if "EXPIRY" in line or line == "":
+            break
+        strike, receiver, payer, vol, _ = line.split("|", 4)
+        strike = strike.strip()
+        pay, pay_delta = payer.strip().split()
+        rec, rec_delta = receiver.strip().split()
+        pay_bid, pay_offer = pay.split("/")
+        rec_bid, rec_offer = rec.split("/")
+        vol = vol.strip()
+        vals = [strike, rec_bid, rec_offer, rec_delta,
+                pay_bid, pay_offer, pay_delta, vol]
+        if indextype == "HY":  # we don't have price vol
+            vals.append(None)
+        r.append(vals)
+    else:
+        return None, makedf(r, indextype, "NOM")
+    return line, makedf(r, indextype, "NOM")
+
+
+def parse_sg_block(fh, indextype, expiration_dates):
+    r = []
+    for line in fh:
+        line = line.rstrip()
+        if line == "":
+            break
+        if indextype == "IG":
+            option_type, strike, price, delta, vol, expiry = line.split()
+        else:
+            option_type, strike, strike_spread, price, delta, vol, expiry = line.split()
+
+        expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month
+        expiry = next(pd.Timestamp(d) for d in expiration_dates if d.month == expiry_month)
+        if option_type == "Rec":
+            rec_bid, rec_offer = price.split("/")
+            pay_bid, pay_offer = None, None
+            rec_delta, pay_delta = delta, None
+        else:
+            pay_bid, pay_offer = price.split("/")
+            rec_bid, rec_offer = None, None
+            rec_delta, pay_delta = None, delta
+        vals = [strike, rec_bid, rec_offer, rec_delta, pay_bid,
+                pay_offer, pay_delta, vol]
+        if indextype == "HY":
+            vals.append(None)
+        r.append(vals)
+    return expiry, makedf(r, indextype, "SG")
+
+
+def parse_gs_block(fh, indextype):
+    # skip header
+    while True:
+        line = next(fh)
+        if line.strip().startswith("Stk"):
+            break
+
+    r = []
+    for line in fh:
+        line = line.rstrip()
+        if line == "":
+            continue
+        if line.startswith("Expiry") or line.startswith("Assumes"):
+            break
+        vals = line.split()
+        if indextype == 'HY':
+            vals.pop(2)
+            vals.pop(9)
+        else:
+            vals.pop(1)
+            vals.pop(8)
+        strike = vals.pop(0)
+        if indextype == "HY":
+            vals.pop(0)  # pop the spread
+        pay, pay_delta = vals[:2]
+        pay_bid, pay_offer = pay.split("/")
+        rec_bid, rec_offer = vals[2].split("/")
+        vol = vals[3]
+        tail = vals[6]
+        vals = [strike, rec_bid, rec_offer, None, pay_bid, pay_offer, pay_delta, vol]
+        if indextype == "HY":
+            vals.append(None)
+        vals.append(tail)
+        r.append(vals)
+    return makedf(r, indextype, "GS"), line
+
+
+def parse_citi_block(fh, indextype):
+    next(fh)  # skip header
+    r = []
+    for line in fh:
+        line = line.rstrip()
+        if line == "":
+            break
+        if indextype == "HY":
+            strike, payers, receivers, vol, price_vol = line.split("|")
+        else:
+            strike, payers, receivers, vol = line.split("|")
+        strike = strike.strip()
+        pay_bid, pay_offer = payers.split("/")
+        pay_bid = pay_bid.strip()
+        pay_offer = pay_offer.strip()
+        pay_offer, pay_delta = pay_offer.split()
+        rec_bid, rec_offer = receivers.split("/")
+        rec_bid = rec_bid.strip()
+        rec_offer = rec_offer.strip()
+        rec_offer, rec_delta = rec_offer.split()
+        vol = vol.strip()
+        vol = vol.split()[0]
+        if indextype == "HY":
+            price_vol = price_vol.strip()
+            r.append([strike, rec_bid, rec_offer, rec_delta,
+                      pay_bid, pay_offer, pay_delta, vol, price_vol])
+        else:
+            r.append([strike, rec_bid, rec_offer, rec_delta,
+                      pay_bid, pay_offer, pay_delta, vol])
+    return makedf(r, indextype, "CITI")
+
+
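+# Per-dealer drivers: scan the email body for expiry headers and hand each
+# block to the matching block parser above, accumulating {expiry: DataFrame}.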
+def parse_ms(fh, indextype, *args):
+    option_stack = {}
+    for line in fh:
+        line = line.rstrip()
+        if "EXPIRY" in line:
+            expiry = line.split(" ")[1]
+            expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
+            block = parse_ms_block(fh, indextype)
+            if block is None or block.empty:
+                logger.warning(f"MS: block is empty for {expiry} expiry")
+            else:
+                option_stack[expiry] = block
+    return option_stack
+
+
+def parse_nom(fh, indextype, *args):
+    option_stack = {}
+
+    def aux(line, fh, indextype, option_stack):
+        expiry = line.split(" ")[0]
+        expiry = pd.to_datetime(expiry, format="%d-%b-%y")
+        next_line, df = parse_nomura_block(fh, indextype)
+        option_stack[expiry] = df
+        if next_line:
+            if "EXPIRY" in next_line:
+                aux(next_line, fh, indextype, option_stack)
+            else:
+                raise RuntimeError(f"Don't know what to do with {line}.")
+
+    for line in fh:
+        line = line.rstrip()
+        if "EXPIRY" in line:
+            aux(line, fh, indextype, option_stack)
+    return option_stack
+
+
+def parse_sg(fh, indextype, expiration_dates):
+    option_stack = {}
+    for line in fh:
+        line = line.rstrip()
+        if line.startswith("Type"):
+            expiry, df = parse_sg_block(fh, indextype, expiration_dates)
+            option_stack[expiry] = df
+    return option_stack
+
+
+def parse_gs(fh, indextype, series, quotedate, ref):
+    option_stack = {}
+    fwd_index = []
+    d = {'quotedate': quotedate, 'index': indextype,
+         'series': series, 'ref': ref}
+    pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")
+
+    line = next(fh).strip()
+    while True:
+        if line.startswith("Expiry"):
+            m = pat.match(line)
+            if m:
+                expiry, fwdprice, fwdspread = m.groups()
+                expiry = pd.to_datetime(expiry, format='%d%b%y')
+                d.update({'fwdspread': fwdspread, 'fwdprice': fwdprice,
+                          'expiry': expiry})
+                fwd_index.append(d.copy())
+                option_stack[expiry], line = parse_gs_block(fh, indextype)
+            else:
+                logger.error(f"Can't parse expiry line: {line}")
+                # advance past the bad line instead of looping forever
+                try:
+                    line = next(fh).strip()
+                except StopIteration:
+                    break
+        elif line.startswith("Assumes"):
+            break
+        else:
+            try:
+                line = next(fh).strip()
+            except StopIteration:
+                break
+
+    fwd_index = pd.DataFrame.from_records(fwd_index,
+                                          index='quotedate')
+    fwd_index['quote_source'] = 'GS'
+    return option_stack, fwd_index
+
+
+def parse_citi(fh, indextype, series, quotedate):
+    option_stack = {}
+    fwd_index = []
+    d = {'quotedate': quotedate,
+         'index': indextype,
+         'series': series}
+    pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)")
+    for line in fh:
+        line = line.strip()
+        if line.startswith("Exp"):
+            m = pat.match(line)
+            if m:
+                expiry, ref = m.groups()
+                expiry = pd.to_datetime(expiry, format='%d-%b-%y')
+                d.update({'ref': ref,
+                          'expiry': expiry})
+                fwd_index.append(d.copy())
+                option_stack[expiry] = parse_citi_block(fh, indextype)
+            else:
+                logger.error(f"Can't parse expiry line: {line}")
+    fwd_index = pd.DataFrame.from_records(fwd_index,
+                                          index='quotedate')
+    fwd_index['quote_source'] = 'CITI'
+    return option_stack, fwd_index
+
+
+subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
+subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)")
+subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
+subject_gs = re.compile(r"(?:FW: |Fwd: )?GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
+subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")
+subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y")
+
+
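+# Subject examples (illustrative): "Fwd:Citi Options: IG29 5Y" matches
+# subject_citi giving ('IG', '29'); "GS IG29 5y payers - Ref 54.5" matches
+# subject_gs giving ('IG', '29', '54.5'). BAML and Citi subjects carry no
+# ref, so those parsers recover it from the email body instead.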
+def parse_email(email, date_received):
+    with open(email.path, "rt") as fh:
+        subject = fh.readline().lstrip()
+
+        for source in ['BAML', 'GS', 'MS', 'NOM', 'SG', 'CITI']:
+            m = globals()[f'subject_{source.lower()}'].match(subject)
+            if m:
+                if source in ['BAML', 'CITI']:
+                    indextype, series = m.groups()
+                else:
+                    indextype, series, ref = m.groups()
+                    ref = float(ref)
+                series = int(series)
+                cur_pos = fh.tell()
+                try:
+                    quotedate = parse_quotedate(fh, date_received)
+                except RuntimeError:
+                    logger.warning("couldn't find received date in message: "
+                                   f"{email.name}, using {date_received}")
+                    quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
+                fh.seek(cur_pos)
+
+                expiration_dates = list_imm_dates(quotedate)
+                parse_fun = globals()[f'parse_{source.lower()}']
+                if source in ['BAML', 'CITI']:
+                    return (quotedate, indextype, series), \
+                        parse_fun(fh, indextype, series, quotedate)
+                elif source == "GS":
+                    return (quotedate, indextype, series), \
+                        parse_fun(fh, indextype, series, quotedate, ref)
+                else:
+                    option_stack = parse_fun(fh, indextype, expiration_dates)
+                    fwd_index = pd.DataFrame({'quotedate': quotedate,
+                                              'ref': ref,
+                                              'index': indextype,
+                                              'series': series,
+                                              'expiry': list(option_stack.keys()),
+                                              'quote_source': source})
+                    fwd_index.set_index('quotedate', inplace=True)
+                    return (quotedate, indextype, series), (option_stack, fwd_index)
+        else:
+            raise RuntimeError(f"can't parse subject line: {subject} for email {email.name}")
+
+
+def write_todb(swaption_stack, index_data, conn):
+    def gen_sql_str(query, table_name, columns):
+        return query.format(sql.Identifier(table_name),
+                            sql.SQL(", ").join(sql.Identifier(c) for c in columns),
+                            sql.SQL(", ").join(sql.Placeholder() * len(columns)))
+    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
+                    "ON CONFLICT DO NOTHING RETURNING ref_id")
+    sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
+    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
+                    "ON CONFLICT DO NOTHING")
+    with conn.cursor() as c:
+        for t in index_data.itertuples(index=False):
+            c.execute(sql_str, t)
+            try:
+                ref_id, = next(c)
+            except StopIteration:
+                continue
+            else:
+                try:
+                    df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
+                except KeyError:
+                    logger.warning("missing key in swaption_stack: "
+                                   f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
+                    continue
+                except IndexingError:
+                    breakpoint()
+                df['ref_id'] = ref_id
+                c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
+                              df.itertuples(index=False))
+    conn.commit()
+
+
+def get_email_list(date):
+    """returns a list of email file names for a given date
+
+    Parameters
+    ----------
+    date : string
+    """
+    with open(".pickle", "rb") as fh:
+        already_uploaded = pickle.load(fh)
+    df = pd.DataFrame.from_dict(already_uploaded, orient='index')
+    df.columns = ['quotedate']
+    df = df.reset_index().set_index('quotedate')
+    return df.loc[date, 'index'].tolist()
+
+
+def pickle_drop_date(date):
+    with open(".pickle", "rb") as fh:
+        already_uploaded = pickle.load(fh)
+    newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
+    with open(".pickle", "wb") as fh:
+        pickle.dump(newdict, fh)
