aboutsummaryrefslogtreecommitdiffstats
path: root/python/markit
diff options
context:
space:
mode:
Diffstat (limited to 'python/markit')
-rw-r--r--python/markit/__init__.py4
-rw-r--r--python/markit/__main__.py12
-rw-r--r--python/markit/import_quotes.py36
-rw-r--r--python/markit/rates.py8
4 files changed, 33 insertions, 27 deletions
diff --git a/python/markit/__init__.py b/python/markit/__init__.py
index 5bef36e3..113052c7 100644
--- a/python/markit/__init__.py
+++ b/python/markit/__init__.py
@@ -1,2 +1,2 @@
-from utils.db import dbconn, with_connection
-from env import DATA_DIR, BASE_DIR
+from serenitas.utils.db import dbconn, with_connection
+from serenitas.utils.env import DATA_DIR, BASE_DIR
diff --git a/python/markit/__main__.py b/python/markit/__main__.py
index c85ef0bc..5fcaaf30 100644
--- a/python/markit/__main__.py
+++ b/python/markit/__main__.py
@@ -15,9 +15,8 @@ from .loans import (
from .rates import downloadMarkitIRData
from .import_quotes import *
from pandas.tseries.offsets import BDay
-from sqlalchemy import create_engine
-from utils import SerenitasFileHandler
-from utils.db import serenitas_pool
+from serenitas.utils import SerenitasFileHandler
+from serenitas.utils.db import serenitas_pool
# parse arguments
parser = argparse.ArgumentParser()
@@ -116,11 +115,10 @@ elif args.cds:
remove_curves(conn, workdate)
insert_cds(conn, workdate)
copy_curves_forward(conn, workdate)
- serenitas_pool.putconn(conn)
if not args.insert_only:
- engine = create_engine("postgresql://serenitas_user@debian/serenitasdb")
- insert_index(engine, workdate)
- insert_tranche(engine, workdate)
+ insert_index(conn, workdate)
+ insert_tranche(conn, workdate)
+ serenitas_pool.putconn(conn)
elif args.rates:
conn = serenitas_pool.getconn()
diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py
index 1be496d9..fe0d20a0 100644
--- a/python/markit/import_quotes.py
+++ b/python/markit/import_quotes.py
@@ -7,12 +7,13 @@ import pandas as pd
from collections import defaultdict
from dataclasses import dataclass
-from env import BASE_DIR
+from serenitas.utils.env import BASE_DIR
from itertools import chain, product
from pandas.tseries.offsets import BDay
from pyisda.curve import SpreadCurve, DocClause, Seniority
+from psycopg2.extras import execute_values
from typing import Dict, List, Set, Tuple
-from yieldcurve import get_curve
+from serenitas.analytics.yieldcurve import get_curve
logger = logging.getLogger(__name__)
@@ -203,7 +204,11 @@ def insert_cds(database, workdate: datetime.date):
if len(line["DocClause"]) == 2:
line["DocClause"] += "14"
k = CurveKey(
- line["Ticker"], line["Tier"], line["Ccy"], line["DocClause"], spread,
+ line["Ticker"],
+ line["Tier"],
+ line["Ccy"],
+ line["DocClause"],
+ spread,
)
if mappings := markit_bbg_mapping.get(k, False):
if fixed:
@@ -328,10 +333,10 @@ def get_date(f):
return datetime.datetime.strptime(date, "%d-%b-%y").date()
-def insert_index(engine, workdate=None):
+def insert_index(conn, workdate=None):
"""insert Markit index quotes into the database
- :param engine: sqlalchemy engine to the database
+ :param conn: psycopg2 connection
:param workdate: date. If None, we will try to reinsert all files
"""
@@ -386,24 +391,24 @@ def insert_index(engine, workdate=None):
data.loc[data.series.isin([9, 10, 11]) & (data.index == "HY"), "version"] -= 3
# data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last()
data["source"] = "MKIT"
- place_holders = ",".join(["%s"] * len(ext_cols))
sql_str = (
f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) "
- f"VALUES({place_holders}) ON CONFLICT DO NOTHING"
+        "VALUES %s ON CONFLICT DO NOTHING"
)
- engine.execute(sql_str, list(data[ext_cols].itertuples(index=False)))
+    with conn.cursor() as c:
+ execute_values(c, sql_str, list(data[ext_cols].itertuples(index=False)))
-def insert_tranche(engine, workdate=None):
- """insert Markit index quotes into the database
+def insert_tranche(conn, workdate=None):
+ """insert Markit tranche quotes into the database
- :param engine: sqlalchemy engine to the database
+ :param conn: psycopg2 connection
:param workdate: If None, we will try to reinsert all files
:type workdate: pd.Timestamp
"""
basedir = BASE_DIR / "Tranche_data" / "Composite_reports"
- index_version = pd.read_sql_table("index_version", engine, index_col="redindexcode")
+    index_version = pd.read_sql_query(
+        "SELECT * FROM index_version", conn, index_col="redindexcode"
+    )
for f in basedir.glob("Tranche Composites*"):
if (
workdate is None
@@ -446,4 +451,9 @@ def insert_tranche(engine, workdate=None):
"index_price",
]
)
- df.to_sql("markit_tranche_quotes", engine, if_exists="append", index=False)
+ sql_str = (
+ f"INSERT INTO markit_tranche_quotes({','.join(df.columns)}) "
+        "VALUES %s ON CONFLICT DO NOTHING"
+ )
+    with conn.cursor() as c:
+ execute_values(c, sql_str, list(df.itertuples(index=False)))
diff --git a/python/markit/rates.py b/python/markit/rates.py
index 433a8a6a..418657d5 100644
--- a/python/markit/rates.py
+++ b/python/markit/rates.py
@@ -1,14 +1,11 @@
-from . import dbconn
import datetime
from env import DATA_DIR
from io import BytesIO
-import lz4
-from pandas.tseries.offsets import BDay
from psycopg2 import sql
import requests
import xml.etree.ElementTree as ET
import zipfile
-from yieldcurve import YC, ql_to_jp
+from serenitas.analytics.yieldcurve import YC, ql_to_jp
def downloadMarkitIRData(conn, download_date=datetime.date.today(), currency="USD"):
@@ -46,7 +43,8 @@ def downloadMarkitIRData(conn, download_date=datetime.date.today(), currency="US
sql_str = f"INSERT INTO {currency}_curves VALUES(%s, %s) ON CONFLICT DO NOTHING"
with conn.cursor() as c:
c.execute(
- sql_str, (MarkitData["effectiveasof"], jp_yc.__getstate__()),
+ sql_str,
+ (MarkitData["effectiveasof"], jp_yc.__getstate__()),
)
instruments = MarkitData["deposits"] + MarkitData["swaps"]
names = sql.SQL(", ").join([sql.Identifier(r[0]) for r in instruments])