diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/calibrate_tranches.py | 84 | ||||
| m--------- | python/pyisda | 0 | ||||
| -rw-r--r-- | python/script_calibrate_tranches.py | 31 | ||||
| -rw-r--r-- | python/utils/__init__.py | 35 | ||||
| -rw-r--r-- | python/utils/db.py | 132 | ||||
| -rw-r--r-- | python/yieldcurve.py | 415 |
6 files changed, 0 insertions, 697 deletions
diff --git a/python/calibrate_tranches.py b/python/calibrate_tranches.py deleted file mode 100644 index 98a87953..00000000 --- a/python/calibrate_tranches.py +++ /dev/null @@ -1,84 +0,0 @@ -import numpy as np -from tranche_functions import * -from yieldcurve import YC -import yaml -import datetime -import os -import pandas as pd -import pdb - -n_int = 500 -n_credit = 100 -Z, w = GHquad(n_int) - -with open("../R/index_definitions.yml") as fh: - indices = yaml.load(fh, Loader=yaml.FullLoader) -indices["hy21"]["maturity"] = datetime.date(1970, 1, 1) + datetime.timedelta( - indices["hy21"]["maturity"] -) -hy21 = indices["hy21"] -hy21["startdate"] = datetime.date(2013, 9, 20) -dates = [ - f[9:19] - for f in os.listdir(os.path.join(os.environ["DATA_DIR"], "Backtest")) - if "survprob" in f -] - -Rho = np.zeros((len(dates), 3)) -for i, d in enumerate(dates): - startdate = datetime.datetime.strptime(d, "%Y-%m-%d") - ts = YC(startdate) - with open( - os.path.join(os.environ["DATA_DIR"], "Backtest", "recov_{0}.csv".format(d)) - ) as fh: - recov = np.array([float(e) for e in fh], dtype="double", order="F") - - with open( - os.path.join(os.environ["DATA_DIR"], "Backtest", "survprob_{0}.csv".format(d)) - ) as fh: - fh.readline() ##skip header - SurvProb = np.array( - [[float(e) for e in line.split(",")] for line in fh], - dtype="double", - order="F", - ) - - defaultprob = 1 - SurvProb - issuerweights = np.ones(100) / 100 - - rho = 0.4 - Ngrid = 101 - - K = np.array([0, 0.15, 0.25, 0.35, 1]) - - Kmod = adjust_attachments(K, hy21["loss"], hy21["factor"]) - quotes = pd.read_csv( - os.path.join( - os.environ["BASE_DIR"], - "Scenarios", - "Calibration", - "hy21_tranches_{0}.csv".format(d), - ) - ) - quotes = quotes["Mid"] / 100 - dK = np.diff(Kmod) - quotes = np.cumsum(dK * (1 - quotes)) - sched = creditSchedule(startdate, "5Yr", 0.05, ts, enddate=hy21["maturity"]) - acc = cdsAccrued(startdate, 0.05) - for j, q in enumerate(quotes[:-1]): - - def aux(rho): - L, R = BClossdist(defaultprob, issuerweights, recov, rho, Z, w, 101) - cl = tranche_cl(L, R, sched, 0, Kmod[j + 1]) - pl = tranche_pl(L, sched, 0, Kmod[j + 1]) - return cl + pl + q - acc - - l, u = (0, 1) - for _ in range(10): - rho = (l + u) / 2.0 - if aux(rho) > 0: - u = rho - else: - l = rho - Rho[i, j] = (l + u) / 2.0 - print(Rho[i, :]) diff --git a/python/pyisda b/python/pyisda deleted file mode 160000 -Subproject 3b3d53ea09d054c3bd909ae632dded1d3758ea8 diff --git a/python/script_calibrate_tranches.py b/python/script_calibrate_tranches.py deleted file mode 100644 index 6368a74f..00000000 --- a/python/script_calibrate_tranches.py +++ /dev/null @@ -1,31 +0,0 @@ -import numpy as np -from tranche_functions import GHquad, BClossdist - -n_int = 500 -n_credit = 100 -Z, w = GHquad(n_int) - -with open("recov.csv") as fh: - recov = np.array([float(e) for e in fh], dtype='double', order='F') - -with open("SurvProb.csv") as fh: - SurvProb = np.array([[float(e) for e in line.split(",")] for line in fh], dtype='double', order='F') - -defaultprob = 1 - SurvProb -p = defaultprob -rho = 0.45 * np.ones(n_credit) - -for l in range(150): - Rstoch = np.zeros((n_credit, n_int, SurvProb.shape[1])) - for t in range(SurvProb.shape[1]): - for i in range(n_credit): - Rstoch[i,:,t] = stochasticrecov(recov[i], 0, Z, w_mod, rho[i], defaultprob[i,t], p[i,t]) - L = np.zeros((n_int, Ngrid, SurvProb.shape[1])) - R = np.zeros((n_int, Ngrid, SurvProb.shape[1])) - for t in range(SurvProba.shape[1]): - S = 1 - Rstoch[:,:,t] - L[:,:,t] = lossdistZ(p[:,t], issuerweights, S, Ngrid, 0, rho, Z) - R[:,:,t] = lossdistZ(p[:,t], issuerweights, S, Ngrid, 0, rho, Z) - - for i in range(n_int): - result[:,i] = tranche.pvvec(Kmodified, L[i,,], R[i,,], cs) diff --git a/python/utils/__init__.py b/python/utils/__init__.py deleted file mode 100644 index 3e5a0de1..00000000 --- a/python/utils/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -import logging -import logging.handlers -import os - - -class SerenitasFileHandler(logging.FileHandler): - """simple class that encapsulates where we store our logs""" - - _formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) - - def __init__(self, log_file): - if "LOG_DIR" not in os.environ: - base_dir = os.path.expanduser("~") - else: - base_dir = os.getenv("LOG_DIR") - super().__init__(filename=os.path.join(base_dir, log_file)) - self.setFormatter(SerenitasFileHandler._formatter) - - -class SerenitasRotatingFileHandler(logging.handlers.RotatingFileHandler): - """simple class that encapsulates where we store our logs""" - - def __init__(self, log_file, maxBytes=0, backupCount=0): - if "LOG_DIR" not in os.environ: - base_dir = os.path.expanduser("~") - else: - base_dir = os.getenv("LOG_DIR") - super().__init__( - filename=os.path.join(base_dir, log_file), - maxBytes=maxBytes, - backupCount=backupCount, - ) - self.setFormatter(SerenitasFileHandler._formatter) diff --git a/python/utils/db.py b/python/utils/db.py deleted file mode 100644 index 242196ab..00000000 --- a/python/utils/db.py +++ /dev/null @@ -1,132 +0,0 @@ -import datetime -import os -import psycopg2 -from psycopg2.extras import DictCursor, NamedTupleCursor -from psycopg2 import IntegrityError, DataError -from psycopg2.extensions import DateFromPy, register_adapter, AsIs -from psycopg2.pool import ThreadedConnectionPool -from sqlalchemy import create_engine -from sqlalchemy.engine.url import URL -import numpy as np -import atexit - - -class InfDateAdapter: - def __init__(self, wrapped): - self.wrapped = wrapped - - def getquoted(self): - if self.wrapped == datetime.date.max: - return b"'infinity'::date" - elif self.wrapped == datetime.date.min: - return b"'-infinity'::date" - else: - return psycopg2.extensions.DateFromPy(self.wrapped).getquoted() - - -def nan_to_null(f, _NULL=AsIs("NULL"), _Float=psycopg2.extensions.Float): - if not np.isnan(f): - return _Float(f) - return _NULL - - -register_adapter(datetime.date, InfDateAdapter) -register_adapter(np.int64, lambda x: AsIs(x)) -register_adapter(np.float, nan_to_null) - - -def dbconn(dbname, cursor_factory=NamedTupleCursor): - if dbname == "etdb": - dbname = "ET" - user_name = "et_user" - else: - user_name = dbname[:-2] + "_user" - return psycopg2.connect( - database=dbname, - user=user_name, - host=os.environ.get("PGHOST", "debian"), - cursor_factory=cursor_factory, - options="-c extra_float_digits=3", - ) - - -def dbengine(dbname, cursor_factory=NamedTupleCursor): - if dbname in ["rmbs_model", "corelogic", "crt"]: - uri = URL( - drivername="mysql+mysqlconnector", - host="debian", - database=dbname, - query={ - "option_files": os.path.expanduser("~/.my.cnf"), - "charset": "utf8mb4", - }, - ) - return create_engine(uri, paramstyle="format") - else: - if dbname == "etdb": - dbname = "ET" - user_name = "et_user" - else: - user_name = dbname[:-2] + "_user" - uri = URL( - drivername="postgresql", - host=os.environ.get("PGHOST", "debian"), - username=user_name, - database=dbname, - query={"options": "-c extra_float_digits=3"}, - ) - return create_engine( - uri, - paramstyle="format", - connect_args={"cursor_factory": cursor_factory}, - isolation_level="AUTOCOMMIT", - ) - - -def with_connection(dbname): - def decorator(f): - conn = dbconn(dbname) - - def with_connection_(*args, **kwargs): - # or use a pool, or a factory function... - try: - rv = f(conn, *args, **kwargs) - except Exception as e: - print(e) - conn.rollback() - else: - return rv - - return with_connection_ - - return decorator - - -def query_db(conn, sqlstr, params=None, one=True): - with conn.cursor() as c: - if params: - c.execute(sqlstr, params) - else: - c.execute(sqlstr) - conn.commit() - r = c.fetchone() if one else c.fetchall() - return r - - -serenitas_pool = ThreadedConnectionPool( - 0, - 5, - database="serenitasdb", - user="serenitas_user", - host=os.environ.get("PGHOST", "debian"), - cursor_factory=NamedTupleCursor, -) - - -@atexit.register -def close_db(): - serenitas_pool.closeall() - - -serenitas_engine = dbengine("serenitasdb") -dawn_engine = dbengine("dawndb") diff --git a/python/yieldcurve.py b/python/yieldcurve.py deleted file mode 100644 index 9fcae54f..00000000 --- a/python/yieldcurve.py +++ /dev/null @@ -1,415 +0,0 @@ -from blist import sorteddict -from collections import namedtuple -from contextlib import closing -from itertools import islice -import datetime -import os -import pandas as pd -from quantlib.settings import Settings -from quantlib.time.api import ( - WeekendsOnly, - Japan, - Date, - Period, - Days, - Schedule, - Annual, - Semiannual, - today, - Actual360, - Months, - Years, - ModifiedFollowing, - Thirty360, - Actual365Fixed, -) -from quantlib.currency.api import USDCurrency, EURCurrency, JPYCurrency -from quantlib.indexes.ibor_index import IborIndex -from quantlib.termstructures.yields.api import ( - PiecewiseYieldCurve, - DepositRateHelper, - SwapRateHelper, - BootstrapTrait, - Interpolator, -) -from quantlib.time.date import pydate_from_qldate - -import numpy as np -from quantlib.quotes import SimpleQuote -from utils.db import dbconn, serenitas_engine -from pyisda.curve import YieldCurve -from pyisda.date import BadDay -import warnings - - -def load_curves(currency="USD", date=None): - """load the prebuilt curves from the database""" - sql_str = f"SELECT * FROM {currency}_curves" - if date: - sql_str += " WHERE effective_date=%s" - - with closing(dbconn("serenitasdb")) as conn: - with conn.cursor() as c: - if date: - c.execute(sql_str, (date,)) - if c: - _, curve = c.fetchone() - return YieldCurve.from_bytes(curve) - else: - c.execute(sql_str) - return sorteddict([(d, YieldCurve.from_bytes(curve)) for d, curve in c]) - - -def get_curve(effective_date, currency="USD"): - if f"_{currency}_curves" in globals(): - curves = globals()[f"_{currency}_curves"] - else: - curves = globals()[f"_{currency}_curves"] = load_curves(currency) - if isinstance(effective_date, datetime.datetime): - effective_date = effective_date.date() - if effective_date > curves.keys()[-1]: - last_curve = curves[curves.keys()[-1]] - return last_curve - if effective_date in curves: - return curves[effective_date] - else: - warnings.warn( - f"cache miss for {currency} curve on {effective_date}", RuntimeWarning - ) - ql_yc = YC(currency=currency, evaluation_date=effective_date) - jp_yc = ql_to_jp(ql_yc) - curves[effective_date] = jp_yc - return jp_yc - - -def getMarkitIRData(effective_date=datetime.date.today(), currency="USD"): - conn = dbconn("serenitasdb") - sql_str = ( - "SELECT * FROM {}_rates WHERE effective_date <= %s " - "ORDER BY effective_date DESC LIMIT 1".format(currency) - ) - with conn.cursor() as c: - c.execute(sql_str, (effective_date,)) - col_names = [col[0] for col in c.description] - r = c.fetchone() - MarkitData = { - "effectiveasof": r[0], - "deposits": [ - (t, rate) for t, rate in zip(col_names[1:7], r[1:7]) if rate is not None - ], - "swaps": [ - (t, rate) for t, rate in zip(col_names[7:], r[7:]) if rate is not None - ], - } - return MarkitData - - -def get_futures_data(date=datetime.date.today()): - futures_file = os.path.join( - os.environ["DATA_DIR"], "Yield Curves", "futures-{0:%Y-%m-%d}.csv".format(date) - ) - with open(futures_file) as fh: - quotes = [float(line.split(",")[1]) for line in fh] - return quotes - - -def get_curve_params(currency): - if currency == "USD": - currency = USDCurrency() - calendar = WeekendsOnly() - fixed_dc = Thirty360() - floating_dc = Actual360() - mm_dc = Actual360() - floating_freq = Period(3, Months) - fixed_freq = Semiannual - elif currency == "EUR": - currency = EURCurrency() - calendar = WeekendsOnly() - fixed_dc = Thirty360() - floating_dc = Actual360() - mm_dc = Actual360() - floating_freq = Period(6, Months) - fixed_freq = Annual - elif currency == "JPY": - currency = JPYCurrency() - calendar = Japan() - fixed_dc = Actual365Fixed() - floating_dc = Actual360() - mm_dc = Actual360() - floating_freq = Period(6, Months) - fixed_freq = Semiannual - CurveParams = namedtuple( - "CurveParam", - "currency, calendar, fixed_dc, floating_dc, " - "mm_dc, floating_freq, fixed_freq", - ) - return CurveParams( - currency, calendar, fixed_dc, floating_dc, mm_dc, floating_freq, fixed_freq - ) - - -def rate_helpers(currency="USD", MarkitData=None, evaluation_date=None): - """Utility function to build a list of RateHelpers - - Parameters - ---------- - currency : str, optional - One of `USD`, `EUR` at the moment, defaults to `USD` - MarkitData : dict, optional - MarkitData for the current evaluation_date - - Returns - ------- - helpers : list - List of QuantLib RateHelpers - """ - settings = Settings() - if evaluation_date is None: - evaluation_date = pydate_from_qldate(settings.evaluation_date) - if isinstance(evaluation_date, pd.Timestamp): - evaluation_date = evaluation_date.date() - if not MarkitData: - MarkitData = getMarkitIRData(evaluation_date, currency) - if MarkitData["effectiveasof"] != evaluation_date: - warnings.warn( - "Yield curve effective date: {0} doesn't " - "match the evaluation date: {1}".format( - MarkitData["effectiveasof"], evaluation_date - ), - RuntimeWarning, - ) - settings.evaluation_date = Date.from_datetime(MarkitData["effectiveasof"]) - params = get_curve_params(currency) - isda_ibor = IborIndex( - "IsdaIbor", - params.floating_freq, - 2, - params.currency, - params.calendar, - ModifiedFollowing, - False, - params.floating_dc, - ) - # we use SimpleQuotes, rather than just float to make it updateable - deps = [ - DepositRateHelper( - SimpleQuote(q), - Period(t), - 2, - params.calendar, - ModifiedFollowing, - False, - params.mm_dc, - ) - for t, q in MarkitData["deposits"] - ] - # this matches with bloomberg, but according to Markit, maturity should be unadjusted - swaps = [ - SwapRateHelper.from_tenor( - SimpleQuote(q), - Period(t), - params.calendar, - params.fixed_freq, - ModifiedFollowing, - params.fixed_dc, - isda_ibor, - ) - for t, q in MarkitData["swaps"] - ] - return deps + swaps - - -def get_dates(date, currency="USD"): - """computes the list of curve dates on a given date""" - if currency == "USD": - month_periods = [1, 2, 3, 6, 12] - year_periods = [2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 20, 25, 30] - calendar = WeekendsOnly() - settle_date = calendar.advance(Date.from_datetime(date), 2, 0) - deposit_dates = [ - calendar.advance( - settle_date, period=Period(m, Months), convention=ModifiedFollowing - ) - for m in month_periods - ] - swap_dates = [ - calendar.advance( - settle_date, period=Period(y, Years), convention=ModifiedFollowing - ) - for y in year_periods - ] - dates = deposit_dates + swap_dates - return [pydate_from_qldate(d) for d in dates] - - -def roll_yc(yc, forward_date): - """returns the expected forward yield cuve on a forward_date""" - dates = [d for d in yc.dates if d >= forward_date] - dfs = np.array([yc.discount_factor(d, forward_date) for d in dates]) - return YieldCurve.from_discount_factors(forward_date, dates, dfs, "ACT/365F") - - -def YC( - helpers=None, - currency="USD", - MarkitData=None, - evaluation_date=None, - fixed=False, - extrapolation=False, -): - calendar = WeekendsOnly() - settings = Settings() - if evaluation_date: - settings.evaluation_date = Date.from_datetime(evaluation_date) - if helpers is None: # might roll back the evaluation date - helpers = rate_helpers(currency, MarkitData, evaluation_date) - - if fixed: - _yc = PiecewiseYieldCurve.from_reference_date( - BootstrapTrait.Discount, - Interpolator.LogLinear, - settings.evaluation_date, - helpers, - Actual365Fixed(), - ) - else: - _yc = PiecewiseYieldCurve( - BootstrapTrait.Discount, - Interpolator.LogLinear, - 0, - calendar, - helpers, - Actual365Fixed(), - ) - if extrapolation: - _yc.extrapolation = True - return _yc - - -def jpYC(effective_date, currency="USD", MarkitData=None): - if MarkitData is None: - markit_data = getMarkitIRData(effective_date, currency) - periods, rates = zip(*markit_data["deposits"]) - periods_swaps, rates_swaps = zip(*markit_data["swaps"]) - types = "M" * len(periods) + "S" * len(periods_swaps) - rates = np.array(rates + rates_swaps) - periods = list(periods + periods_swaps) - if currency == "USD": - fixed_period = "6M" - float_period = "3M" - elif currency == "EUR": - fixed_period = "12M" - float_period = "6M" - return YieldCurve( - effective_date, - types, - periods, - rates, - "ACT/360", - fixed_period, - float_period, - "30/360", - "ACT/360", - BadDay.MODIFIED, - ) - - -def ql_to_jp(ql_yc): - """convert a QuantLib yield curve to a JP's one""" - if ql_yc._trait == BootstrapTrait.Discount: - dfs = np.array(ql_yc.data[1:]) - dates = [pydate_from_qldate(d) for d in ql_yc.dates[1:]] - trade_date = pydate_from_qldate(ql_yc.dates[0]) - return YieldCurve.from_discount_factors(trade_date, dates, dfs, "ACT/365F") - else: - raise RuntimeError("QuantLib curve needs to use Discount trait") - - -def build_curves(currency="USD"): - settings = Settings() - params = get_curve_params(currency) - isda_ibor = IborIndex( - "IsdaIbor", - params.floating_freq, - 2, - params.currency, - params.calendar, - ModifiedFollowing, - False, - params.floating_dc, - ) - rates = pd.read_sql_table( - f"{currency.lower()}_rates", serenitas_engine, index_col="effective_date" - ) - quotes = [SimpleQuote() for c in rates.columns] - gen = zip(quotes, rates.columns) - deps = [ - DepositRateHelper( - q, Period(t), 2, params.calendar, ModifiedFollowing, False, params.mm_dc - ) - for q, t in islice(gen, 6) - ] - swaps = [ - SwapRateHelper.from_tenor( - q, - Period(t), - params.calendar, - params.fixed_freq, - ModifiedFollowing, - params.fixed_dc, - isda_ibor, - ) - for q, t in gen - ] - sql_str = f"INSERT INTO {currency}_curves VALUES(%s, %s) ON CONFLICT DO NOTHING" - conn = serenitas_engine.raw_connection() - for effective_date, curve_data in rates.iterrows(): - print(effective_date) - settings.evaluation_date = Date.from_datetime(effective_date) - for q, val in zip(quotes, curve_data): - q.value = val - valid_deps = [d for d in deps if not np.isnan(d.quote)] - valid_swaps = [s for s in swaps if not np.isnan(s.quote)] - ql_yc = PiecewiseYieldCurve( - BootstrapTrait.Discount, - Interpolator.LogLinear, - 0, - calendar, - valid_deps + valid_swaps, - Actual365Fixed(), - ) - jp_yc = ql_to_jp(ql_yc) - with conn.cursor() as c: - c.execute( - sql_str, (effective_date, lz4.block.compress(jp_yc.__getstate__())) - ) - conn.commit() - conn.close() - - -if __name__ == "__main__": - # evaluation_date = Date(29, 4, 2014) - Settings.instance().evaluation_date = today() - - import matplotlib.pyplot as plt - from quantlib.time.api import calendar_from_name - from pandas.plotting import register_matplotlib_converters - - register_matplotlib_converters() - helpers = rate_helpers("USD") - ts = YC(helpers) - cal = calendar_from_name("USA") - p1 = Period("1M") - p2 = Period("2M") - p3 = Period("3M") - p6 = Period("6M") - p12 = Period("12M") - sched = Schedule.from_rule( - ts.reference_date, ts.reference_date + Period("5Y"), Period("3M"), cal - ) - days = [pydate_from_qldate(d) for d in sched] - f3 = [ts.forward_rate(d, d + p3, Actual360(), 0).rate for d in sched] - f6 = [ts.forward_rate(d, d + p6, Actual360(), 0).rate for d in sched] - f2 = [ts.forward_rate(d, d + p2, Actual360(), 0).rate for d in sched] - - plt.plot(days, f2, days, f3, days, f6) |
