aboutsummaryrefslogtreecommitdiffstats
path: root/python/markit/import_quotes.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/markit/import_quotes.py')
-rw-r--r--python/markit/import_quotes.py36
1 files changed, 23 insertions, 13 deletions
diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py
index 1be496d9..fe0d20a0 100644
--- a/python/markit/import_quotes.py
+++ b/python/markit/import_quotes.py
@@ -7,12 +7,13 @@ import pandas as pd
from collections import defaultdict
from dataclasses import dataclass
-from env import BASE_DIR
+from serenitas.utils.env import BASE_DIR
from itertools import chain, product
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve, DocClause, Seniority
+from psycopg2.extras import execute_values
from typing import Dict, List, Set, Tuple
-from yieldcurve import get_curve
+from serenitas.analytics.yieldcurve import get_curve
logger = logging.getLogger(__name__)
@@ -203,7 +204,11 @@ def insert_cds(database, workdate: datetime.date):
if len(line["DocClause"]) == 2:
line["DocClause"] += "14"
k = CurveKey(
- line["Ticker"], line["Tier"], line["Ccy"], line["DocClause"], spread,
+ line["Ticker"],
+ line["Tier"],
+ line["Ccy"],
+ line["DocClause"],
+ spread,
)
if mappings := markit_bbg_mapping.get(k, False):
if fixed:
@@ -328,10 +333,10 @@ def get_date(f):
return datetime.datetime.strptime(date, "%d-%b-%y").date()
-def insert_index(engine, workdate=None):
+def insert_index(conn, workdate=None):
"""insert Markit index quotes into the database
- :param engine: sqlalchemy engine to the database
+ :param conn: psycopg2 connection
:param workdate: date. If None, we will try to reinsert all files
"""
@@ -386,24 +391,24 @@ def insert_index(engine, workdate=None):
data.loc[data.series.isin([9, 10, 11]) & (data.index == "HY"), "version"] -= 3
# data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last()
data["source"] = "MKIT"
- place_holders = ",".join(["%s"] * len(ext_cols))
sql_str = (
f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) "
- f"VALUES({place_holders}) ON CONFLICT DO NOTHING"
+                "VALUES %s ON CONFLICT DO NOTHING"
)
- engine.execute(sql_str, list(data[ext_cols].itertuples(index=False)))
+    with conn.cursor() as c:
+        execute_values(c, sql_str, list(data[ext_cols].itertuples(index=False)))
-def insert_tranche(engine, workdate=None):
- """insert Markit index quotes into the database
+def insert_tranche(conn, workdate=None):
+ """insert Markit tranche quotes into the database
- :param engine: sqlalchemy engine to the database
+ :param conn: psycopg2 connection
:param workdate: If None, we will try to reinsert all files
:type workdate: pd.Timestamp
"""
basedir = BASE_DIR / "Tranche_data" / "Composite_reports"
- index_version = pd.read_sql_table("index_version", engine, index_col="redindexcode")
+ index_version = pd.read_sql_table("index_version", conn, index_col="redindexcode")
for f in basedir.glob("Tranche Composites*"):
if (
workdate is None
@@ -446,4 +451,9 @@ def insert_tranche(engine, workdate=None):
"index_price",
]
)
- df.to_sql("markit_tranche_quotes", engine, if_exists="append", index=False)
+ sql_str = (
+ f"INSERT INTO markit_tranche_quotes({','.join(df.columns)}) "
+            "VALUES %s ON CONFLICT DO NOTHING"
+ )
+        with conn.cursor() as c:
+            execute_values(c, sql_str, list(df.itertuples(index=False)))