diff options
Diffstat (limited to 'python/parse_emails.py')
| -rw-r--r-- | python/parse_emails.py | 570 |
1 files changed, 0 insertions, 570 deletions
diff --git a/python/parse_emails.py b/python/parse_emails.py deleted file mode 100644 index 9baac888..00000000 --- a/python/parse_emails.py +++ /dev/null @@ -1,570 +0,0 @@ -import pandas as pd -import re -import os -import psycopg2.sql as sql -from download_emails import save_emails, errors -import datetime -import logging -import pickle -import sys -from quantlib.time.imm import next_date -from quantlib.time.api import Date, pydate_from_qldate - -logging.basicConfig(filename=os.path.join(os.getenv("LOG_DIR"), - 'emails_parsing.log'), - level=logging.WARNING, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - - -def list_imm_dates(date): - d = Date.from_datetime(date) - r = [] - for i in range(10): - d = next_date(d, False) - r.append(pydate_from_qldate(d)) - return r - - -def makedf(r, indextype, quote_source): - if indextype == 'IG': - cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid', - 'pay_offer', 'delta_pay', 'vol'] - else: - cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid', - 'pay_offer', 'delta_pay', 'vol', 'price_vol'] - if quote_source == "BAML": - cols.append('gamma') - if quote_source == "GS": - cols.append("tail") - df = pd.DataFrame.from_records(r, columns=cols) - for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']: - if col in df: - try: - df[col] = df[col].str.rstrip("%").astype('float') / 100 - except ValueError: #typo in one email - df[col] = (df[col].str.rstrip("%"). - str.replace("n", "").astype("float") / 100) - if quote_source == "GS": - for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]: - df[col] = df[col].str.strip('-') - df['delta_pay'] *= -1 - for k in df: - if df.dtypes[k] == 'object': - df[k] = df[k].str.replace(",", "") - try: - df[k] = pd.to_numeric(df[k]) - except ValueError as e: - logging.info(e) - logging.error("couldn't convert column") - df[k] = pd.to_numeric(df[k].str.replace("n", "")) - breakpoint() - df.set_index('strike', inplace=True) - return df - - -def parse_quotedate(fh, date_received): - for line in fh: - line = line.rstrip() - if "At:" in line or "Sent:" in line: - for p in ["%m/%d/%y %H:%M:%S", "%b %d %Y %H:%M:%S", "%m/%d %H:%M:%S", - "%B %d, %Y %I:%M %p"]: - try: - quotedate = pd.to_datetime(line, format=p, exact=False) - except ValueError: - continue - else: - if quotedate.year == 1900: # p='%m/%d %H:%M:%S' - quotedate = quotedate.replace(year=date_received.year) - quotedate = quotedate.tz_localize("America/New_York") - break - else: - raise RuntimeError("can't parse date from {line}") - return quotedate - else: - raise RuntimeError("no date received in the email") - - -def parse_refline(line): - regex = r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?" \ - r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)" \ - r"\s+Expiry:(?P<expiry>\S+)" - m = re.match(regex, line) - try: - d = m.groupdict() - d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y') - except AttributeError: - raise RuntimeError(f"can't parse refline {line}") - return d - - -def parse_baml(fh, indextype, series, quotedate, *args): - option_stack = {} - fwd_index = [] - line = "" - while True: - if line == "": - try: - line = next(fh) - except StopIteration: - break - if line.startswith("Ref"): - d = parse_refline(line) - d.update({'quotedate': quotedate, 'index': indextype, 'series': series}) - df, line = parse_baml_block(fh, indextype) - option_stack[d['expiry']] = df - fwd_index.append(d) - else: - line = "" - if option_stack: - fwd_index = pd.DataFrame.from_records(fwd_index, - index='quotedate') - fwd_index['quote_source'] = 'BAML' - return option_stack, fwd_index - else: - raise RuntimeError("empty email: " + fh.name) - - -def parse_baml_block(fh, indextype): - next(fh) # skip header - r = [] - line = "" - for line in fh: - line = line.strip() - if line.startswith("Ref") or line == "": - break - line = re.sub("[/|]", " ", line) - vals = re.sub(" +", " ", line).rstrip().split(" ") - if len(vals) < 3: # something went wrong - line = "" - break - r.append(vals) - return makedf(r, indextype, "BAML"), line - - -def parse_ms_block(fh, indextype): - line = next(fh) # skip header - if line.strip() == "": # empty block - return None - r = [] - for line in fh: - line = line.rstrip() - if line == "": - break - strike, payer, receiver, vol = line.split("|") - strike = strike.strip() - if indextype == "HY": - strike = strike.split()[0] - try: - pay_bid, pay_offer, pay_delta = payer.strip().split() - rec_bid, rec_offer, rec_delta = receiver.strip().split() - except ValueError: - try: - pay_mid, pay_delta = payer.strip().split() - rec_mid, rec_delta = receiver.strip().split() - pay_bid, pay_offer = pay_mid, pay_mid - rec_bid, rec_offer = rec_mid, rec_mid - except ValueError: - raise RuntimeError("Couldn't parse line: {line}") - - vals = [strike, rec_bid, rec_offer, rec_delta, - pay_bid, pay_offer, pay_delta] - vol = vol.strip() - if indextype == "HY": - try: - price_vol, vol = vol.replace("[", "").replace("]", "").split() - except ValueError: - price_vol, vol, vol_change, be = (vol.replace("[", ""). - replace("]", "").split()) - vals += [vol, price_vol] - else: - if " " in vol: - vol, vol_change, be = vol.split() - vals += [vol] - r.append(vals) - return makedf(r, indextype, "MS") - - -def parse_nomura_block(fh, indextype): - next(fh) # skip header - r = [] - for line in fh: - line = line.rstrip() - if "EXPIRY" in line or line == "": - break - strike, receiver, payer, vol, _ = line.split("|", 4) - strike = strike.strip() - pay, pay_delta = payer.strip().split() - rec, rec_delta = receiver.strip().split() - pay_bid, pay_offer = pay.split("/") - rec_bid, rec_offer = rec.split("/") - vol = vol.strip() - vals = [strike, rec_bid, rec_offer, rec_delta, - pay_bid, pay_offer, pay_delta, vol] - if indextype == "HY": # we don't have price vol - vals.append(None) - r.append(vals) - else: - return None, makedf(r, indextype, "NOM") - return line, makedf(r, indextype, "NOM") - - -def parse_sg_block(fh, indextype, expiration_dates): - r = [] - for line in fh: - line = line.rstrip() - if line == "": - break - if indextype == "IG": - option_type, strike, price, delta, vol, expiry = line.split() - else: - option_type, strike, strike_spread, price, delta, vol, expiry = line.split() - - expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month - expiry = next(pd.Timestamp(d) for d in expiration_dates if d.month == expiry_month) - if option_type == "Rec": - rec_bid, rec_offer = price.split("/") - pay_bid, pay_offer = None, None - rec_delta, pay_delta = delta, None - else: - pay_bid, pay_offer = price.split("/") - rec_bid, rec_offer = None, None - rec_delta, pay_delta = None, delta - vals = [strike, rec_bid, rec_offer, rec_delta, pay_bid, - pay_offer, pay_delta, vol] - if indextype == "HY": - vals.append(None) - r.append(vals) - return expiry, makedf(r, indextype, "SG") - - -def parse_gs_block(fh, indextype): - #skip header - while True: - line = next(fh) - if line.strip().startswith("Stk"): - break - - r = [] - for line in fh: - line = line.rstrip() - if line == "": - continue - if line.startswith("Expiry") or line.startswith("Assumes"): - break - vals = line.split() - if indextype == 'HY': - vals.pop(2) - vals.pop(9) - else: - vals.pop(1) - vals.pop(8) - strike = vals.pop(0) - if indextype == "HY": - vals.pop(0) # pop the spread - pay, pay_delta = vals[:2] - pay_bid, pay_offer = pay.split("/") - rec_bid, rec_offer = vals[2].split("/") - vol = vals[3] - tail = vals[6] - vals = [strike, rec_bid, rec_offer, None, pay_bid, pay_offer, pay_delta, vol] - if indextype == "HY": - vals.append(None) - vals.append(tail) - r.append(vals) - return makedf(r, indextype, "GS"), line - -def parse_citi_block(fh, indextype): - next(fh) #skip header - r = [] - for line in fh: - line = line.rstrip() - if line == "": - break - if indextype == "HY": - strike, payers, receivers, vol, price_vol = line.split("|") - else: - strike, payers, receivers, vol = line.split("|") - strike = strike.strip() - pay_bid, pay_offer = payers.split("/") - pay_bid = pay_bid.strip() - pay_offer = pay_offer.strip() - pay_offer, pay_delta = pay_offer.split() - rec_bid, rec_offer = receivers.split("/") - rec_bid = rec_bid.strip() - rec_offer = rec_offer.strip() - rec_offer, rec_delta = rec_offer.split() - vol = vol.strip() - vol = vol.split()[0] - if indextype == "HY": - price_vol = price_vol.strip() - r.append([strike, rec_bid, rec_offer, rec_delta, - pay_bid, pay_offer, pay_delta, vol, price_vol]) - else: - r.append([strike, rec_bid, rec_offer, rec_delta, - pay_bid, pay_offer, pay_delta, vol]) - return makedf(r, indextype, "CITI") - -def parse_ms(fh, indextype, *args): - option_stack = {} - for line in fh: - line = line.rstrip() - if "EXPIRY" in line: - expiry = line.split(" ")[1] - expiry = pd.to_datetime(expiry, format="%d-%b-%Y") - block = parse_ms_block(fh, indextype) - if block is None or block.empty: - logging.warning("MS: block is empty for {expiry} expiry") - else: - option_stack[expiry] = block - return option_stack - - -def parse_nom(fh, indextype, *args): - option_stack = {} - - def aux(line, fh, indextype, option_stack): - expiry = line.split(" ")[0] - expiry = pd.to_datetime(expiry, format="%d-%b-%y") - next_line, df = parse_nomura_block(fh, indextype) - option_stack[expiry] = df - if next_line: - if "EXPIRY" in next_line: - aux(next_line, fh, indextype, option_stack) - else: - raise RuntimeError(f"Don't know what to do with {line}.") - for line in fh: - line = line.rstrip() - if "EXPIRY" in line: - aux(line, fh, indextype, option_stack) - return option_stack - - -def parse_sg(fh, indextype, expiration_dates): - option_stack = {} - for line in fh: - line = line.rstrip() - if line.startswith("Type"): - expiry, df = parse_sg_block(fh, indextype, expiration_dates) - option_stack[expiry] = df - return option_stack - - -def parse_gs(fh, indextype, series, quotedate, ref): - option_stack = {} - fwd_index = [] - d = {'quotedate': quotedate, 'index': indextype, - 'series': series, 'ref': ref} - pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)") - - line = next(fh).strip() - while True: - if line.startswith("Expiry"): - m = pat.match(line) - if m: - expiry, fwdprice, fwdspread = m.groups() - expiry = pd.to_datetime(expiry, format='%d%b%y') - d.update({'fwdspread': fwdspread, 'fwdprice': fwdprice, - 'expiry': expiry}) - fwd_index.append(d.copy()) - option_stack[expiry], line = parse_gs_block(fh, indextype) - else: - logging.error("Can't parse expiry line:", line) - elif line.startswith("Assumes"): - break - else: - try: - line = next(fh).strip() - except StopIteration: - break - - fwd_index = pd.DataFrame.from_records(fwd_index, - index='quotedate') - fwd_index['quote_source'] = 'GS' - return option_stack, fwd_index - -def parse_citi(fh, indextype, series, quotedate): - option_stack = {} - fwd_index = [] - d = {'quotedate': quotedate, - 'index': indextype, - 'series': series} - pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)") - for line in fh: - line = line.strip() - if line.startswith("Exp"): - m = pat.match(line) - if m: - expiry, ref = m.groups() - expiry = pd.to_datetime(expiry, format='%d-%b-%y') - d.update({'ref': ref, - 'expiry': expiry}) - fwd_index.append(d.copy()) - option_stack[expiry] = parse_citi_block(fh, indextype) - else: - logging.error("Cant't parse expiry line:", line) - fwd_index = pd.DataFrame.from_records(fwd_index, - index='quotedate') - fwd_index['quote_source'] = 'CITI' - return option_stack, fwd_index - -subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s") -subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)") -subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)") -subject_gs = re.compile(r"(?:FW: |Fwd: )?GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)") -subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)") -subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y") - -def parse_email(email, date_received): - with open(email.path, "rt") as fh: - subject = fh.readline().lstrip() - - for source in ['BAML', 'GS', 'MS', 'NOM', 'SG', 'CITI']: - m = globals()[f'subject_{source.lower()}'].match(subject) - if m: - if source in ['BAML', 'CITI']: - indextype, series = m.groups() - else: - indextype, series, ref = m.groups() - ref = float(ref) - series = int(series) - cur_pos = fh.tell() - try: - quotedate = parse_quotedate(fh, date_received) - except RuntimeError: - logging.warning("couldn't find received date in message: " - f"{email.name}, using {date_received}") - quotedate = pd.Timestamp(date_received).tz_localize("America/New_York") - fh.seek(cur_pos) - - expiration_dates = list_imm_dates(quotedate) - parse_fun = globals()[f'parse_{source.lower()}'] - if source in ['BAML', 'CITI']: - return (quotedate, indextype, series), \ - parse_fun(fh, indextype, series, quotedate) - elif source == "GS": - return (quotedate, indextype, series), \ - parse_fun(fh, indextype, series, quotedate, ref) - else: - option_stack = parse_fun(fh, indextype, expiration_dates) - fwd_index = pd.DataFrame({'quotedate': quotedate, - 'ref': ref, - 'index': indextype, - 'series': series, - 'expiry': list(option_stack.keys()), - 'quote_source': source}) - fwd_index.set_index('quotedate', inplace=True) - return (quotedate, indextype, series), (option_stack, fwd_index) - else: - raise RuntimeError(f"can't parse subject line: {subject} for email {email.name}") - -def write_todb(swaption_stack, index_data, conn): - def gen_sql_str(query, table_name, columns): - return query.format(sql.Identifier(table_name), - sql.SQL(", ").join(sql.Identifier(c) for c in columns), - sql.SQL(", ").join(sql.Placeholder() * len(columns))) - query = sql.SQL("INSERT INTO {}({}) VALUES({}) " - "ON CONFLICT DO NOTHING RETURNING ref_id") - sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns) - query = sql.SQL("INSERT INTO {}({}) VALUES({}) " - "ON CONFLICT DO NOTHING") - with conn.cursor() as c: - for t in index_data.itertuples(index=False): - c.execute(sql_str, t) - try: - ref_id, = next(c) - except StopIteration: - continue - else: - try: - df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),] - except KeyError as e: - logging.warning("missing key in swaption_stack: " - f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}") - continue - except IndexingError: - breakpoint() - df['ref_id'] = ref_id - c.executemany(gen_sql_str(query, "swaption_quotes", df.columns), - df.itertuples(index=False)) - conn.commit() - - -def get_email_list(date): - """returns a list of email file names for a given date - - Parameters - ---------- - date : string - """ - with open(".pickle", "rb") as fh: - already_uploaded = pickle.load(fh) - df = pd.DataFrame.from_dict(already_uploaded, orient='index') - df.columns = ['quotedate'] - df = df.reset_index().set_index('quotedate') - return df.loc[date, 'index'].tolist() - - -def pickle_drop_date(date): - with open(".pickle", "rb") as fh: - already_uploaded = pickle.load(fh) - newdict = {k: v for k, v in already_uploaded.items() if v.date() != date} - with open(".pickle", "wb") as fh: - pickle.dump(newdict, fh) - - -if __name__ == "__main__": - try: - save_emails() - except (errors.HttpError, FileNotFoundError) as e: - logging.error(e) - save_emails(update=False) - data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions") - emails = [f for f in os.scandir(data_dir) if f.is_file()] - swaption_stack = {} - index_data = pd.DataFrame() - try: - with open(".pickle", "rb") as fh: - already_uploaded = pickle.load(fh) - except FileNotFoundError: - already_uploaded = {} - for f in emails: - date_composed, msg_id = f.name.split("_") - date_composed = datetime.datetime.strptime(date_composed, - "%Y-%m-%d %H-%M-%S") - if msg_id in already_uploaded: - continue - else: - try: - key, (option_stack, fwd_index) = parse_email(f, date_composed) - except RuntimeError as e: - logging.error(e) - else: - if key[0] is None or len(option_stack) == 0: - logging.error(f"Something wrong with email: {f.name}") - continue - swaption_stack[key] = pd.concat(option_stack, - names=['expiry', 'strike']) - index_data = index_data.append(fwd_index) - already_uploaded[msg_id] = key[0] - if index_data.empty: - sys.exit() - for col in ['fwdbpv', 'fwdprice', 'fwdspread', 'ref']: - if col in index_data: - index_data[col] = index_data[col].astype('float') - index_data['index'] = index_data['index'].astype('category') - - swaption_stack = pd.concat(swaption_stack, - names=['quotedate', 'index', 'series']) - swaption_stack = swaption_stack.reset_index() - swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', - 'expiry', 'strike']) - swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry']) - index_data = index_data.reset_index() - index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry']) - from db import serenitas_pool - conn = serenitas_pool.getconn() - write_todb(swaption_stack, index_data, conn) - serenitas_pool.putconn(conn) - - with open(".pickle", "wb") as fh: - pickle.dump(already_uploaded, fh) |
