diff options
| -rw-r--r-- | python/notebooks/Reto Report.ipynb | 29 | ||||
| -rw-r--r-- | python/risk/__init__.py | 3 | ||||
| -rw-r--r-- | python/risk/__main__.py | 23 | ||||
| -rw-r--r-- | python/risk/bonds.py | 170 | ||||
| -rw-r--r-- | python/utils/db.py | 88 |
5 files changed, 263 insertions, 50 deletions
diff --git a/python/notebooks/Reto Report.ipynb b/python/notebooks/Reto Report.ipynb index 63e5ab48..1b8211f4 100644 --- a/python/notebooks/Reto Report.ipynb +++ b/python/notebooks/Reto Report.ipynb @@ -16,7 +16,13 @@ "from analytics.index_data import get_index_quotes\n", "from analytics.scenarios import run_portfolio_scenarios\n", "from analytics import BlackSwaption, CreditIndex, BlackSwaptionVolSurface, Portfolio,DualCorrTranche\n", - "from db import dawn_engine" + "\n", + "from utils.db import dbconn, dbengine\n", + "\n", + "from risk.tranches import get_tranche_portfolio\n", + "from risk.bonds import subprime_risk, clo_risk, crt_risk\n", + "\n", + "dawn_engine = dbengine('dawndb')" ] }, { @@ -132,7 +138,6 @@ "outputs": [], "source": [ "#tranche positions\n", - "from risk.tranches import get_tranche_portfolio\n", "conn = dawn_engine.raw_connection()\n", "portf = get_tranche_portfolio(position_date, conn, False, 'SERCGMAST')\n", "\n", @@ -165,13 +170,19 @@ "portf.add_trade(CreditIndex('HY', on_the_run('HY', spread_date), '5yr', \n", " value_date=spread_date, \n", " notional=curve_portf.hy_equiv), ('curve_trades', ''))\n", - " \n", + "\n", + "mysql_engine = dbengine('rmbs_model')\n", + "mysqlcrt_engine = dbengine('crt')\n", + "\n", "#get bond risks:\n", - "rmbs_pos = go.rmbs_pos(position_date)\n", - "clo_pos = go.clo_pos(position_date)\n", + "with dbconn('etdb') as etconn, dbconn('dawndb') as dawnconn:\n", + " rmbs_pos = subprime_risk(position_date, dawnconn, mysql_engine)\n", + " clo_pos = clo_risk(position_date, dawnconn, etconn)\n", + " crt_pos = crt_risk(position_date, dawnconn, mysqlcrt_engine)\n", "duration = analytics._ontr.risky_annuity\n", "rmbs_pos['hy_equiv'] = rmbs_pos['delta_yield']/duration * 100\n", - "notional = rmbs_pos['hy_equiv'].sum() + clo_pos['hy_equiv'].sum()\n", + "crt_pos['hy_equiv'] = crt_pos['delta_yield']/duration * 100\n", + "notional = rmbs_pos['hy_equiv'].sum() + clo_pos['hy_equiv'].sum() + crt_pos['hy_equiv'].sum()\n", "portf.add_trade(CreditIndex('HY', on_the_run('HY', spread_date), '5yr', \n", " value_date = spread_date, \n", " notional = -notional), ('bonds', ''))\n", @@ -219,10 +230,10 @@ " corr_shock=[0],\n", " vol_surface=vol_surface)\n", "\n", - "scens = scens.xs('pnl', axis=1, level=2)\n", - "scens = scens.xs((0,0), level=['vol_shock', 'corr_shock'])\n", + "pnl = scens.xs('pnl', axis=1, level=2)\n", + "pnl = pnl.xs((0,0), level=['vol_shock', 'corr_shock'])\n", "\n", - "scenarios = (scens.\n", + "scenarios = (pnl.\n", " reset_index(level=['date'], drop=True).\n", " groupby(level=0, axis=1).sum())\n", "\n", diff --git a/python/risk/__init__.py b/python/risk/__init__.py index cf6570a4..49c8342c 100644 --- a/python/risk/__init__.py +++ b/python/risk/__init__.py @@ -1,6 +1,5 @@ import sys + sys.path.append("..") from utils.db import dbconn, dbengine from utils import SerenitasFileHandler - -mysql_engine = dbengine('rmbs_model') diff --git a/python/risk/__main__.py b/python/risk/__main__.py index daf4ab27..d2edbd95 100644 --- a/python/risk/__main__.py +++ b/python/risk/__main__.py @@ -4,33 +4,42 @@ import os import pandas as pd from . import dbconn, dbengine from pandas.tseries.offsets import BDay -from .subprime import get_rmbs_portfolio, subprime_risk +from .bonds import subprime_risk, clo_risk, crt_risk from .swaptions import get_swaption_portfolio, insert_swaption_portfolio from .tranches import get_tranche_portfolio, insert_tranche_portfolio from . import SerenitasFileHandler parser = argparse.ArgumentParser() -parser.add_argument('workdate', nargs='?', - type=lambda s: pd.datetime.strptime(s, "%Y-%m-%d").date()), +parser.add_argument( + "workdate", nargs="?", type=lambda s: pd.datetime.strptime(s, "%Y-%m-%d").date() +), args = parser.parse_args() if args.workdate is None: - workdate = (pd.Timestamp.today()-BDay()).date() + workdate = (pd.Timestamp.today() - BDay()).date() else: workdate = args.workdate fh = SerenitasFileHandler("risk.log") -loggers = [logging.getLogger('analytics'), logging.getLogger('risk')] +loggers = [logging.getLogger("analytics"), logging.getLogger("risk")] for logger in loggers: logger.setLevel(logging.INFO) logger.addHandler(fh) -with dbconn('dawndb') as conn: +mysql_engine = dbengine("rmbs_model") +mysqlcrt_engine = dbengine("crt") + +with dbconn("dawndb") as conn: portf = get_swaption_portfolio(workdate, conn, source_list=["GS"]) insert_swaption_portfolio(portf, conn) portf = get_tranche_portfolio(workdate, conn) insert_tranche_portfolio(portf, conn) - #portf = get_rmbs_portfolio(workdate, conn) + +with dbconn("etdb") as etconn, dbconn("dawndb") as dawnconn: + subprime = subprime_risk(workdate, dawnconn, mysql_engine) + clo = clo_risk(workdate, dawnconn, etconn) + crt = crt_risk(workdate, dawnconn, mysqlcrt_engine) + # portf = get_rmbs_portfolio(workdate, conn) # crt_portf = portf[portf.strategy.str.contains("CRT")] # subprime_portf = portf[~portf.strategy.str.contains("CRT")] # subprime_portf_zero = subprime_portf[subprime_portf.identifier.str.endswith("_A")] diff --git a/python/risk/bonds.py b/python/risk/bonds.py new file mode 100644 index 00000000..63ecb255 --- /dev/null +++ b/python/risk/bonds.py @@ -0,0 +1,170 @@ +import pandas as pd +import numpy as np + +from utils.db import dbengine +from yieldcurve import YC +from quantlib.termstructures.yield_term_structure import YieldTermStructure + + +def latest_sim(date, engine): + sql_string = ( + "SELECT model_id_sub FROM model_versions " + "JOIN model_versions_nonagency USING (model_id_sub) " + "JOIN simulations_nonagency USING (simulation_id) " + "WHERE (date(start_time) <= %s) AND (description = 'normal') " + "ORDER BY start_time DESC" + ) + conn = engine.raw_connection() + c = conn.cursor() + c.execute(sql_string, (date,)) + model_id_sub, = next(c) + c.close() + return model_id_sub + + +def get_df(date, engine): + model_id_sub = latest_sim(date, engine) + df_prices = pd.read_sql_query( + "SELECT cusip, model_version, pv, modDur, delta_yield, " + "wal, pv_io, pv_po, pv_RnW, delta_ir_io, delta_ir_po, " + "delta_hpi, delta_RnW, delta_mult, delta_ir, pv_FB " + "FROM priced WHERE " + "timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) " + "AND model_id_sub=%s " + "AND normalization='current_notional'", + engine, + ["cusip", "model_version"], + params=(date, date, model_id_sub), + ) + df_percentiles = pd.read_sql_query( + "SELECT cusip, PV, percentile " + "FROM priced_percentiles WHERE " + "timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) " + "AND model_version=3 " + "AND model_id_sub=%s " + "AND percentile IN (5, 25, 50, 75, 95) " + "AND normalization='current_notional'", + engine, + ["cusip", "percentile"], + params=(date, date, model_id_sub), + ) + df_prices = df_prices.unstack("model_version") + df_percentiles = df_percentiles.unstack("percentile") + return df_prices.join(df_percentiles, how="left") + + +def subprime_risk(date, conn, engine): + df = get_df(date, engine) + df_pos = get_portfolio(date, conn, "Subprime") + df_pv = df.xs("pv", axis=1, level=0) + df_pv.columns = ["pv1", "pv2", "pv3"] + df_pv_perct = df.xs("PV", axis=1, level=0) + df_pv_perct.columns = ["pv5", "pv25", "pv50", "pv75", "pv95"] + df_modDur = df[("modDur", 1)] + df_modDur.name = "modDur" + df_v1 = df.xs(1, axis=1, level="model_version")[ + ["pv_RnW", "delta_mult", "delta_hpi", "delta_ir"] + ] + df_v1.columns = ["v1pv_RnW", "v1_lsdel", "v1_hpidel", "v1_irdel"] + df_pv_FB = df[("pv_FB", 3)] + df_pv_FB.name = "pv_FB" + df_risk = pd.concat( + [ + df_pv, + df_modDur, + df_pv_perct, + df.xs(3, axis=1, level="model_version")[ + [ + "delta_yield", + "wal", + "pv_io", + "pv_po", + "pv_RnW", + "delta_ir_io", + "delta_ir_po", + "delta_hpi", + "delta_RnW", + "delta_mult", + ] + ], + df_v1, + df_pv_FB, + ], + axis=1, + ) + + df_calc = df_pos.join(df_risk) + df_calc = df_calc[~df_calc["strategy"].str.contains("CRT")].dropna() + + yc = YC(evaluation_date=date) + + df_calc = df_calc.assign( + b_yield=df_calc.modDur.apply(lambda x: float(yc.zero_rate(x))), + delta_ir=df_calc.delta_ir_io + df_calc.delta_ir_po, + curr_ntl=df_calc.notional * df_calc.factor, + ) + + df_calc.b_yield += np.minimum( + (df_calc.pv1 * df_calc.curr_ntl / df_calc.local_market_value) + ** (1 / df_calc.modDur) + - 1, + 1, + ).dropna() + df_calc.delta_yield *= df_calc.local_market_value / df_calc.pv3 + df_calc.delta_ir *= ( + (df_calc.local_market_value / df_calc.curr_ntl) / df_calc.pv3 * df_calc.curr_ntl + ) + return df_calc + + +def get_portfolio(date, conn, asset_class, fund="SERCGMAST"): + df = pd.read_sql_query( + "SELECT * FROM risk_positions(%s, %s, %s)", + conn, + params=(date, asset_class, fund), + ) + df["cusip"] = df.identifier.str.slice(0, 9) + df = df.set_index("cusip") + return df + + +def crt_risk(date, conn, engine): + df = get_portfolio(date, conn, "Subprime") + df = df[df["strategy"].str.contains("CRT")].dropna() + df_model = pd.read_sql_query( + "SELECT * from priced_at_market where " + "timestamp BETWEEN %s AND date_add(%s, INTERVAL 1 DAY) " + "and model_des = 'hpi3_ir3'", + engine, + "cusip", + params=(date, date), + ) + df = df.join(df_model) + df["curr_ntl"] = df["notional"] * df["factor"] + df["delta_yield"] = df["curr_ntl"] * df["duration_FW"] + return df + + +def clo_risk(date, conn, conn_1): + df = get_portfolio(date, conn, "CLO") + + sql_string = ( + "select distinct cusip, identifier from bonds where asset_class = 'CLO'" + ) + cur = conn.cursor() + cur.execute(sql_string) + cusip_map = {identifier: cusip for cusip, identifier in cur.fetchall()} + df["cusip"] = df["identifier"].replace(cusip_map) + placeholders = ",".join(["%s"] * (1 + len(df))) + sql_string = f"SELECT * FROM historical_cusip_risk({placeholders})" + model = pd.read_sql_query( + sql_string, + conn_1, + parse_dates=["pricingdate"], + params=[date, *df["cusip"].tolist()], + ) + model.index = df["cusip"] + df = df.join(model, lsuffix="mark") + df["curr_ntl"] = df["notional"] * df["factor"] + df["hy_equiv"] = df["curr_ntl"] * df["delta"] + return df diff --git a/python/utils/db.py b/python/utils/db.py index 8ede3756..fcc3ab0d 100644 --- a/python/utils/db.py +++ b/python/utils/db.py @@ -10,6 +10,7 @@ from sqlalchemy.engine.url import URL import numpy as np import atexit + class InfDateAdapter: def __init__(self, wrapped): self.wrapped = wrapped @@ -23,51 +24,63 @@ class InfDateAdapter: return psycopg2.extensions.DateFromPy(self.wrapped).getquoted() -def nan_to_null(f, _NULL=AsIs('NULL'), - _Float=psycopg2.extensions.Float): +def nan_to_null(f, _NULL=AsIs("NULL"), _Float=psycopg2.extensions.Float): if not np.isnan(f): return _Float(f) return _NULL + register_adapter(datetime.date, InfDateAdapter) register_adapter(np.int64, lambda x: AsIs(x)) register_adapter(np.float, nan_to_null) + def dbconn(dbname, cursor_factory=NamedTupleCursor): - if dbname == 'etdb': - dbname = 'ET' - user_name = 'et_user' + if dbname == "etdb": + dbname = "ET" + user_name = "et_user" else: - user_name = dbname[:-2] + '_user' - return psycopg2.connect(database=dbname, - user=user_name, - host=os.environ.get("PGHOST", "debian"), - cursor_factory=cursor_factory, - options="-c extra_float_digits=3") + user_name = dbname[:-2] + "_user" + return psycopg2.connect( + database=dbname, + user=user_name, + host=os.environ.get("PGHOST", "debian"), + cursor_factory=cursor_factory, + options="-c extra_float_digits=3", + ) + def dbengine(dbname, cursor_factory=NamedTupleCursor): - if dbname in ['rmbs_model', 'corelogic']: - uri = URL(drivername="mysql+mysqlconnector", - host="debian", database=dbname, - query={'option_files': os.path.expanduser('~/.my.cnf')}) + if dbname in ["rmbs_model", "corelogic", "crt"]: + uri = URL( + drivername="mysql+mysqlconnector", + host="debian", + database=dbname, + query={"option_files": os.path.expanduser("~/.my.cnf")}, + ) return create_engine(uri, paramstyle="format") else: - if dbname == 'etdb': - dbname= 'ET' - user_name = 'et_user' + if dbname == "etdb": + dbname = "ET" + user_name = "et_user" else: - user_name = dbname[:-2] + '_user' - uri = URL(drivername="postgresql", - host=os.environ.get("PGHOST", "debian"), - username=user_name, - database=dbname, - query={"options": "-c extra_float_digits=3"}) - return create_engine(uri, paramstyle="format", - connect_args={'cursor_factory': cursor_factory}) + user_name = dbname[:-2] + "_user" + uri = URL( + drivername="postgresql", + host=os.environ.get("PGHOST", "debian"), + username=user_name, + database=dbname, + query={"options": "-c extra_float_digits=3"}, + ) + return create_engine( + uri, paramstyle="format", connect_args={"cursor_factory": cursor_factory} + ) + def with_connection(dbname): def decorator(f): conn = dbconn(dbname) + def with_connection_(*args, **kwargs): # or use a pool, or a factory function... try: @@ -77,9 +90,12 @@ def with_connection(dbname): conn.rollback() else: return rv + return with_connection_ + return decorator + def query_db(conn, sqlstr, params=None, one=True): with conn.cursor() as c: if params: @@ -90,13 +106,21 @@ def query_db(conn, sqlstr, params=None, one=True): r = c.fetchone() if one else c.fetchall() return r -serenitas_pool = ThreadedConnectionPool(0, 5, database='serenitasdb', - user='serenitas_user', - host=os.environ.get("PGHOST", "debian"), - cursor_factory=DictCursor) + +serenitas_pool = ThreadedConnectionPool( + 0, + 5, + database="serenitasdb", + user="serenitas_user", + host=os.environ.get("PGHOST", "debian"), + cursor_factory=DictCursor, +) + + @atexit.register def close_db(): serenitas_pool.closeall() -serenitas_engine = dbengine('serenitasdb') -dawn_engine = dbengine('dawndb') + +serenitas_engine = dbengine("serenitasdb") +dawn_engine = dbengine("dawndb") |
