Diffstat (limited to 'python/parse_emails.py')
-rw-r--r--  python/parse_emails.py  14
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/python/parse_emails.py b/python/parse_emails.py
index 69d3bebd..4081d39b 100644
--- a/python/parse_emails.py
+++ b/python/parse_emails.py
@@ -1,7 +1,6 @@
 import pandas as pd
 import re
 import os
-from db import dbconn
 import psycopg2.sql as sql
 from download_emails import save_emails, errors
 import datetime
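
The removed dbconn helper opened a fresh connection per call; the rest of this diff switches to a pool (serenitas_pool) owned by db.py, with the checked-out connection passed into write_todb. The getconn/putconn names used later match psycopg2's pool API, so db.py plausibly looks something like this minimal sketch (hypothetical; db.py itself is not part of this diff):

    from psycopg2.pool import ThreadedConnectionPool

    # Hypothetical construction; the real db.py may configure this differently.
    serenitas_pool = ThreadedConnectionPool(minconn=1, maxconn=5,
                                            dbname="serenitasdb")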
@@ -434,12 +433,11 @@ def parse_email(email, date_received):
     else:
         raise RuntimeError(f"can't parse subject line: {subject} for email {email.name}")
 
-def write_todb(swaption_stack, index_data):
+def write_todb(swaption_stack, index_data, conn):
     def gen_sql_str(query, table_name, columns):
         return query.format(sql.Identifier(table_name),
                             sql.SQL(", ").join(sql.Identifier(c) for c in columns),
                             sql.SQL(", ").join(sql.Placeholder() * len(columns)))
-    conn = dbconn('serenitasdb')
     query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                     "ON CONFLICT DO NOTHING RETURNING ref_id")
     sql_str = gen_sql_str(query, "swaption_ref_quotes", index_data.columns)
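
For context on the unchanged helper: gen_sql_str fills the three {} slots in the composed query with a quoted table name, a quoted column list, and one %s placeholder per column (sql.Placeholder() * n builds a Composed of n repeated placeholders). For a hypothetical three-column insert it renders roughly as follows; as_string needs a live connection to quote identifiers:

    import psycopg2.sql as sql

    query = sql.SQL("INSERT INTO {}({}) VALUES({}) "
                    "ON CONFLICT DO NOTHING RETURNING ref_id")
    cols = ["quotedate", "index", "series"]  # hypothetical column names
    stmt = query.format(sql.Identifier("swaption_ref_quotes"),
                        sql.SQL(", ").join(sql.Identifier(c) for c in cols),
                        sql.SQL(", ").join(sql.Placeholder() * len(cols)))
    # stmt.as_string(conn) ->
    # INSERT INTO "swaption_ref_quotes"("quotedate", "index", "series")
    # VALUES(%s, %s, %s) ON CONFLICT DO NOTHING RETURNING ref_id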
@@ -454,11 +452,13 @@ def write_todb(swaption_stack, index_data):
                 continue
             else:
                 try:
-                    df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry)]
+                    df = swaption_stack.loc[(t.quotedate, t.index, t.series, t.expiry),]
                 except KeyError as e:
                     logging.warning("missing key in swaption_stack: "
                                     f"{t.quotedate}, {t.index}, {t.series}, {t.expiry}")
                     continue
+                except IndexingError:
+                    breakpoint()
                 df['ref_id'] = ref_id
                 c.executemany(gen_sql_str(query, "swaption_quotes", df.columns),
                               df.itertuples(index=False))
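
Two notes on this hunk. The trailing comma in .loc[(...),] makes the outer indexer a 1-tuple, so the whole inner tuple is taken as the MultiIndex row key; without it, pandas may try to split the tuple across the row and column axes, which is the kind of ambiguity that can raise pandas.errors.IndexingError (the case the new except clause drops into the debugger for via breakpoint()). Note that IndexingError lives in pandas.errors and this diff does not add an import for it, so something like `from pandas.errors import IndexingError` would presumably be needed. A toy illustration of the disambiguation, with hypothetical data:

    import pandas as pd

    idx = pd.MultiIndex.from_tuples([("2020-01-02", "IG"), ("2020-01-02", "HY")],
                                    names=["quotedate", "index"])
    df = pd.DataFrame({"strike": [50, 300]}, index=idx)

    df.loc[("2020-01-02", "IG")]    # pandas guesses: row key, or (row, column)?
    df.loc[("2020-01-02", "IG"),]   # explicit: the whole tuple is the row key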
@@ -537,6 +537,10 @@ if __name__ == "__main__":
     swaption_stack = swaption_stack.set_index(['quotedate', 'index', 'series', 'expiry'])
     index_data = index_data.reset_index()
     index_data = index_data.drop_duplicates(['quotedate', 'index', 'series', 'expiry'])
-    write_todb(swaption_stack, index_data)
+    from db import serenitas_pool
+    conn = serenitas_pool.getconn()
+    write_todb(swaption_stack, index_data, conn)
+    serenitas_pool.putconn(conn)
+
     with open(".pickle", "wb") as fh:
         pickle.dump(already_uploaded, fh)
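
One caveat with the checkout/return sequence as added: if write_todb raises, putconn is never reached and the connection leaks from the pool. A common variant wraps the call in try/finally (same names as in the diff; serenitas_pool's definition lives in db.py and is not shown here):

    from db import serenitas_pool

    conn = serenitas_pool.getconn()
    try:
        write_todb(swaption_stack, index_data, conn)
    finally:
        # Return the connection to the pool even if write_todb raised.
        serenitas_pool.putconn(conn)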