import pandas as pd
import re
from pathlib import Path
from download_emails import update_emails
import datetime
import sys
import logging

logging.basicConfig(filename='/home/share/CorpCDOs/logs/emails_parsing.log',
                    level=logging.WARNING, format='%(asctime)s %(message)s')


def makedf(r, indextype, quote_source):
    """Build a per-expiry quote DataFrame (indexed by strike) from parsed rows.

    Parameters
    ----------
    r : list of row lists, one per strike line, all fields still strings.
    indextype : 'IG' or 'HY' -- HY quotes carry an extra price_vol column.
    quote_source : 'BAML' or 'MS' -- BAML quotes carry an extra gamma column.

    Raises ValueError if a column that should be numeric contains
    non-numeric data.
    """
    cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec',
            'pay_bid', 'pay_offer', 'delta_pay', 'vol']
    if indextype != 'IG':
        cols.append('price_vol')
    if quote_source == "BAML":
        cols.append('gamma')
    df = pd.DataFrame.from_records(r, columns=cols)
    # Percentage columns arrive as e.g. "25%"; convert to fractions.
    for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma']:
        if col in df:
            df[col] = df[col].str.strip("%").astype('float') / 100
    # Everything else should coerce cleanly to numeric.
    for k in df:
        if df.dtypes[k] == 'object':
            try:
                df[k] = pd.to_numeric(df[k])
            except ValueError:
                # Was pdb.set_trace(): a debugger prompt hangs unattended
                # runs -- log the bad column and let the error propagate.
                logging.error("non-numeric data in column %s: %s",
                              k, df[k].tolist())
                raise
    df['quote_source'] = quote_source
    df.set_index('strike', inplace=True)
    return df


def parse_quotedate(fh, date_received):
    """Scan fh forward to the "At ..." line and return it as a Timestamp.

    Two timestamp layouts are tried; the year-less one ('%m/%d %H:%M:%S')
    parses with pandas' default year 1900, which is patched to the year
    the email was received.

    Raises RuntimeError if the "At" line is missing or matches no format.
    """
    for line in fh:
        line = line.rstrip()
        if line.startswith("At"):
            for p in ['%m/%d %H:%M:%S', '%b %d %Y %H:%M:%S']:
                try:
                    quotedate = pd.to_datetime(line, format=p, exact=False)
                except ValueError:
                    continue
                else:
                    if quotedate.year == 1900:
                        quotedate = quotedate.replace(year=date_received.year)
                    break
            else:
                raise RuntimeError("can't parse date")
            return quotedate
    # Fix: previously fell off the loop and implicitly returned None,
    # deferring the failure to an obscure crash in the caller.
    raise RuntimeError("no quote date line found in " + fh.name)


# Named groups restored: the pattern had degenerated to bare "(?P\S+)"
# groups, which is invalid regex syntax (re.error at match time).  The
# names match the fields consumed downstream
# (ref / fwdprice / fwdspread / fwdbpv / expiry).
_REFLINE_RE = re.compile(
    r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?"
    r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)"
    r"\s+Expiry:(?P<expiry>\S+)")


def parse_refline(line):
    """Parse a BAML "Ref: ..." header line into a dict of quote fields.

    Returns a dict with keys ref, fwdprice (may be None), fwdspread,
    fwdbpv and expiry (as a Timestamp).

    Raises RuntimeError if the line doesn't match.  (The original error
    branch referenced the undefined name `fh` and returned an unbound
    local, so any mismatch crashed with a misleading exception.)
    """
    m = _REFLINE_RE.match(line)
    if m is None:
        logging.error("can't parse refline: " + line)
        raise RuntimeError("can't parse refline: " + line)
    d = m.groupdict()
    d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y')
    return d


def parse_baml(fh, indextype, series, quotedate):
    """Parse the body of a BAML email into per-expiry quote tables.

    Returns (option_stack, fwd_index): a dict mapping expiry -> strike
    DataFrame, and a DataFrame of the "Ref" header fields indexed by
    quotedate.

    Raises RuntimeError if no quote block was found in the email.
    """
    option_stack = {}
    fwd_index = []
    line = ""
    while True:
        if line == "":
            try:
                line = next(fh)
            except StopIteration:
                break
        if line.startswith("Ref"):
            d = parse_refline(line)
            d.update({'quotedate': quotedate, 'index': indextype,
                      'series': series})
            # parse_baml_block hands back the first line it did not
            # consume, which may itself be the next "Ref" header.
            df, line = parse_baml_block(fh, indextype)
            option_stack[d['expiry']] = df
            fwd_index.append(d)
        else:
            line = ""
    if option_stack:
        fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
        return option_stack, fwd_index
    else:
        raise RuntimeError("empty email: " + fh.name)


def parse_baml_block(fh, indextype):
    """Parse one BAML quote table (the lines after a "Ref" header).

    Returns (df, leftover_line) where leftover_line is the first line
    that was read but does not belong to this block ("" if the block
    ended cleanly).
    """
    next(fh)  # skip the column-header line
    r = []
    line = ""
    for line in fh:
        line = line.strip()
        if line.startswith("Ref") or line == "":
            break
        # Cells are separated by '/' or '|' plus arbitrary runs of spaces.
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if len(vals) < 3:  # something went wrong; abandon this line
            line = ""
            break
        r.append(vals)
    return makedf(r, indextype, "BAML"), line


def parse_ms_block(fh, indextype):
    """Parse one MS quote table: '|'-separated strike/payer/receiver/vol.

    HY strikes carry a trailing token that is dropped, and HY vols are a
    bracketed "[vol price_vol]" pair.
    """
    next(fh)  # skip the column-header line
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        strike, payer, receiver, vol = line.split("|")
        strike = strike.strip()
        if indextype == "HY":
            strike = strike.split()[0]
        pay_bid, pay_offer, pay_delta = payer.strip().split()
        rec_bid, rec_offer, rec_delta = receiver.strip().split()
        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta]
        vol = vol.strip()
        if indextype == "HY":
            vol, price_vol = vol.replace("[", "").replace("]", "").split()
            vals += [vol, price_vol]
        else:
            vals += [vol]
        r.append(vals)
    return makedf(r, indextype, "MS")


def parse_ms(fh, indextype):
    """Parse the body of an MS email: one block per "EXPIRY dd-mon-yyyy" line.

    Returns a dict mapping expiry Timestamp -> strike DataFrame.
    """
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            expiry = line.split(" ")[1]
            expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
            option_stack[expiry] = parse_ms_block(fh, indextype)
    return option_stack


subject_BAML = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
subject_MS = re.compile(r"\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^\d]*([\d.]+)")


def parse_email(email_path):
    """Parse one saved email file (epoch-millis line, subject line, body).

    Dispatches on the subject line to the BAML or MS parser and returns
    ((quotedate, indextype, series), (option_stack, fwd_index)).

    Raises RuntimeError when the subject matches neither source or the
    body cannot be parsed.
    """
    with email_path.open("rt") as fh:
        # First line is the receive time in epoch milliseconds.
        date_received = datetime.datetime.fromtimestamp(
            int(fh.readline()) / 1000)
        subject = next(fh)
        m = subject_BAML.match(subject)
        if m:
            indextype, series = m.groups()
            series = int(series)
            quotedate = parse_quotedate(fh, date_received)
            return ((quotedate, indextype, series),
                    parse_baml(fh, indextype, series, quotedate))
        m = subject_MS.match(subject)
        if m:
            indextype, series, ref = m.groups()
            series = int(series)
            ref = float(ref)
            quotedate = parse_quotedate(fh, date_received)
            option_stack = parse_ms(fh, indextype)
            # MS emails put the ref level in the subject, so the forward
            # index frame is synthesised here rather than parsed.
            fwd_index = pd.DataFrame({'quotedate': quotedate, 'ref': ref,
                                      'index': indextype, 'series': series,
                                      'expiry': list(option_stack.keys())})
            fwd_index.set_index('quotedate', inplace=True)
            return (quotedate, indextype, series), (option_stack, fwd_index)
        raise RuntimeError("can't parse subject line: {0} for email {1}".format(
            subject, email_path.name))


def write_todb(swaption_stack, index_data):
    """Insert the parsed quotes into serenitasdb.

    swaption_stack rows go into swaption_quotes via an explicit insert
    (NaNs adapted to NULL); index_data is appended to swaption_ref_quotes.
    """
    from sqlalchemy import MetaData, Table
    from db import dbengine, nan_to_null
    import psycopg2
    serenitasdb = dbengine('serenitasdb')
    psycopg2.extensions.register_adapter(float, nan_to_null)
    meta = MetaData(bind=serenitasdb)
    swaption_quotes = Table('swaption_quotes', meta, autoload=True)
    ins = swaption_quotes.insert().values(
        swaption_stack.to_dict(orient='records')).execute()
    index_data.to_sql('swaption_ref_quotes', serenitasdb,
                      if_exists='append', index=False)


if __name__ == "__main__":
    import pickle
    update_emails()
    emails = [f for f in Path("../../data/swaptions").iterdir() if f.is_file()]
    swaption_stack = {}
    try:
        with open(".pickle", "rb") as fh:
            already_uploaded = pickle.load(fh)
    except FileNotFoundError:
        already_uploaded = set()
    # Accumulate frames in a list and concat once: DataFrame.append was
    # deprecated and then removed in pandas 2.0.
    fwd_frames = []
    for f in emails:
        if f.name in already_uploaded:
            continue
        try:
            key, (option_stack, fwd_index) = parse_email(f)
        except RuntimeError as e:
            logging.error(e)
        else:
            swaption_stack[key] = pd.concat(option_stack,
                                            names=['expiry', 'strike'])
            fwd_frames.append(fwd_index)
            already_uploaded.add(f.name)
    if not fwd_frames:
        sys.exit()
    index_data = pd.concat(fwd_frames)
    if index_data.empty:
        sys.exit()
    for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']:
        # MS emails contribute only 'ref', BAML only the fwd fields; a
        # single-source batch would otherwise raise KeyError here.
        if col in index_data:
            index_data[col] = index_data[col].astype('float')
    index_data['index'] = index_data['index'].astype('category')
    swaption_stack = pd.concat(swaption_stack,
                               names=['quotedate', 'index', 'series'])
    swaption_stack = swaption_stack.reset_index()
    swaption_stack = swaption_stack.drop_duplicates(
        ['quotedate', 'index', 'series', 'expiry', 'strike'])
    index_data = index_data.reset_index()
    index_data = index_data.drop_duplicates(
        ['quotedate', 'index', 'series', 'expiry'])
    write_todb(swaption_stack, index_data)
    with open(".pickle", "wb") as fh:
        pickle.dump(already_uploaded, fh)