Diffstat (limited to 'python/parse_emails.py')
-rw-r--r--	python/parse_emails.py	45
1 file changed, 29 insertions, 16 deletions
diff --git a/python/parse_emails.py b/python/parse_emails.py
index cf7dc88f..d93c17b6 100644
--- a/python/parse_emails.py
+++ b/python/parse_emails.py
@@ -25,7 +25,7 @@ def list_imm_dates(date):
     return r
 
 def makedf(r, indextype, quote_source):
-    if indextype=='IG':
+    if indextype == 'IG':
         cols = ['strike', 'rec_bid', 'rec_offer', 'delta_rec',
                 'pay_bid', 'pay_offer', 'delta_pay', 'vol']
     else:
@@ -38,7 +38,7 @@ def makedf(r, indextype, quote_source):
     df = pd.DataFrame.from_records(r, columns = cols)
     for col in ['delta_rec', 'delta_pay', 'vol', 'price_vol', 'gamma', 'tail']:
         if col in df:
-            df[col] = df[col].str.strip("%").astype('float')/100
+            df[col] = df[col].str.strip("%").astype('float') / 100
     if quote_source == "GS":
         for col in ["pay_bid", "pay_offer", "rec_bid", "rec_offer"]:
             df[col] = df[col].str.strip('-')
@@ -49,7 +49,6 @@ def makedf(r, indextype, quote_source):
             df[k] = pd.to_numeric(df[k])
         except ValueError:
             pdb.set_trace()
-    df['quote_source'] = quote_source
     df.set_index('strike', inplace=True)
     return df
 
@@ -103,6 +102,7 @@ def parse_baml(fh, indextype, series, quotedate, *args):
 
     if option_stack:
         fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
+        fwd_index['quote_source'] = 'BAML'
         return option_stack, fwd_index
     else:
         raise RuntimeError("empty email: " + fh.name)
@@ -295,6 +295,7 @@ def parse_gs(fh, indextype, series, quotedate, ref):
            logging.error("Can't parse expiry line:", line)
 
     fwd_index = pd.DataFrame.from_records(fwd_index, index='quotedate')
+    fwd_index['quote_source'] = 'GS'
     return option_stack, fwd_index
 
 subject_baml = re.compile("(?:Fwd:){0,2}(?:BAML )?(\w{2})([0-9]{1,2})\s")
@@ -333,27 +334,38 @@ def parse_email(email, date_received):
                                  'ref': ref,
                                  'index': indextype,
                                  'series': series,
-                                 'expiry': list(option_stack.keys())})
-        fwd_index.set_index('quotedate', inplace = True)
+                                 'expiry': list(option_stack.keys()),
+                                 'quote_source': source})
+        fwd_index.set_index('quotedate', inplace=True)
         return (quotedate, indextype, series), (option_stack, fwd_index)
     else:
         raise RuntimeError("can't parse subject line: {0} for email {1}".format(
            subject, email.name))
 
 def write_todb(swaption_stack, index_data):
-    conn = dbconn('serenitasdb')
+    def gen_sql_str(query, table_name, columns):
+        return query.format(sql.Identifier(table_name),
+                            sql.SQL(", ").join(sql.Identifier(c) for c in columns),
+                            sql.SQL(", ").join(sql.Placeholder() * len(columns)))
+    conn = dbconn('serenitasdb')
+    query = sql.SQL("INSERT INTO {}({}) VALUES({}) " \
+                    "ON CONFLICT DO NOTHING RETURNING ref_id")
+    sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
     query = sql.SQL("INSERT INTO {}({}) VALUES({}) " \
                     "ON CONFLICT DO NOTHING")
-    for df, table in zip([index_data, swaption_stack],
-                         ["swaption_ref_quotes", "swaption_quotes"]):
-        cols = df.columns
-        sql_str = query.format(sql.Identifier(table),
-                               sql.SQL(", ").join(sql.Identifier(c) for c in cols),
-                               sql.SQL(", ").join(sql.Placeholder() * len(cols)))
-        with conn.cursor() as c:
-            c.executemany(sql_str, df.itertuples(index=False))
-    conn.commit()
-    conn.close()
+    with conn.cursor() as c:
+        for t in index_data.itertuples(index=False):
+            c.execute(sql_str, t)
+            try:
+                ref_id, = next(c)
+            except StopIteration:
+                continue
+            else:
+                df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry)]
+                df['ref_id'] = ref_id
+                c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
+                              df.itertuples(index=False))
+    conn.commit()
 
 def get_email_list(date):
     """returns a list of email file names for a given date
@@ -414,6 +426,7 @@ if __name__=="__main__":
     swaption_stack = pd.concat(swaption_stack, names=['quotedate', 'index', 'series'])
     swaption_stack = swaption_stack.reset_index()
     swaption_stack = swaption_stack.drop_duplicates(['quotedate', 'index', 'series', 'expiry', 'strike'])
+    swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
     index_data = index_data.reset_index()
     index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
     write_todb(swaption_stack, index_data)
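For reference, a minimal standalone sketch (not part of the commit) of the INSERT ... ON CONFLICT DO NOTHING RETURNING pattern that the reworked write_todb composes with psycopg2's sql module: the ref-level row is inserted first, and detail rows are written only when a ref_id actually comes back. The connection string, column list and sample row below are hypothetical; only "swaption_ref_quotes" and the query shape are taken from the diff.

import psycopg2
from psycopg2 import sql

def compose_insert(template, table_name, columns):
    # Fill a three-slot template: table identifier, column list, placeholder list.
    return template.format(
        sql.Identifier(table_name),
        sql.SQL(", ").join(sql.Identifier(c) for c in columns),
        sql.SQL(", ").join(sql.Placeholder() * len(columns)))

INSERT_REF = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                     "ON CONFLICT DO NOTHING RETURNING ref_id")

conn = psycopg2.connect("dbname=quotes_demo")          # hypothetical database
columns = ["quotedate", "index", "series", "expiry"]   # illustrative columns
row = ("2018-03-21", "IG", 29, "2018-06-20")
with conn.cursor() as cur:
    cur.execute(compose_insert(INSERT_REF, "swaption_ref_quotes", columns), row)
    hit = cur.fetchone()   # result set is empty (None) when ON CONFLICT skipped the insert
    if hit is not None:
        ref_id, = hit
        # ...insert the matching detail rows with executemany(), as the diff does
conn.commit()
conn.close()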
