import pandas as pd
import re
import psycopg2.sql as sql
import datetime
import pickle
from . import logger
from functools import partial, lru_cache
from quantlib.time.imm import next_date
from quantlib.time.api import Date, pydate_from_qldate
from unicodedata import normalize


def list_imm_dates(date):
    d = Date.from_datetime(date)
    r = []
    for i in range(10):
        d = next_date(d, False)
        r.append(pydate_from_qldate(d))
    return r


def makedf(r, indextype, quote_source):
    if indextype == "IG":
        cols = [
            "strike",
            "rec_bid",
            "rec_offer",
            "delta_rec",
            "pay_bid",
            "pay_offer",
            "delta_pay",
            "vol",
        ]
    else:
        cols = [
            "strike",
            "rec_bid",
            "rec_offer",
            "delta_rec",
            "pay_bid",
            "pay_offer",
            "delta_pay",
            "vol",
            "price_vol",
        ]
    if quote_source == "BAML":
        cols.append("gamma")
    if quote_source == "GS":
        cols.append("tail")
    df = pd.DataFrame.from_records(r, columns=cols)
    for col in ["delta_rec", "delta_pay", "vol", "price_vol", "gamma", "tail"]:
        if col in df:
            try:
                df[col] = df[col].str.rstrip("%").astype("float") / 100
            except ValueError:
                # typo in one email
                df[col] = (
                    pd.to_numeric(df[col].str.rstrip("%").str.replace("n", ""))
                    / 100
                )
    if quote_source == "GS":
        for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
            df[col] = df[col].str.strip("-")
        df["delta_pay"] *= -1
    for k in df:
        if df.dtypes[k] == "object":
            df[k] = df[k].str.replace(",", "")
            try:
                df[k] = pd.to_numeric(df[k])
            except ValueError as e:
                logger.info(e)
                logger.error("couldn't convert column")
                df[k] = pd.to_numeric(
                    df[k]
                    .str.replace("n", "")
                    .str.replace("\\", "")
                    .str.replace("M", "")
                )
    df.set_index("strike", inplace=True)
    return df
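
# Illustrative sketch (not part of the original module): the broker-specific
# block parsers below build their rows in the column order makedf expects --
# strike, receiver bid/offer/delta, payer bid/offer/delta, vol (plus price_vol
# for HY, and an extra gamma or tail column for BAML or GS).  The rows here
# are hypothetical and only demonstrate that layout and the "%"-to-fraction
# conversion makedf applies to the delta and vol columns.
def _makedf_example():
    rows = [
        ["55.0", "10", "12", "25%", "20", "22", "75%", "40%"],
        ["60.0", "15", "17", "35%", "14", "16", "65%", "41%"],
    ]
    return makedf(rows, "IG", "MS")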
def parse_quotedate(fh, date_received):
    for line in fh:
        line = line.rstrip()
        if "At:" in line or "Sent:" in line:
            for p in [
                "%m/%d/%y %H:%M:%S",
                "%b %d %Y %H:%M:%S",
                "%m/%d %H:%M:%S",
                "%B %d, %Y %I:%M %p",
            ]:
                try:
                    quotedate = pd.to_datetime(line, format=p, exact=False)
                except ValueError:
                    continue
                else:
                    if quotedate.year == 1900:
                        # p='%m/%d %H:%M:%S'
                        quotedate = quotedate.replace(year=date_received.year)
                    quotedate = quotedate.tz_localize("America/New_York")
                    break
            else:
                raise RuntimeError(f"can't parse date from {line}")
            return quotedate
    else:
        raise RuntimeError("no date received in the email")


def parse_refline(line):
    # NOTE: the named groups were reconstructed from how the result is used
    # downstream ("expiry" is required); the other names are assumptions.
    regex = (
        r"Ref:(?P<ref>\S+)\s+(?:Fwd Px:(?P<fwdprice>\S+)\s+)?"
        r"Fwd(?: Spd)?:(?P<fwdspread>\S+)\s+Fwd Bpv:(?P<fwdbpv>\S+)"
        r"\s+Expiry:(?P<expiry>\S+)"
    )
    m = re.match(regex, line)
    try:
        d = m.groupdict()
        d["expiry"] = pd.to_datetime(d["expiry"], format="%d-%b-%y")
    except AttributeError:
        raise RuntimeError(f"can't parse refline {line}")
    return d


def parse_baml(fh, index_desc, *args):
    option_stack = {}
    fwd_index = []
    line = ""
    while True:
        if line == "":
            try:
                line = next(fh)
            except StopIteration:
                break
        if line.startswith("Ref"):
            d = parse_refline(line)
            d.update(index_desc)
            df, line = parse_baml_block(fh, index_desc["index"])
            option_stack[d["expiry"]] = df
            fwd_index.append(d)
        else:
            line = ""
    return option_stack, fwd_index


def parse_baml_block(fh, indextype):
    next(fh)  # skip header
    r = []
    line = ""
    for line in fh:
        line = line.strip()
        if line.startswith("Ref") or line == "":
            break
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if len(vals) < 3:
            # something went wrong
            line = ""
            break
        r.append(vals)
    return makedf(r, indextype, "BAML"), line


def parse_bnp_block(fh, indextype, skip_header=True):
    if skip_header:
        next(fh)  # skip header
    r = []
    for line in fh:
        line = line.strip()
        if "\xa0" in line:
            line = normalize("NFKD", line)
        if line.startswith("Ref") or line == "":
            break
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if indextype == "HY":
            vals += [""]
        if len(vals) < 3:
            # something went wrong
            line = ""
            break
        r.append(vals)
    return makedf(r, indextype, "BNP")


def parse_cs_block(fh, indextype):
    next(fh)  # skip header
    r = []
    for line in fh:
        line = line.strip()
        if line.startswith("Ref") or line == "":
            break
        line = re.sub("[/|]", " ", line)
        vals = re.sub(" +", " ", line).rstrip().split(" ")
        if len(vals) == 1:
            logger.info(f"spurious line: {line}")
            continue
        strike, *rest = vals
        # CS quotes payer first, so we need to move things around a bit
        if indextype == "IG":
            vals = (strike, *rest[3:6], *rest[:3], rest[6])
        elif indextype == "HY":
            vals = (strike, *rest[3:6], *rest[:3], *rest[6:8])
        r.append(vals)
    return makedf(r, indextype, "CS")


def parse_ms_block(fh, indextype):
    line = next(fh)  # skip header
    if line.strip() == "":
        # empty block
        return None
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        strike, payer, receiver, vol = line.split("|")
        strike = strike.strip()
        if indextype == "HY":
            strike = strike.split()[0]
        try:
            pay_bid, pay_offer, pay_delta = payer.strip().split()
            rec_bid, rec_offer, rec_delta = receiver.strip().split()
        except ValueError:
            try:
                pay_mid, pay_delta = payer.strip().split()
                rec_mid, rec_delta = receiver.strip().split()
                pay_bid, pay_offer = pay_mid, pay_mid
                rec_bid, rec_offer = rec_mid, rec_mid
            except ValueError:
                raise RuntimeError(f"Couldn't parse line: {line}")
        vals = [strike, rec_bid, rec_offer, rec_delta, pay_bid, pay_offer, pay_delta]
        vol = vol.strip()
        if indextype == "HY":
            try:
                price_vol, vol = vol.replace("[", "").replace("]", "").split()
            except ValueError:
                price_vol, vol, vol_change, be = (
                    vol.replace("[", "").replace("]", "").split()
                )
            vals += [vol, price_vol]
        else:
            if " " in vol:
                vol, vol_change, be = vol.split()
            vals += [vol]
        r.append(vals)
    return makedf(r, indextype, "MS")
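
# The parse_*_block helpers share a contract: each consumes quote lines from
# the open email until it reaches a blank line or the next "Ref"/"EXPIRY"/
# "Expiry" header, builds rows in the column order makedf expects, and returns
# a strike-indexed DataFrame.  parse_baml_block, parse_nomura_block and
# parse_gs_block also hand the terminating line back so their callers can keep
# scanning for the next expiry block.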
def parse_nomura_block(fh, indextype):
    next(fh)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line or line == "":
            break
        strike, receiver, payer, vol, _ = line.split("|", 4)
        strike = strike.strip()
        pay, pay_delta = payer.strip().split()
        rec, rec_delta = receiver.strip().split()
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = rec.split("/")
        vol = vol.strip()
        vals = [
            strike,
            rec_bid,
            rec_offer,
            rec_delta,
            pay_bid,
            pay_offer,
            pay_delta,
            vol,
        ]
        if indextype == "HY":
            # we don't have price vol
            vals.append(None)
        r.append(vals)
    else:
        return None, makedf(r, indextype, "NOM")
    return line, makedf(r, indextype, "NOM")


def parse_sg_block(fh, indextype, expiration_dates):
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        if indextype == "IG":
            option_type, strike, price, delta, vol, expiry = line.split()
        else:
            option_type, strike, strike_spread, price, delta, vol, expiry = line.split()
        expiry_month = datetime.datetime.strptime(expiry, "%b-%y").month
        expiry = next(
            pd.Timestamp(d) for d in expiration_dates if d.month == expiry_month
        )
        if option_type == "Rec":
            rec_bid, rec_offer = price.split("/")
            pay_bid, pay_offer = None, None
            rec_delta, pay_delta = delta, None
        else:
            pay_bid, pay_offer = price.split("/")
            rec_bid, rec_offer = None, None
            rec_delta, pay_delta = None, delta
        vals = [
            strike,
            rec_bid,
            rec_offer,
            rec_delta,
            pay_bid,
            pay_offer,
            pay_delta,
            vol,
        ]
        if indextype == "HY":
            vals.append(None)
        r.append(vals)
    return expiry, makedf(r, indextype, "SG")


def parse_gs_block(fh, indextype):
    # skip header
    while True:
        line = next(fh)
        if line.strip().startswith("Stk"):
            break
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            continue
        if line.startswith("Expiry") or line.startswith("Assumes"):
            break
        vals = line.split()
        if indextype == "HY":
            vals.pop(2)
            vals.pop(9)
        else:
            vals.pop(1)
            vals.pop(8)
        strike = vals.pop(0)
        if indextype == "HY":
            vals.pop(0)  # pop the spread
        pay, pay_delta = vals[:2]
        pay_bid, pay_offer = pay.split("/")
        rec_bid, rec_offer = vals[2].split("/")
        vol = vals[3]
        tail = vals[6]
        vals = [strike, rec_bid, rec_offer, None, pay_bid, pay_offer, pay_delta, vol]
        if indextype == "HY":
            vals.append(None)
        vals.append(tail)
        r.append(vals)
    return makedf(r, indextype, "GS"), line


def parse_citi_block(fh, indextype):
    next(fh)  # skip header
    r = []
    for line in fh:
        line = line.rstrip()
        if line == "":
            break
        if indextype == "HY":
            strike, payers, receivers, vol, price_vol = line.split("|")
        else:
            strike, payers, receivers, vol = line.split("|")
        strike = strike.strip()
        pay_bid, pay_offer = payers.split("/")
        pay_bid = pay_bid.strip()
        pay_offer = pay_offer.strip()
        pay_offer, pay_delta = pay_offer.split()
        rec_bid, rec_offer = receivers.split("/")
        rec_bid = rec_bid.strip()
        rec_offer = rec_offer.strip()
        rec_offer, rec_delta = rec_offer.split()
        vol = vol.strip()
        vol = vol.split()[0]
        if indextype == "HY":
            price_vol = price_vol.strip()
            r.append(
                [
                    strike,
                    rec_bid,
                    rec_offer,
                    rec_delta,
                    pay_bid,
                    pay_offer,
                    pay_delta,
                    vol,
                    price_vol,
                ]
            )
        else:
            r.append(
                [
                    strike,
                    rec_bid,
                    rec_offer,
                    rec_delta,
                    pay_bid,
                    pay_offer,
                    pay_delta,
                    vol,
                ]
            )
    return makedf(r, indextype, "CITI")


def parse_ms(fh, index_desc, *args):
    option_stack = {}
    fwd_index = []
    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            expiry = line.split(" ")[1]
            expiry = pd.to_datetime(expiry, format="%d-%b-%Y")
            block = parse_ms_block(fh, index_desc["index"])
            fwd_index.append({"expiry": expiry, **index_desc})
            if block is None or block.empty:
                logger.warning(f"MS: block is empty for {expiry} expiry")
            else:
                option_stack[expiry] = block
    return option_stack, fwd_index
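
# Illustrative sketch (assumption, not part of the original module): every
# top-level parse_* function returns the same pair -- a dict mapping expiry
# Timestamps to strike-indexed quote DataFrames, plus a list of per-expiry
# reference dicts that parse_email later turns into the fwd_index DataFrame.
# A hypothetical consumer could walk the stack like this:
def _iter_option_stack(option_stack):
    for expiry, df in sorted(option_stack.items()):
        # each df holds rec/pay bid, offer, delta and vol columns per strike
        yield expiry, df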
def parse_nom(fh, index_desc, *args):
    option_stack = {}
    fwd_index = []

    def aux(line, fh, index_desc, option_stack, fwd_index):
        expiry = line.split(" ")[0]
        expiry = pd.to_datetime(expiry, format="%d-%b-%y")
        next_line, df = parse_nomura_block(fh, index_desc["index"])
        option_stack[expiry] = df
        fwd_index.append({"expiry": expiry, **index_desc})
        if next_line:
            if "EXPIRY" in next_line:
                aux(next_line, fh, index_desc, option_stack, fwd_index)
            else:
                raise RuntimeError(f"Don't know what to do with {line}.")

    for line in fh:
        line = line.rstrip()
        if "EXPIRY" in line:
            aux(line, fh, index_desc, option_stack, fwd_index)
    return option_stack, fwd_index


def parse_sg(fh, index_desc):
    option_stack = {}
    fwd_index = []
    expiration_dates = index_desc.pop("expiration_dates")
    for line in fh:
        line = line.rstrip()
        if line.startswith("Type"):
            expiry, df = parse_sg_block(fh, index_desc["index"], expiration_dates)
            option_stack[expiry] = df
            fwd_index.append({"expiry": expiry, **index_desc})
    return option_stack, fwd_index


def parse_gs(fh, index_desc):
    option_stack = {}
    fwd_index = []
    pat = re.compile(r"Expiry (\d{2}\w{3}\d{2}) \((?:([\S]+) )?([\S]+)\)")
    line = next(fh).strip()
    while True:
        if line.startswith("Expiry"):
            m = pat.match(line)
            if m:
                expiry, fwdprice, fwdspread = m.groups()
                expiry = pd.to_datetime(expiry, format="%d%b%y")
                fwd_index.append(
                    {
                        **index_desc,
                        "fwdspread": fwdspread,
                        "fwdprice": fwdprice,
                        "expiry": expiry,
                    }
                )
                option_stack[expiry], line = parse_gs_block(fh, index_desc["index"])
            else:
                logger.error(f"Can't parse expiry line: {line}")
        elif line.startswith("Assumes"):
            break
        else:
            try:
                line = next(fh).strip()
            except StopIteration:
                break
    return option_stack, fwd_index


def parse_citi(fh, index_desc):
    option_stack = {}
    fwd_index = []
    pat = re.compile(r"Exp: (\d{2}-\w{3}-\d{2})[^R]*Ref:[^\d]*([\d.]+)")
    for line in fh:
        line = line.strip()
        if line.startswith("Exp"):
            m = pat.match(line)
            if m:
                expiry, ref = m.groups()
                expiry = pd.to_datetime(expiry, format="%d-%b-%y")
                fwd_index.append({"ref": ref, "expiry": expiry, **index_desc})
                option_stack[expiry] = parse_citi_block(fh, index_desc["index"])
            else:
                logger.error(f"Can't parse expiry line: {line}")
    return option_stack, fwd_index


def parse_cs(fh, index_desc):
    option_stack = {}
    fwd_index = []
    # NOTE: the named groups were reconstructed from downstream usage
    # ("expiry" is required); the remaining names are assumptions.
    regex = {
        "HY": r"Ref:\s*(?P<ref>[\d.]+)\s*Fwd: (?P<fwdprice>[\d.]+)"
        r"\s*Expiry: (?P<expiry>\d{2}-\w{3}-\d{2})",
        "IG": r"Ref:\s*(?P<ref>[\d.]+)\s*Fwd: (?P<fwdspread>[\d.]+)"
        r"\s*Expiry: (?P<expiry>\d{2}-\w{3}-\d{2})\s*Fwd dv01:\s*(?P<fwdbpv>[\d.]*).*",
    }
    pat = re.compile(regex[index_desc["index"]])
    for line in fh:
        line = line.strip()
        if line.startswith("Ref"):
            m = pat.match(line)
            if m:
                d = m.groupdict()
                d["expiry"] = pd.to_datetime(d["expiry"], format="%d-%b-%y")
                fwd_index.append({**index_desc, **d})
                option_stack[d["expiry"]] = parse_cs_block(fh, index_desc["index"])
            else:
                logger.error(f"Can't parse expiry line: {line}, filename: {fh.name}")
    return option_stack, fwd_index


def parse_bnp(fh, index_desc):
    option_stack = {}
    fwd_index = []
    # NOTE: the named groups were reconstructed from downstream usage
    # ("expiry" and "fwdspread" are required); "ref" is an assumption.
    regex = (
        r"Ref\s+(?P<ref>[\d.]+)\s+-\s+(?P<expiry>\w{3}\d{2})"
        r"\s+-\s+Fwd\s+(?P<fwdspread>[\d.]+)"
    )
    expiration_dates = index_desc.pop("expiration_dates")
    pat = re.compile(regex)
    for line in fh:
        line = line.strip()
        if line.startswith("Ref"):
            c = line.find("Strike")
            if c != -1:
                line = line[:c].rstrip()
            m = pat.match(line)
            if m:
                d = m.groupdict()
                if index_desc["index"] == "HY":
                    d["fwdprice"] = d.pop("fwdspread")
                expiry_month = datetime.datetime.strptime(d["expiry"], "%b%y").month
                d["expiry"] = next(
                    exp_date
                    for exp_date in expiration_dates
                    if exp_date.month == expiry_month
                )
                fwd_index.append({**index_desc, **d})
                option_stack[d["expiry"]] = parse_bnp_block(
                    fh, index_desc["index"], c == -1
                )
            else:
                logger.error(f"Can't parse expiry line: {line} for filename: {fh.name}")
    return option_stack, fwd_index
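
# The subject_* patterns below form the dispatch table used by parse_email:
# the subject line (first line of the saved email) is matched against each
# pattern in turn, and the matching source name selects both the groups to
# read (index type, series, ref, ...) and the parse_<source> function to call.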
subject_baml = re.compile(r"(?:Fwd:){0,2}(?:BAML )?(\D{2})(\d{1,2})\s")
subject_ms = re.compile(
    r"[^$]*\$\$ MS CDX OPTIONS: (IG|HY)(\d{2})[^-]*- REF[^\d]*([\d.]+)"
)
subject_nom = re.compile(r"(?:Fwd:)?CDX (IG|HY)(\d{2}).*- REF:[^\d]*([\d.]+)")
subject_gs = re.compile(r"(?:FW: |Fwd: )?GS (IG|HY)(\d{2}) 5y.*- Ref [^\d]*([\d.]+)")
subject_sg = re.compile(r"SG OPTIONS - CDX (IG|HY) S(\d{2}).* REF[^\d]*([\d.]+)")
subject_citi = re.compile(r"(?:Fwd:)?Citi Options: (IG|HY)(\d{2}) 5Y")
# NOTE: the named groups were reconstructed from how parse_email reads the
# match ("index", "series", "version", "ref").
subject_cs = re.compile(
    r"CS CDX (?P<index>IG|HY)(?P<series>\d{2})_?v?(?P<version>\d)? Options -"
    r"\s+(?:\d{2}/\d{2}/\d{2}\s+)?Ref = (?P<ref>[\d.]+)[^\d]*"
)
subject_bnp = re.compile(r"CDX OPTIONS RUN: (IG|HY)(\d{2}).*")


def get_current_version(index, series, d, conn):
    with conn.cursor() as c:
        c.execute(
            "select max(version) FROM index_version "
            "WHERE index=%s and series=%s and %s <= lastdate",
            (index.upper(), series, d),
        )
        version, = c.fetchone()
    return version


def parse_email(email, date_received, conn):
    get_version = lru_cache()(partial(get_current_version, conn=conn))
    with email.open("rt") as fh:
        subject = fh.readline().lstrip()
        for source in ["BAML", "GS", "MS", "NOM", "SG", "CITI", "CS", "BNP"]:
            m = globals()[f"subject_{source.lower()}"].match(subject)
            if m:
                version = None
                if source in ["BAML", "CITI", "BNP"]:
                    indextype, series = m.groups()
                elif source == "CS":
                    d = m.groupdict()
                    version = d.get("version")
                    indextype = d["index"]
                    series = d["series"]
                    ref = float(d["ref"])
                else:
                    indextype, series, ref = m.groups()
                    ref = float(ref)
                series = int(series)
                # remember where the body starts so the source-specific
                # parser can re-read it after the quote date scan
                cur_pos = fh.tell()
                try:
                    quotedate = parse_quotedate(fh, date_received)
                except RuntimeError:
                    logger.warning(
                        "couldn't find received date in message: "
                        f"{email.name}, using {date_received}"
                    )
                    quotedate = pd.Timestamp(date_received).tz_localize(
                        "America/New_York"
                    )
                fh.seek(cur_pos)
                if version is None:
                    version = get_version(indextype, series, quotedate)
                parse_fun = globals()[f"parse_{source.lower()}"]
                key = (quotedate, indextype, series, source)
                index_desc = {
                    "quotedate": quotedate,
                    "index": indextype,
                    "series": series,
                    "version": version,
                }
                if source in ["GS", "MS", "NOM", "SG"]:
                    index_desc["ref"] = ref
                if source in ["BNP", "SG"]:
                    index_desc["expiration_dates"] = list_imm_dates(quotedate)
                option_stack, fwd_index = parse_fun(fh, index_desc)
                if fwd_index:
                    fwd_index = pd.DataFrame.from_records(fwd_index, index="quotedate")
                    fwd_index["quote_source"] = source
                else:
                    raise RuntimeError("empty email " + fh.name)
                return (key, (option_stack, fwd_index))
        else:
            raise RuntimeError(
                f"can't parse subject line: {subject} for email {email.name}"
            )


def write_todb(swaption_stack, index_data, conn):
    def gen_sql_str(query, table_name, columns):
        return query.format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(sql.Identifier(c) for c in columns),
            sql.SQL(", ").join(sql.Placeholder() * len(columns)),
        )

    query = sql.SQL(
        "INSERT INTO {}({}) VALUES({}) "
        "ON CONFLICT DO NOTHING RETURNING ref_id"
    )
    # one reference row per expiry; RETURNING ref_id tells us whether it was new
    sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
    query = sql.SQL("INSERT INTO {}({}) VALUES({}) " "ON CONFLICT DO NOTHING")
    with conn.cursor() as c:
        for t in index_data.itertuples(index=False):
            c.execute(sql_str, t)
            try:
                ref_id, = next(c)
            except StopIteration:
                continue
            else:
                try:
                    df = swaption_stack.loc[
                        (t.quotedate, t.index, t.series, t.expiry, t.quote_source),
                    ]
                except KeyError:
                    logger.warning(
                        "missing key in swaption_stack: "
                        f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}, "
                        f"{t.quote_source}"
                    )
                    continue
                except IndexError:
                    breakpoint()
                # strike-level quotes for this reference row
                df["ref_id"] = ref_id
                c.executemany(
                    gen_sql_str(query, "swaption_quotes", df.columns),
                    df.itertuples(index=False),
                )
    conn.commit()


def get_email_list(date):
    """Return a list of email file names for a given date.

    Parameters
    ----------
    date : string
    """
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    df = pd.DataFrame.from_dict(already_uploaded, orient="index")
    df.columns = ["quotedate"]
    df = df.reset_index().set_index("quotedate")
    return df.loc[date, "index"].tolist()


def pickle_drop_date(date):
    with open(".pickle", "rb") as fh:
        already_uploaded = pickle.load(fh)
    newdict = {k: v for k, v in already_uploaded.items() if v.date() != date}
    with open(".pickle", "wb") as fh:
        pickle.dump(newdict, fh)