diff options
Diffstat (limited to 'python/quote_parsing')
| -rw-r--r-- | python/quote_parsing/__main__.py | 22 | ||||
| -rw-r--r-- | python/quote_parsing/parse_emails.py | 37 |
2 files changed, 37 insertions, 22 deletions
diff --git a/python/quote_parsing/__main__.py b/python/quote_parsing/__main__.py index a0e35510..3fd50f4e 100644 --- a/python/quote_parsing/__main__.py +++ b/python/quote_parsing/__main__.py @@ -42,13 +42,16 @@ except FileNotFoundError: conn = serenitas_pool.getconn() for f in emails: + print(f) date_composed, msg_id = f.name.split("_") date_composed = datetime.datetime.strptime(date_composed, "%Y-%m-%d %H-%M-%S") + if date_composed.date() < datetime.date.fromisoformat("2021-01-18"): + continue if msg_id == "16e4b563f6cff219": # GS message has IG quotes with a HY header continue - if msg_id in already_uploaded: - continue + # if msg_id in already_uploaded: + # continue else: try: key, (option_stack, fwd_index) = parse_email(f, date_composed, conn) @@ -58,12 +61,12 @@ for f in emails: if key[0] is None or len(option_stack) == 0: logger.error(f"Something wrong with email: {f.name}") continue - swaption_stack[key] = pd.concat(option_stack, names=["expiry", "strike"]) + swaption_stack[key] = pd.concat(option_stack, names=["expiry", "version"]) fwd_index["msg_id"] = int(msg_id, 16) index_data = index_data.append(fwd_index) - # already_uploaded[msg_id] = key[0] -# if index_data.empty: -# sys.exit() + already_uploaded[msg_id] = key[0] +if index_data.empty: + sys.exit() for col in ["fwdbpv", "fwdprice", "fwdspread", "ref"]: if col in index_data: index_data[col] = pd.to_numeric(index_data[col]) @@ -72,9 +75,10 @@ index_data["index"] = index_data["index"].astype("category") index_names = ["quotedate", "index", "series", "quote_source"] swaption_stack = pd.concat(swaption_stack, names=index_names, sort=False) dup = swaption_stack.index.duplicated() -# if dup.any(): -# logger.warning("duplicated data") -# swaption_stack = swaption_stack[~dup] +if dup.any(): + logger.warning("duplicated data") + swaption_stack = swaption_stack[~dup] + swaption_stack = swaption_stack.reset_index().set_index( ["quotedate", "index", "series", "expiry", "quote_source", "version"] ) diff --git a/python/quote_parsing/parse_emails.py b/python/quote_parsing/parse_emails.py index ec36973a..eac25f53 100644 --- a/python/quote_parsing/parse_emails.py +++ b/python/quote_parsing/parse_emails.py @@ -132,7 +132,7 @@ def parse_refline(line): return d -def parse_baml(fh, index_desc, *args): +def parse_baml_us(fh, index_desc, *args): option_stack = {} fwd_index = [] line = "" @@ -146,7 +146,7 @@ def parse_baml(fh, index_desc, *args): d = parse_refline(line) d.update(index_desc) df, line = parse_baml_block(fh, index_desc["index"]) - option_stack[d["expiry"]] = df + option_stack[(d["expiry"], index_desc["version"])] = df fwd_index.append(d) else: line = "" @@ -360,7 +360,6 @@ def parse_gs_block(fh, indextype): rec_bid, rec_offer = vals[2].split("/", 1) if rec_offer.count(".") == 2: rec_offer, vol = rec_offer[:6], rec_offer[6:] - print(rec_offer, vol) tail = vals[5] else: vol = vals[3] @@ -488,7 +487,7 @@ def parse_ms_us(fh, index_desc, *args): if block is None or block.empty: logger.warning("MS: block is empty for {expiry} expiry") else: - option_stack[expiry] = block + option_stack[(expiry, index_desc["version"])] = block return option_stack, fwd_index @@ -500,7 +499,7 @@ def parse_nom_us(fh, index_desc, *args): expiry = line.split(" ")[0] expiry = pd.to_datetime(expiry, format="%d-%b-%y") next_line, df = parse_nomura_block(fh, index_desc["index"]) - option_stack[expiry] = df + option_stack[(expiry, index_desc["version"])] = df fwd_index.append({"expiry": expiry, **index_desc}) if next_line: if "EXPIRY" in next_line: @@ -524,7 +523,7 @@ def parse_sg_us(fh, index_desc): line = line.rstrip() if line.startswith("Type"): expiry, df = parse_sg_block(fh, index_desc["index"], expiration_dates) - option_stack[expiry] = df + option_stack[(expiry, index_desc["version"])] = df fwd_index.append({"expiry": expiry, **index_desc}) return option_stack, fwd_index @@ -551,7 +550,10 @@ def parse_gs_us(fh, index_desc): } ) try: - option_stack[expiry], line = parse_gs_block(fh, index_desc["index"]) + ( + option_stack[(expiry, index_desc["version"])], + line, + ) = parse_gs_block(fh, index_desc["index"]) except IndexError as e: logger.debug(traceback.format_exc()) logger.error(f"Something is wrong with file {Path(fh.name).stem}") @@ -594,7 +596,9 @@ def parse_citi_us(fh, index_desc): expiry, ref = m.groups() expiry = pd.to_datetime(expiry, format="%d-%b-%y") fwd_index.append({"ref": ref, "expiry": expiry, **index_desc}) - option_stack[expiry] = parse_citi_block(fh, index_desc["index"]) + option_stack[(expiry, index_desc["version"])] = parse_citi_block( + fh, index_desc["index"] + ) else: logger.error("Can't parse expiry line:", line) return option_stack, fwd_index @@ -616,7 +620,9 @@ def parse_cs_us(fh, index_desc): d = m.groupdict() d["expiry"] = pd.to_datetime(d["expiry"], format="%d-%b-%y") fwd_index.append({**index_desc, **d}) - option_stack[d["expiry"]] = parse_cs_block(fh, index_desc["index"]) + option_stack[(d["expiry"], index_desc["version"])] = parse_cs_block( + fh, index_desc["index"] + ) else: logger.error("Can't parse expiry line:", line, "filename:", fh.name) return option_stack, fwd_index @@ -643,7 +649,7 @@ def parse_bnp_us(fh, index_desc): d for d in expiration_dates if d.month == expiry_month ) fwd_index.append({**index_desc, **d}) - option_stack[d["expiry"]] = parse_bnp_block( + option_stack[(d["expiry"], index_desc["version"])] = parse_bnp_block( fh, index_desc["index"], c == -1 ) else: @@ -654,7 +660,6 @@ def parse_bnp_us(fh, index_desc): def parse_jpm_useu(fh, index_desc): option_stack = {} fwd_index = [] - versions = set() regex = r"JPM (CDX|iTrx) Options: (HY|IG|MAIN|XOVER) \(\w\d+V(?P<version>\d+)\) (?P<expiry>[\d]+-[\w]+-[\d]+) \*\* Fwd @(?P<fwdref>[\d.]+)" pat = re.compile(regex) line = next(fh).strip() @@ -667,7 +672,6 @@ def parse_jpm_useu(fh, index_desc): ) d["expiry"] = pd.to_datetime(d["expiry"], format="%d-%b-%y") index_desc["version"] = d["version"] - versions.add(d["version"]) fwd_index.append({**index_desc, **d}) try: @@ -813,7 +817,14 @@ def write_todb(swaption_stack, index_data, conn): else: try: df = swaption_stack.loc[ - (t.quotedate, t.index, t.series, t.expiry, t.quote_source), + ( + t.quotedate, + t.index, + t.series, + t.expiry, + t.quote_source, + t.version, + ), ] except KeyError as e: logger.warning( |
