diff options
Diffstat (limited to 'python/markit/import_quotes.py')
| -rw-r--r-- | python/markit/import_quotes.py | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py index 1be496d9..fe0d20a0 100644 --- a/python/markit/import_quotes.py +++ b/python/markit/import_quotes.py @@ -7,12 +7,13 @@ import pandas as pd from collections import defaultdict from dataclasses import dataclass -from env import BASE_DIR +from serenitas.utils.env import BASE_DIR from itertools import chain, product from pandas.tseries.offsets import BDay from pyisda.curve import SpreadCurve, DocClause, Seniority +from psycopg2.extras import execute_values from typing import Dict, List, Set, Tuple -from yieldcurve import get_curve +from serenitas.analytics.yieldcurve import get_curve logger = logging.getLogger(__name__) @@ -203,7 +204,11 @@ def insert_cds(database, workdate: datetime.date): if len(line["DocClause"]) == 2: line["DocClause"] += "14" k = CurveKey( - line["Ticker"], line["Tier"], line["Ccy"], line["DocClause"], spread, + line["Ticker"], + line["Tier"], + line["Ccy"], + line["DocClause"], + spread, ) if mappings := markit_bbg_mapping.get(k, False): if fixed: @@ -328,10 +333,10 @@ def get_date(f): return datetime.datetime.strptime(date, "%d-%b-%y").date() -def insert_index(engine, workdate=None): +def insert_index(conn, workdate=None): """insert Markit index quotes into the database - :param engine: sqlalchemy engine to the database + :param conn: psycopg2 connection :param workdate: date. If None, we will try to reinsert all files """ @@ -386,24 +391,24 @@ def insert_index(engine, workdate=None): data.loc[data.series.isin([9, 10, 11]) & (data.index == "HY"), "version"] -= 3 # data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last() data["source"] = "MKIT" - place_holders = ",".join(["%s"] * len(ext_cols)) sql_str = ( f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) " - f"VALUES({place_holders}) ON CONFLICT DO NOTHING" + "VALUES % ON CONFLICT DO NOTHING" ) - engine.execute(sql_str, list(data[ext_cols].itertuples(index=False))) + with conn.cusor() as c: + execute_values(c, sql_str, list(data[ext_cols].itertuples(index=False))) -def insert_tranche(engine, workdate=None): - """insert Markit index quotes into the database +def insert_tranche(conn, workdate=None): + """insert Markit tranche quotes into the database - :param engine: sqlalchemy engine to the database + :param conn: psycopg2 connection :param workdate: If None, we will try to reinsert all files :type workdate: pd.Timestamp """ basedir = BASE_DIR / "Tranche_data" / "Composite_reports" - index_version = pd.read_sql_table("index_version", engine, index_col="redindexcode") + index_version = pd.read_sql_table("index_version", conn, index_col="redindexcode") for f in basedir.glob("Tranche Composites*"): if ( workdate is None @@ -446,4 +451,9 @@ def insert_tranche(engine, workdate=None): "index_price", ] ) - df.to_sql("markit_tranche_quotes", engine, if_exists="append", index=False) + sql_str = ( + f"INSERT INTO markit_tranche_quotes({','.join(df.columns)}) " + "VALUES % ON CONFLICT DO NOTHING" + ) + with conn.cusor() as c: + execute_values(c, sql_str, list(df.itertuples(index=False))) |
