"""Parse swaption quote emails (BAML, MS, Nomura, GS, SG) and upload them.

Run as a script, this downloads new emails, parses every email file that has
not been processed yet, writes the quotes to the ``swaption_quotes`` and
``swaption_ref_quotes`` tables and records the processed message ids in a
local ``.pickle`` file.
"""
import pandas as pd
import re
import os
import pdb
from db import dbconn
import psycopg2.sql as sql
from download_emails import save_emails
import datetime
import logging
import pickle
import sys
from quantlib.time.imm import next_date
from quantlib.time.api import Date, pydate_from_qldate

logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"),
                                          'emails_parsing.log'),
                    level=logging.WARNING,
                    format='%(asctime)s %(message)s')


def list_imm_dates(date):
    """Return the 10 dates obtained by repeatedly calling ``next_date``.

    Parameters
    ----------
    date : datetime-like
        Starting point; converted with ``Date.from_datetime``.

    Returns
    -------
    list
        Python dates, each one the ``next_date`` of the previous one.
    """
    d = Date.from_datetime(date)
    r = []
    for _ in range(10):
        d = next_date(d, False)
        r.append(pydate_from_qldate(d))
    return r


def makedf(r, indextype, quote_source):
    """Build a quote DataFrame indexed by strike from parsed rows.

    Parameters
    ----------
    r : list of lists
        One list of raw string (or None) values per strike, in column order.
    indextype : str
        'IG' gives the base columns; anything else adds a 'price_vol' column.
    quote_source : str
        Stored in the 'quote_source' column. "BAML" adds a 'gamma' column,
        "GS" adds a 'tail' column.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    RuntimeError
        If a text column cannot be converted to numbers.
    """
    cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
            'pay_offer', 'delta_pay', 'vol']
    if indextype != 'IG':
        cols.append('price_vol')
    if quote_source == "BAML":
        cols.append('gamma')
    if quote_source == "GS":
        cols.append("tail")
    df = pd.DataFrame.from_records(r, columns=cols)
    # These columns are quoted as percentages: drop the sign and rescale.
    for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
        if col in df:
            df[col] = df[col].str.strip("%").astype('float') / 100
    if quote_source == "GS":
        for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
            df[col] = df[col].str.strip('-')
        df['delta_pay'] *= -1
    for k in df:
        if df.dtypes[k] == 'object':
            try:
                df[k] = pd.to_numeric(df[k])
            except ValueError as err:
                # Used to drop into pdb, which blocks an unattended run.
                # RuntimeError is what the main loop catches to skip an email.
                raise RuntimeError(
                    "can't convert column {} to numeric ({} quotes)".format(
                        k, quote_source)) from err
    df['quote_source'] = quote_source
    df.set_index('strike', inplace=True)
    return df


def parse_quotedate(fh, date_received):
    """Read lines from ``fh`` until one contains "At:" and parse its date.

    Parameters
    ----------
    fh : iterable of str
        Open email file; consumed up to and including the "At:" line.
    date_received : datetime.datetime
        Supplies the year when the quote line carries none.

    Returns
    -------
    pandas.Timestamp or None
        None when no line contains "At:".

    Raises
    ------
    RuntimeError
        If the "At:" line matches none of the known date formats.
    """
    formats = ['%m/%d/%y %H:%M:%S', '%b %d %Y %H:%M:%S', '%m/%d %H:%M:%S']
    for line in fh:
        line = line.rstrip()
        if "At:" not in line:
            continue
        for p in formats:
            try:
                # exact=False lets the format match anywhere in the line
                quotedate = pd.to_datetime(line, format=p, exact=False)
            except ValueError:
                continue
            if quotedate.year == 1900:  # p='%m/%d %H:%M:%S'
                quotedate = quotedate.replace(year=date_received.year)
            return quotedate
        raise RuntimeError("can't parse date")
    return None


# NOTE(review): the group names were missing from the pattern as found (which
# made it an invalid regex). They are reconstructed from the column names the
# rest of this file uses ('ref', 'fwdprice', 'fwdspread', 'fwdbpv', 'expiry');
# confirm against the swaption_ref_quotes table.
_REFLINE_RE = re.compile(
    r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?"
    r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)"
    r"\s+Expiry:(?P<expiry>\S+)")


def parse_refline(line):
    """Parse a BAML "Ref:" line into a dict of its fields.

    Parameters
    ----------
    line : str

    Returns
    -------
    dict
        The named groups of the pattern as strings ('fwdprice' is None when
        absent), with 'expiry' converted to a timestamp.

    Raises
    ------
    RuntimeError
        If the line does not match the expected layout.
    """
    m = _REFLINE_RE.match(line)
    if m is None:
        # The previous handler referenced an undefined name and then returned
        # an unbound variable; fail with an error the main loop handles.
        raise RuntimeError("can't parse ref line: " + line.rstrip())
    d = m.groupdict()
    d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y')
    return d


def parse_baml(fh, indextype, series, quotedate, *args):
    """Parse the body of a BAML email.

    Parameters
    ----------
    fh : file object
        Positioned after the quote date line.
    indextype : str
    series : int
    quotedate : timestamp
    *args
        Ignored.

    Returns
    -------
    (dict, pandas.DataFrame)
        Quotes per expiry, and one row of reference data per expiry indexed
        by 'quotedate'.

    Raises
    ------
    RuntimeError
        If no "Ref" block is found.
    """
    option_stack = {}
    fwd_index = []
    line = ""
    while True:
        if line == "":
            try:
                line = next(fh)
            except StopIteration:
                break
        if line.startswith("Ref"):
            d = parse_refline(line)
            d.update({'quotedate': quotedate, 'index': indextype,
                      'series': series})
            # the block parser hands back the line it stopped on, which may
            # be the next "Ref" line
            df, line = parse_baml_block(fh, indextype)
            option_stack[d['expiry']] = df
            fwd_index.append(d)
        else:
            line = ""
    if not option_stack:
        raise RuntimeError("empty email: " + fh.name)
    fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
    return option_stack, fwd_index


def parse_baml_block(fh, indextype):
    """Parse one BAML quote table.

    Returns
    -------
    (pandas.DataFrame, str)
        The quotes and the last line read ("" when the block ended on a blank
        or malformed line).
    """
    next(fh, None)  # skip header; tolerate a file ending right after "Ref"
    r = []
    line = ""
    for line in fh:
        line = line.strip()
        if line.startswith("Ref") or line == "":
            break
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if len(vals) < 3:  # something went wrong
            line = ""
            break
        r.append(vals)
    return makedf(r, indextype, "BAML"), line


def parse_ms_block(fh, indextype):
    """Parse one MS quote table; return None when the block is empty."""
    line = next(fh, "")  # skip header
    if line.strip() == "":  # empty block
        return None
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        strike, payer, receiver, vol = line.split("|")
        strike = strike.strip()
        if indextype == "HY":
            strike = strike.split()[0]
        pay_bid, pay_offer, pay_delta = payer.strip().split()
        rec_bid, rec_offer, rec_delta = receiver.strip().split()
        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta]
        vol = vol.strip()
        if indextype == "HY":
            # either 2 fields (price vol, vol) or 4 (plus two extras that
            # are discarded)
            fields = vol.replace("[", "").replace("]", "").split()
            if len(fields) == 2:
                price_vol, vol = fields
            else:
                price_vol, vol, _vol_change, _be = fields
            vals += [vol, price_vol]
        else:
            if " " in vol:
                vol, _vol_change, _be = vol.split()
            vals += [vol]
        r.append(vals)
    return makedf(r, indextype, "MS")


def parse_nomura_block(fh, indextype):
    """Parse one Nomura quote table.

    Returns
    -------
    (str or None, pandas.DataFrame)
        The line the block stopped on (a line containing "EXPIRY", or ""),
        or None when the file ran out, together with the quotes.
    """
    next(fh, None)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line or line == "":
            break
        strike, receiver, payer, vol, _ = line.split("|", 4)
        strike = strike.strip()
        pay, pay_delta = payer.strip().split()
        rec, rec_delta = receiver.strip().split()
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = rec.split("/")
        vol = vol.strip()
        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":  # we don't have price vol
            vals.append(None)
        r.append(vals)
    else:
        return None, makedf(r, indextype, "NOM")
    return line, makedf(r, indextype, "NOM")


def parse_sg_block(fh, indextype, expiration_dates):
    """Parse one SG quote table.

    Parameters
    ----------
    fh : file object
    indextype : str
    expiration_dates : list of dates
        Candidates; the expiry is the first one whose month matches the
        month quoted on the line.

    Returns
    -------
    (pandas.Timestamp, pandas.DataFrame)
        The expiry of the last parsed line and the quotes.

    Raises
    ------
    RuntimeError
        If the block has no rows, or no candidate date matches the month.
    """
    r = []
    expiry = None
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        if indextype == "IG":
            option_type, strike, price, delta, vol, expiry = line.split()
        else:
            (option_type, strike, _strike_spread, price, delta, vol,
             expiry) = line.split()
        expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month
        expiry = next((pd.Timestamp(d) for d in expiration_dates
                       if d.month == expiry_month), None)
        if expiry is None:
            raise RuntimeError("no expiration date for line: " + line)
        if option_type == "Rec":
            rec_bid, rec_offer = price.split("/")
            pay_bid, pay_offer = None, None
            rec_delta, pay_delta = delta, None
        else:
            pay_bid, pay_offer = price.split("/")
            rec_bid, rec_offer = None, None
            rec_delta, pay_delta = None, delta
        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":
            vals.append(None)
        r.append(vals)
    if not r:
        # expiry was previously unbound here
        raise RuntimeError("empty SG block")
    return expiry, makedf(r, indextype, "SG")


def parse_gs_block(fh, indextype):
    """Parse one GS quote table into a DataFrame."""
    next(fh, None)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        vals = line.split()
        # drop two fields that are not used; positions differ for HY
        if indextype == 'HY':
            vals.pop(2)
            vals.pop(9)
        else:
            vals.pop(1)
            vals.pop(8)
        strike = vals.pop(0)
        if indextype == "HY":
            vals.pop(0)  # pop the spread
        pay, pay_delta = vals[:2]
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = vals[2].split("/")
        vol = vals[3]
        tail = vals[6]
        vals = [strike, rec_bid, rec_offer, None,
                pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":
            vals.append(None)
        vals.append(tail)
        r.append(vals)
    return makedf(r, indextype, "GS")


def parse_ms(fh, indextype, *args):
    """Parse the body of an MS email into a dict of quotes per expiry."""
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            expiry = line.split(" ")[1]
            expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
            block = parse_ms_block(fh, indextype)
            if block is not None:
                option_stack[expiry] = block
    return option_stack


def parse_nomura(fh, indextype, *args):
    """Parse the body of a Nomura email into a dict of quotes per expiry.

    Raises
    ------
    RuntimeError
        If a block is followed by a non-empty line without "EXPIRY".
    """
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        # A block may stop on the next "EXPIRY" header, so keep consuming
        # blocks until the parser stops on something else.
        while "EXPIRY" in line:
            expiry = pd.to_datetime(line.split(" ")[0], format="%d-%b-%y")
            next_line, df = parse_nomura_block(fh, indextype)
            option_stack[expiry] = df
            if not next_line:
                break
            if "EXPIRY" not in next_line:
                raise RuntimeError(
                    "Don't know what to do with {}:".format(next_line))
            line = next_line
    return option_stack


def parse_sg(fh, indextype, expiration_dates):
    """Parse the body of an SG email into a dict of quotes per expiry."""
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        if line.startswith("Type"):
            expiry, df = parse_sg_block(fh, indextype, expiration_dates)
            option_stack[expiry] = df
    return option_stack


_GS_EXPIRY_RE = re.compile(
    r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")


def parse_gs(fh, indextype, series, quotedate, ref):
    """Parse the body of a GS email.

    Returns
    -------
    (dict, pandas.DataFrame)
        Quotes per expiry, and one row of reference data per expiry indexed
        by 'quotedate'.

    Raises
    ------
    RuntimeError
        If no expiry block could be parsed.
    """
    option_stack = {}
    fwd_index = []
    d = {'quotedate': quotedate, 'index': indextype,
         'series': series, 'ref': ref}
    for line in fh:
        line = line.rstrip()
        if not line.startswith("Expiry"):
            continue
        m = _GS_EXPIRY_RE.match(line)
        if m:
            expiry, fwdprice, fwdspread = m.groups()
            expiry = pd.to_datetime(expiry, format='%d%b%y')
            d.update({'fwdspread': fwdspread, 'fwdprice': fwdprice,
                      'expiry': expiry})
            fwd_index.append(d.copy())
            option_stack[expiry] = parse_gs_block(fh, indextype)
        else:
            # the line must be passed through a %s placeholder
            logging.error("Can't parse expiry line: %s", line)
    if not option_stack:
        raise RuntimeError("empty email: " + fh.name)
    fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
    return option_stack, fwd_index


subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
subject_ms = re.compile(
    r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)")
subject_nomura = re.compile(
    r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
subject_gs = re.compile(r"GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
subject_sg = re.compile(
    r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")

# (source name, subject pattern, body parser), tried in this order
_SOURCES = (
    ('BAML', subject_baml, parse_baml),
    ('MS', subject_ms, parse_ms),
    ('NOMURA', subject_nomura, parse_nomura),
    ('GS', subject_gs, parse_gs),
    ('SG', subject_sg, parse_sg),
)


def parse_email(email, date_received):
    """Parse one email file.

    Parameters
    ----------
    email : os.DirEntry-like
        Needs ``path`` and ``name`` attributes; the first line of the file
        is taken as the subject.
    date_received : datetime.datetime

    Returns
    -------
    ((quotedate, indextype, series), (option_stack, fwd_index))

    Raises
    ------
    RuntimeError
        If the subject matches no known source, no quote date is found, or
        the email holds no quotes.
    """
    with open(email.path, "rt") as fh:
        subject = next(fh, "")  # an empty file falls through to the error
        for source, subject_re, parse_fun in _SOURCES:
            m = subject_re.match(subject)
            if not m:
                continue
            if source == 'BAML':
                indextype, series = m.groups()
            else:
                indextype, series, ref = m.groups()
                ref = float(ref)
            series = int(series)
            quotedate = parse_quotedate(fh, date_received)
            if quotedate is None:
                print(email.path)
                continue
            key = (quotedate, indextype, series)
            if source == 'BAML':
                return key, parse_fun(fh, indextype, series, quotedate)
            if source == "GS":
                return key, parse_fun(fh, indextype, series, quotedate, ref)
            expiration_dates = list_imm_dates(quotedate)
            option_stack = parse_fun(fh, indextype, expiration_dates)
            if not option_stack:
                # pd.concat on an empty dict would crash the main loop
                raise RuntimeError("empty email: " + fh.name)
            fwd_index = pd.DataFrame({'quotedate': quotedate,
                                      'ref': ref,
                                      'index': indextype,
                                      'series': series,
                                      'expiry': list(option_stack.keys())})
            fwd_index.set_index('quotedate', inplace=True)
            return key, (option_stack, fwd_index)
        raise RuntimeError(
            "can't parse subject line: {0} for email {1}".format(
                subject, email.name))


def write_todb(swaption_stack, index_data):
    """Insert the parsed quotes into the database.

    ``index_data`` goes to ``swaption_ref_quotes`` and ``swaption_stack`` to
    ``swaption_quotes``; conflicting rows are ignored. Column names of each
    DataFrame are used as the table column names.
    """
    conn = dbconn('serenitasdb')
    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                    "ON CONFLICT DO NOTHING")
    try:
        for df, table in zip([index_data, swaption_stack],
                             ["swaption_ref_quotes", "swaption_quotes"]):
            cols = df.columns
            sql_str = query.format(
                sql.Identifier(table),
                sql.SQL(", ").join(sql.Identifier(c) for c in cols),
                sql.SQL(", ").join(sql.Placeholder() * len(cols)))
            with conn.cursor() as c:
                c.executemany(sql_str, df.itertuples(index=False))
            conn.commit()
    finally:
        # release the connection even when an insert fails
        conn.close()


def get_email_list(date):
    """returns a list of email file names for a given date

    Parameters
    ----------
    date : string
    """
    # NOTE: pickle is only safe because .pickle is written by this script.
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    df = pd.DataFrame.from_dict(already_uploaded, orient='index')
    df.columns = ['quotedate']
    df = df.reset_index().set_index('quotedate')
    return df.loc[date, 'index'].tolist()


def pickle_drop_date(date):
    """Remove from ``.pickle`` every entry whose quote date falls on ``date``.

    Parameters
    ----------
    date : datetime.date
    """
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
    with open(".pickle", "wb") as fh:
        pickle.dump(newdict, fh)


def main():
    """Download, parse and upload every email not processed yet."""
    save_emails()
    data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions")
    emails = [f for f in os.scandir(data_dir) if f.is_file()]
    swaption_stack = {}
    index_frames = []
    try:
        with open(".pickle", "rb") as fh:
            already_uploaded = pickle.load(fh)
    except FileNotFoundError:
        already_uploaded = {}
    for f in emails:
        date_received, msg_id = f.name.split("_")
        date_received = datetime.datetime.strptime(date_received,
                                                   "%Y-%m-%d %H-%M-%S")
        if msg_id in already_uploaded:
            continue
        try:
            key, (option_stack, fwd_index) = parse_email(f, date_received)
        except RuntimeError as e:
            logging.error(e)
            continue
        if key[0] is None:
            logging.error("Something wrong with email: {}".format(f.name))
            continue
        swaption_stack[key] = pd.concat(option_stack,
                                        names=['expiry', 'strike'])
        # DataFrame.append is gone in pandas 2.0; collect and concat once
        index_frames.append(fwd_index)
        already_uploaded[msg_id] = key[0]

    if not index_frames:
        sys.exit()
    index_data = pd.concat(index_frames)
    if index_data.empty:
        sys.exit()
    for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
        if col in index_data:
            index_data[col] = index_data[col].astype('float')
    index_data['index'] = index_data['index'].astype('category')

    swaption_stack = pd.concat(swaption_stack,
                               names=['quotedate', 'index', 'series'])
    swaption_stack = swaption_stack.reset_index()
    swaption_stack = swaption_stack.drop_duplicates(
        ['quotedate', 'index', 'series', 'expiry', 'strike'])
    index_data = index_data.reset_index()
    index_data = index_data.drop_duplicates(
        ['quotedate', 'index', 'series', 'expiry'])
    write_todb(swaption_stack, index_data)

    # only remember the emails once they are safely in the database
    with open(".pickle", "wb") as fh:
        pickle.dump(already_uploaded, fh)


if __name__ == "__main__":
    main()