diff options
Diffstat (limited to 'python/markit')
| -rw-r--r-- | python/markit/__init__.py | 4 | ||||
| -rw-r--r-- | python/markit/__main__.py | 12 | ||||
| -rw-r--r-- | python/markit/import_quotes.py | 36 | ||||
| -rw-r--r-- | python/markit/rates.py | 8 |
4 files changed, 33 insertions, 27 deletions
diff --git a/python/markit/__init__.py b/python/markit/__init__.py index 5bef36e3..113052c7 100644 --- a/python/markit/__init__.py +++ b/python/markit/__init__.py @@ -1,2 +1,2 @@ -from utils.db import dbconn, with_connection -from env import DATA_DIR, BASE_DIR +from serenitas.utils.db import dbconn, with_connection +from serenitas.utils.env import DATA_DIR, BASE_DIR diff --git a/python/markit/__main__.py b/python/markit/__main__.py index c85ef0bc..5fcaaf30 100644 --- a/python/markit/__main__.py +++ b/python/markit/__main__.py @@ -15,9 +15,8 @@ from .loans import ( from .rates import downloadMarkitIRData from .import_quotes import * from pandas.tseries.offsets import BDay -from sqlalchemy import create_engine -from utils import SerenitasFileHandler -from utils.db import serenitas_pool +from serenitas.utils import SerenitasFileHandler +from serenitas.utils.db import serenitas_pool # parse arguments parser = argparse.ArgumentParser() @@ -116,11 +115,10 @@ elif args.cds: remove_curves(conn, workdate) insert_cds(conn, workdate) copy_curves_forward(conn, workdate) - serenitas_pool.putconn(conn) if not args.insert_only: - engine = create_engine("postgresql://serenitas_user@debian/serenitasdb") - insert_index(engine, workdate) - insert_tranche(engine, workdate) + insert_index(conn, workdate) + insert_tranche(conn, workdate) + serenitas_pool.putconn(conn) elif args.rates: conn = serenitas_pool.getconn() diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py index 1be496d9..fe0d20a0 100644 --- a/python/markit/import_quotes.py +++ b/python/markit/import_quotes.py @@ -7,12 +7,13 @@ import pandas as pd from collections import defaultdict from dataclasses import dataclass -from env import BASE_DIR +from serenitas.utils.env import BASE_DIR from itertools import chain, product from pandas.tseries.offsets import BDay from pyisda.curve import SpreadCurve, DocClause, Seniority +from psycopg2.extras import execute_values from typing import Dict, List, Set, Tuple 
-from yieldcurve import get_curve +from serenitas.analytics.yieldcurve import get_curve logger = logging.getLogger(__name__) @@ -203,7 +204,11 @@ def insert_cds(database, workdate: datetime.date): if len(line["DocClause"]) == 2: line["DocClause"] += "14" k = CurveKey( - line["Ticker"], line["Tier"], line["Ccy"], line["DocClause"], spread, + line["Ticker"], + line["Tier"], + line["Ccy"], + line["DocClause"], + spread, ) if mappings := markit_bbg_mapping.get(k, False): if fixed: @@ -328,10 +333,10 @@ def get_date(f): return datetime.datetime.strptime(date, "%d-%b-%y").date() -def insert_index(engine, workdate=None): +def insert_index(conn, workdate=None): """insert Markit index quotes into the database - :param engine: sqlalchemy engine to the database + :param conn: psycopg2 connection :param workdate: date. If None, we will try to reinsert all files """ @@ -386,24 +391,24 @@ def insert_index(engine, workdate=None): data.loc[data.series.isin([9, 10, 11]) & (data.index == "HY"), "version"] -= 3 # data = data.groupby(['index', 'series', 'tenor', 'date'], as_index=False).last() data["source"] = "MKIT" - place_holders = ",".join(["%s"] * len(ext_cols)) sql_str = ( f"INSERT INTO index_quotes_pre({','.join(ext_cols)}) " - f"VALUES({place_holders}) ON CONFLICT DO NOTHING" + "VALUES %s ON CONFLICT DO NOTHING" ) - engine.execute(sql_str, list(data[ext_cols].itertuples(index=False))) + with conn.cursor() as c: + execute_values(c, sql_str, list(data[ext_cols].itertuples(index=False))) -def insert_tranche(engine, workdate=None): - """insert Markit index quotes into the database +def insert_tranche(conn, workdate=None): + """insert Markit tranche quotes into the database - :param engine: sqlalchemy engine to the database + :param conn: psycopg2 connection :param workdate: If None, we will try to reinsert all files :type workdate: pd.Timestamp """ basedir = BASE_DIR / "Tranche_data" / "Composite_reports" - index_version = pd.read_sql_table("index_version", engine, 
index_col="redindexcode") + index_version = pd.read_sql_table("index_version", conn, index_col="redindexcode") for f in basedir.glob("Tranche Composites*"): if ( workdate is None @@ -446,4 +451,9 @@ def insert_tranche(engine, workdate=None): "index_price", ] ) - df.to_sql("markit_tranche_quotes", engine, if_exists="append", index=False) + sql_str = ( + f"INSERT INTO markit_tranche_quotes({','.join(df.columns)}) " + "VALUES %s ON CONFLICT DO NOTHING" + ) + with conn.cursor() as c: + execute_values(c, sql_str, list(df.itertuples(index=False))) diff --git a/python/markit/rates.py b/python/markit/rates.py index 433a8a6a..418657d5 100644 --- a/python/markit/rates.py +++ b/python/markit/rates.py @@ -1,14 +1,11 @@ -from . import dbconn import datetime from env import DATA_DIR from io import BytesIO -import lz4 -from pandas.tseries.offsets import BDay from psycopg2 import sql import requests import xml.etree.ElementTree as ET import zipfile -from yieldcurve import YC, ql_to_jp +from serenitas.analytics.yieldcurve import YC, ql_to_jp def downloadMarkitIRData(conn, download_date=datetime.date.today(), currency="USD"): @@ -46,7 +43,8 @@ def downloadMarkitIRData(conn, download_date=datetime.date.today(), currency="US sql_str = f"INSERT INTO {currency}_curves VALUES(%s, %s) ON CONFLICT DO NOTHING" with conn.cursor() as c: c.execute( - sql_str, (MarkitData["effectiveasof"], jp_yc.__getstate__()), + sql_str, + (MarkitData["effectiveasof"], jp_yc.__getstate__()), ) instruments = MarkitData["deposits"] + MarkitData["swaps"] names = sql.SQL(", ").join([sql.Identifier(r[0]) for r in instruments]) |
