diff options
Diffstat (limited to 'python/quote_parsing/parse_emails.py')
| -rw-r--r-- | python/quote_parsing/parse_emails.py | 505 |
1 files changed, 505 insertions, 0 deletions
"""Parse CDX swaption quote emails from several dealers into DataFrames.

Each ``parse_<dealer>`` function turns one email body into
``(option_stack, fwd_index)`` where ``option_stack`` maps expiry ->
quote DataFrame (built by :func:`makedf`) and ``fwd_index`` holds the
forward/reference data per expiry.  :func:`parse_email` dispatches on
the subject line; :func:`write_todb` persists the results to Postgres.
"""
import datetime
import logging
import pickle
import re
import sys

import pandas as pd
import psycopg2.sql as sql
from pandas.errors import IndexingError

from download_emails import save_emails, errors
from quantlib.time.imm import next_date
from quantlib.time.api import Date, pydate_from_qldate

# 'logger' was used throughout this module but never defined; define it here.
logger = logging.getLogger(__name__)


def list_imm_dates(date):
    """Return the next 10 IMM dates strictly after `date` as datetime.date."""
    d = Date.from_datetime(date)
    r = []
    for i in range(10):
        d = next_date(d, False)
        r.append(pydate_from_qldate(d))
    return r


def makedf(r, indextype, quote_source):
    """Build a quote DataFrame indexed by strike from parsed rows `r`.

    Column layout depends on the index type (HY also carries a price
    vol) and on dealer-specific extras (BAML: gamma, GS: tail).
    Percentage columns are converted to fractions.
    """
    if indextype == 'IG':
        cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
                'pay_offer', 'delta_pay', 'vol']
    else:
        cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec', 'pay_bid',
                'pay_offer', 'delta_pay', 'vol', 'price_vol']
    if quote_source == "BAML":
        cols.append('gamma')
    if quote_source == "GS":
        cols.append("tail")
    df = pd.DataFrame.from_records(r, columns=cols)
    # Percentage columns -> fractions.
    for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
        if col in df:
            try:
                df[col] = df[col].str.rstrip("%").astype('float') / 100
            except ValueError:  # typo in one email: stray "n" in a number
                df[col] = (df[col].str.rstrip("%").
                           str.replace("n", "").astype("float") / 100)
    if quote_source == "GS":
        # GS quotes payer prices with a leading '-' and positive deltas.
        for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
            df[col] = df[col].str.strip('-')
        df['delta_pay'] *= -1
    for k in df:
        if df.dtypes[k] == 'object':
            df[k] = df[k].str.replace(",", "")
            try:
                df[k] = pd.to_numeric(df[k])
            except ValueError as e:
                logger.info(e)
                logger.error("couldn't convert column %s", k)
                # Retry after dropping the stray "n" typo.
                # NOTE: a leftover breakpoint() was removed here.
                df[k] = pd.to_numeric(df[k].str.replace("n", ""))
    df.set_index('strike', inplace=True)
    return df


def parse_quotedate(fh, date_received):
    """Find the quote timestamp in the email headers.

    Scans `fh` for an "At:"/"Sent:" line and tries several date formats.
    Returns a tz-aware Timestamp (America/New_York).
    Raises RuntimeError when no date line is found or none parses.
    """
    for line in fh:
        line = line.rstrip()
        if "At:" in line or "Sent:" in line:
            for p in ["%m/%d/%y %H:%M:%S", "%b %d %Y %H:%M:%S", "%m/%d %H:%M:%S",
                      "%B %d, %Y %I:%M %p"]:
                try:
                    quotedate = pd.to_datetime(line, format=p, exact=False)
                except ValueError:
                    continue
                else:
                    if quotedate.year == 1900:  # p='%m/%d %H:%M:%S' has no year
                        quotedate = quotedate.replace(year=date_received.year)
                    quotedate = quotedate.tz_localize("America/New_York")
                    break
            else:
                # was missing the f-prefix, so {line} was never interpolated
                raise RuntimeError(f"can't parse date from {line}")
            return quotedate
    else:
        raise RuntimeError("no date received in the email")


def parse_refline(line):
    """Parse a BAML "Ref: ... Expiry: ..." header line into a dict."""
    regex = r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?" \
            r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)" \
            r"\s+Expiry:(?P<expiry>\S+)"
    m = re.match(regex, line)
    try:
        d = m.groupdict()
        d['expiry'] = pd.to_datetime(d['expiry'], format='%d-%b-%y')
    except AttributeError:  # m is None: no match
        raise RuntimeError(f"can't parse refline {line}")
    return d


def parse_baml(fh, indextype, series, quotedate, *args):
    """Parse a BAML email: one "Ref..." header + quote block per expiry."""
    option_stack = {}
    fwd_index = []
    line = ""
    while True:
        if line == "":
            try:
                line = next(fh)
            except StopIteration:
                break
        if line.startswith("Ref"):
            d = parse_refline(line)
            d.update({'quotedate': quotedate, 'index': indextype, 'series': series})
            # parse_baml_block returns the line that terminated the block
            # (possibly the next "Ref" header), so feed it back into the loop.
            df, line = parse_baml_block(fh, indextype)
            option_stack[d['expiry']] = df
            fwd_index.append(d)
        else:
            line = ""
    if option_stack:
        fwd_index = pd.DataFrame.from_records(fwd_index,
                                              index='quotedate')
        fwd_index['quote_source'] = 'BAML'
        return option_stack, fwd_index
    else:
        raise RuntimeError("empty email: " + fh.name)


def parse_baml_block(fh, indextype):
    """Parse one BAML quote block; return (DataFrame, terminating line)."""
    next(fh)  # skip header
    r = []
    line = ""
    for line in fh:
        line = line.strip()
        if line.startswith("Ref") or line == "":
            break
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if len(vals) < 3:  # something went wrong
            line = ""
            break
        r.append(vals)
    return makedf(r, indextype, "BAML"), line


def parse_ms_block(fh, indextype):
    """Parse one Morgan Stanley quote block; return a DataFrame or None."""
    line = next(fh)  # skip header
    if line.strip() == "":  # empty block
        return None
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        strike, payer, receiver, vol = line.split("|")
        strike = strike.strip()
        if indextype == "HY":
            strike = strike.split()[0]
        try:
            pay_bid, pay_offer, pay_delta = payer.strip().split()
            rec_bid, rec_offer, rec_delta = receiver.strip().split()
        except ValueError:
            # Some emails quote a single mid instead of bid/offer.
            try:
                pay_mid, pay_delta = payer.strip().split()
                rec_mid, rec_delta = receiver.strip().split()
                pay_bid, pay_offer = pay_mid, pay_mid
                rec_bid, rec_offer = rec_mid, rec_mid
            except ValueError:
                # was missing the f-prefix
                raise RuntimeError(f"Couldn't parse line: {line}")

        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta]
        vol = vol.strip()
        if indextype == "HY":
            try:
                price_vol, vol = vol.replace("[", "").replace("]", "").split()
            except ValueError:
                price_vol, vol, vol_change, be = (vol.replace("[", "").
                                                  replace("]", "").split())
            vals += [vol, price_vol]
        else:
            if " " in vol:
                vol, vol_change, be = vol.split()
            vals += [vol]
        r.append(vals)
    return makedf(r, indextype, "MS")


def parse_nomura_block(fh, indextype):
    """Parse one Nomura quote block; return (next header line or None, DataFrame)."""
    next(fh)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line or line == "":
            break
        strike, receiver, payer, vol, _ = line.split("|", 4)
        strike = strike.strip()
        pay, pay_delta = payer.strip().split()
        rec, rec_delta = receiver.strip().split()
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = rec.split("/")
        vol = vol.strip()
        vals = [strike, rec_bid, rec_offer, rec_delta,
                pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":  # we don't have price vol
            vals.append(None)
        r.append(vals)
    else:
        # File exhausted without a terminator line.
        return None, makedf(r, indextype, "NOM")
    return line, makedf(r, indextype, "NOM")


def parse_sg_block(fh, indextype, expiration_dates):
    """Parse one SG quote block; return (expiry, DataFrame)."""
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        if indextype == "IG":
            option_type, strike, price, delta, vol, expiry = line.split()
        else:
            option_type, strike, strike_spread, price, delta, vol, expiry = line.split()

        # SG only quotes the expiry month; map it to the listed IMM date.
        expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month
        expiry = next(pd.Timestamp(d) for d in expiration_dates if d.month == expiry_month)
        if option_type == "Rec":
            rec_bid, rec_offer = price.split("/")
            pay_bid, pay_offer = None, None
            rec_delta, pay_delta = delta, None
        else:
            pay_bid, pay_offer = price.split("/")
            rec_bid, rec_offer = None, None
            rec_delta, pay_delta = None, delta
        vals = [strike, rec_bid, rec_offer, rec_delta, pay_bid,
                pay_offer, pay_delta, vol]
        if indextype == "HY":
            vals.append(None)
        r.append(vals)
    return expiry, makedf(r, indextype, "SG")


def parse_gs_block(fh, indextype):
    """Parse one GS quote block; return (DataFrame, terminating line)."""
    # Skip header lines until the "Stk" column header.
    while True:
        line = next(fh)
        if line.strip().startswith("Stk"):
            break

    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            continue
        if line.startswith("Expiry") or line.startswith("Assumes"):
            break
        vals = line.split()
        # Drop columns we don't use; offsets differ between HY and IG layouts.
        if indextype == 'HY':
            vals.pop(2)
            vals.pop(9)
        else:
            vals.pop(1)
            vals.pop(8)
        strike = vals.pop(0)
        if indextype == "HY":
            vals.pop(0)  # pop the spread
        pay, pay_delta = vals[:2]
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = vals[2].split("/")
        vol = vals[3]
        tail = vals[6]
        vals = [strike, rec_bid, rec_offer, None, pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":
            vals.append(None)
        vals.append(tail)
        r.append(vals)
    return makedf(r, indextype, "GS"), line


def parse_citi_block(fh, indextype):
    """Parse one Citi quote block; return a DataFrame."""
    next(fh)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        if indextype == "HY":
            strike, payers, receivers, vol, price_vol = line.split("|")
        else:
            strike, payers, receivers, vol = line.split("|")
        strike = strike.strip()
        pay_bid, pay_offer = payers.split("/")
        pay_bid = pay_bid.strip()
        pay_offer = pay_offer.strip()
        pay_offer, pay_delta = pay_offer.split()
        rec_bid, rec_offer = receivers.split("/")
        rec_bid = rec_bid.strip()
        rec_offer = rec_offer.strip()
        rec_offer, rec_delta = rec_offer.split()
        vol = vol.strip()
        vol = vol.split()[0]
        if indextype == "HY":
            price_vol = price_vol.strip()
            r.append([strike, rec_bid, rec_offer, rec_delta,
                      pay_bid, pay_offer, pay_delta, vol, price_vol])
        else:
            r.append([strike, rec_bid, rec_offer, rec_delta,
                      pay_bid, pay_offer, pay_delta, vol])
    return makedf(r, indextype, "CITI")


def parse_ms(fh, indextype, *args):
    """Parse a Morgan Stanley email; return {expiry: DataFrame}."""
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            expiry = line.split(" ")[1]
            expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
            block = parse_ms_block(fh, indextype)
            if block is None or block.empty:
                # was a plain string missing the f-prefix; use lazy %-args
                logger.warning("MS: block is empty for %s expiry", expiry)
            else:
                option_stack[expiry] = block
    return option_stack


def parse_nom(fh, indextype, *args):
    """Parse a Nomura email; return {expiry: DataFrame}."""
    option_stack = {}

    def aux(line, fh, indextype, option_stack):
        # Nomura blocks chain: a block's terminator may be the next
        # EXPIRY header, so recurse until the chain ends.
        expiry = line.split(" ")[0]
        expiry = pd.to_datetime(expiry, format="%d-%b-%y")
        next_line, df = parse_nomura_block(fh, indextype)
        option_stack[expiry] = df
        if next_line:
            if "EXPIRY" in next_line:
                aux(next_line, fh, indextype, option_stack)
            else:
                raise RuntimeError(f"Don't know what to do with {line}.")
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            aux(line, fh, indextype, option_stack)
    return option_stack


def parse_sg(fh, indextype, expiration_dates):
    """Parse an SG email; return {expiry: DataFrame}."""
    option_stack = {}
    for line in fh:
        line = line.rstrip()
        if line.startswith("Type"):
            expiry, df = parse_sg_block(fh, indextype, expiration_dates)
            option_stack[expiry] = df
    return option_stack


def parse_gs(fh, indextype, series, quotedate, ref):
    """Parse a GS email; return (option_stack, fwd_index DataFrame)."""
    option_stack = {}
    fwd_index = []
    d = {'quotedate': quotedate, 'index': indextype,
         'series': series, 'ref': ref}
    pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")

    line = next(fh).strip()
    while True:
        if line.startswith("Expiry"):
            m = pat.match(line)
            if m:
                expiry, fwdprice, fwdspread = m.groups()
                expiry = pd.to_datetime(expiry, format='%d%b%y')
                d.update({'fwdspread': fwdspread, 'fwdprice': fwdprice,
                          'expiry': expiry})
                fwd_index.append(d.copy())
                option_stack[expiry], line = parse_gs_block(fh, indextype)
            else:
                # Fixed logging call (extra positional arg had no %s slot)
                # and advance past the bad line; previously the same line
                # was re-examined forever -> infinite loop.
                logger.error("Can't parse expiry line: %s", line)
                line = ""
        elif line.startswith("Assumes"):
            break
        else:
            try:
                line = next(fh).strip()
            except StopIteration:
                break

    fwd_index = pd.DataFrame.from_records(fwd_index,
                                          index='quotedate')
    fwd_index['quote_source'] = 'GS'
    return option_stack, fwd_index


def parse_citi(fh, indextype, series, quotedate):
    """Parse a Citi email; return (option_stack, fwd_index DataFrame)."""
    option_stack = {}
    fwd_index = []
    d = {'quotedate': quotedate,
         'index': indextype,
         'series': series}
    pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)")
    for line in fh:
        line = line.strip()
        if line.startswith("Exp"):
            m = pat.match(line)
            if m:
                expiry, ref = m.groups()
                expiry = pd.to_datetime(expiry, format='%d-%b-%y')
                d.update({'ref': ref,
                          'expiry': expiry})
                fwd_index.append(d.copy())
                option_stack[expiry] = parse_citi_block(fh, indextype)
            else:
                # fixed "Cant't" typo and the invalid logging call signature
                logger.error("Can't parse expiry line: %s", line)
    fwd_index = pd.DataFrame.from_records(fwd_index,
                                          index='quotedate')
    fwd_index['quote_source'] = 'CITI'
    return option_stack, fwd_index


# Subject-line patterns used by parse_email to identify the dealer.
subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
subject_ms = re.compile(r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)")
subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
subject_gs = re.compile(r"(?:FW: |Fwd: )?GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")
subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y")


def parse_email(email, date_received):
    """Dispatch an email file to the matching dealer parser.

    Parameters
    ----------
    email : object with .path and .name (e.g. os.DirEntry)
    date_received : datetime-like fallback used when the message itself
        carries no parsable date.

    Returns ((quotedate, indextype, series), (option_stack, fwd_index)).
    Raises RuntimeError when no dealer subject pattern matches.
    """
    with open(email.path, "rt") as fh:
        subject = fh.readline().lstrip()

        for source in ['BAML', 'GS', 'MS', 'NOM', 'SG', 'CITI']:
            m = globals()[f'subject_{source.lower()}'].match(subject)
            if m:
                if source in ['BAML', 'CITI']:
                    indextype, series = m.groups()
                else:
                    indextype, series, ref = m.groups()
                    ref = float(ref)
                series = int(series)
                cur_pos = fh.tell()
                try:
                    quotedate = parse_quotedate(fh, date_received)
                except RuntimeError:
                    logger.warning("couldn't find received date in message: "
                                   f"{email.name}, using {date_received}")
                    quotedate = pd.Timestamp(date_received).tz_localize("America/New_York")
                fh.seek(cur_pos)  # rewind: the body parsers re-read from here

                expiration_dates = list_imm_dates(quotedate)
                parse_fun = globals()[f'parse_{source.lower()}']
                if source in ['BAML', 'CITI']:
                    return (quotedate, indextype, series), \
                        parse_fun(fh, indextype, series, quotedate)
                elif source == "GS":
                    return (quotedate, indextype, series), \
                        parse_fun(fh, indextype, series, quotedate, ref)
                else:
                    # MS/NOM/SG emails carry no per-expiry forward data, so
                    # synthesize the fwd_index from the subject-line ref.
                    option_stack = parse_fun(fh, indextype, expiration_dates)
                    fwd_index = pd.DataFrame({'quotedate': quotedate,
                                              'ref': ref,
                                              'index': indextype,
                                              'series': series,
                                              'expiry': list(option_stack.keys()),
                                              'quote_source': source})
                    fwd_index.set_index('quotedate', inplace=True)
                    return (quotedate, indextype, series), (option_stack, fwd_index)
        else:
            raise RuntimeError(f"can't parse subject line: {subject} for email {email.name}")


def write_todb(swaption_stack, index_data, conn):
    """Insert forward/ref rows and their option quotes into Postgres.

    For each row of `index_data` insert into swaption_ref_quotes; when a
    new ref_id comes back (no conflict), bulk-insert the matching quotes
    from `swaption_stack` into swaption_quotes.
    """
    def gen_sql_str(query, table_name, columns):
        # Compose a parameterized INSERT with properly quoted identifiers.
        return query.format(sql.Identifier(table_name),
                            sql.SQL(", ").join(sql.Identifier(c) for c in columns),
                            sql.SQL(", ").join(sql.Placeholder() * len(columns)))
    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                    "ON CONFLICT DO NOTHING RETURNING ref_id")
    sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                    "ON CONFLICT DO NOTHING")
    with conn.cursor() as c:
        for t in index_data.itertuples(index=False):
            c.execute(sql_str, t)
            try:
                ref_id, = next(c)
            except StopIteration:
                # Conflict: ref row already present, nothing returned.
                continue
            else:
                try:
                    df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
                except KeyError:
                    logger.warning("missing key in swaption_stack: "
                                   f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
                    continue
                except IndexingError:
                    # Was a bare breakpoint() that fell through to use an
                    # unbound `df`; log and skip this ref row instead.
                    logger.error("unexpected indexer for swaption_stack: "
                                 f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
                    continue
                df['ref_id'] = ref_id
                c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
                              df.itertuples(index=False))
    conn.commit()


def get_email_list(date):
    """Return the list of already-uploaded email file names for a given date.

    Parameters
    ----------
    date : string
    """
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    df = pd.DataFrame.from_dict(already_uploaded, orient='index')
    df.columns = ['quotedate']
    df = df.reset_index().set_index('quotedate')
    return df.loc[date, 'index'].tolist()


def pickle_drop_date(date):
    """Remove all entries for `date` from the upload-tracking pickle."""
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
    with open(".pickle", "wb") as fh:
        pickle.dump(newdict, fh)
