aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/calibrate_tranches.py60
-rw-r--r--python/load_cf.py206
2 files changed, 186 insertions, 80 deletions
diff --git a/python/calibrate_tranches.py b/python/calibrate_tranches.py
index c9e64e5f..98a87953 100644
--- a/python/calibrate_tranches.py
+++ b/python/calibrate_tranches.py
@@ -13,24 +13,38 @@ Z, w = GHquad(n_int)
with open("../R/index_definitions.yml") as fh:
indices = yaml.load(fh, Loader=yaml.FullLoader)
-indices['hy21']['maturity'] = datetime.date(1970, 1, 1) + datetime.timedelta(indices['hy21']['maturity'])
-hy21 = indices['hy21']
+indices["hy21"]["maturity"] = datetime.date(1970, 1, 1) + datetime.timedelta(
+ indices["hy21"]["maturity"]
+)
+hy21 = indices["hy21"]
hy21["startdate"] = datetime.date(2013, 9, 20)
-dates = [f[9:19] for f in os.listdir(os.path.join(os.environ['DATA_DIR'], "Backtest")) if "survprob" in f]
+dates = [
+ f[9:19]
+ for f in os.listdir(os.path.join(os.environ["DATA_DIR"], "Backtest"))
+ if "survprob" in f
+]
Rho = np.zeros((len(dates), 3))
for i, d in enumerate(dates):
startdate = datetime.datetime.strptime(d, "%Y-%m-%d")
ts = YC(startdate)
- with open(os.path.join(os.environ['DATA_DIR'], "Backtest", "recov_{0}.csv".format(d))) as fh:
- recov = np.array([float(e) for e in fh], dtype='double', order='F')
+ with open(
+ os.path.join(os.environ["DATA_DIR"], "Backtest", "recov_{0}.csv".format(d))
+ ) as fh:
+ recov = np.array([float(e) for e in fh], dtype="double", order="F")
- with open(os.path.join(os.environ['DATA_DIR'], "Backtest", "survprob_{0}.csv".format(d))) as fh:
- fh.readline() ##skip header
- SurvProb = np.array([[float(e) for e in line.split(",")] for line in fh], dtype='double', order='F')
+ with open(
+ os.path.join(os.environ["DATA_DIR"], "Backtest", "survprob_{0}.csv".format(d))
+ ) as fh:
+ fh.readline() ##skip header
+ SurvProb = np.array(
+ [[float(e) for e in line.split(",")] for line in fh],
+ dtype="double",
+ order="F",
+ )
defaultprob = 1 - SurvProb
- issuerweights = np.ones(100)/100
+ issuerweights = np.ones(100) / 100
rho = 0.4
Ngrid = 101
@@ -38,25 +52,33 @@ for i, d in enumerate(dates):
K = np.array([0, 0.15, 0.25, 0.35, 1])
Kmod = adjust_attachments(K, hy21["loss"], hy21["factor"])
- quotes = pd.read_csv(os.path.join(os.environ['BASE_DIR'], "Scenarios", "Calibration",
- "hy21_tranches_{0}.csv".format(d)))
- quotes = quotes["Mid"]/100
+ quotes = pd.read_csv(
+ os.path.join(
+ os.environ["BASE_DIR"],
+ "Scenarios",
+ "Calibration",
+ "hy21_tranches_{0}.csv".format(d),
+ )
+ )
+ quotes = quotes["Mid"] / 100
dK = np.diff(Kmod)
- quotes = np.cumsum(dK * (1-quotes))
+ quotes = np.cumsum(dK * (1 - quotes))
sched = creditSchedule(startdate, "5Yr", 0.05, ts, enddate=hy21["maturity"])
acc = cdsAccrued(startdate, 0.05)
for j, q in enumerate(quotes[:-1]):
+
def aux(rho):
L, R = BClossdist(defaultprob, issuerweights, recov, rho, Z, w, 101)
- cl = tranche_cl(L, R, sched, 0, Kmod[j+1])
- pl = tranche_pl(L, sched, 0, Kmod[j+1])
- return cl+pl+q-acc
+ cl = tranche_cl(L, R, sched, 0, Kmod[j + 1])
+ pl = tranche_pl(L, sched, 0, Kmod[j + 1])
+ return cl + pl + q - acc
+
l, u = (0, 1)
for _ in range(10):
- rho = (l+u)/2.
+ rho = (l + u) / 2.0
if aux(rho) > 0:
u = rho
else:
l = rho
- Rho[i, j] = (l+u)/2.
- print(Rho[i,:])
+ Rho[i, j] = (l + u) / 2.0
+ print(Rho[i, :])
diff --git a/python/load_cf.py b/python/load_cf.py
index 3f4949b9..e198b7e7 100644
--- a/python/load_cf.py
+++ b/python/load_cf.py
@@ -9,7 +9,16 @@ from zipfile import ZipFile
import re, datetime, yaml
from db import query_db, conn
-from quantlib.time.api import Schedule, Actual360, Period, Months, Calendar, Unadjusted, ModifiedFollowing, Date
+from quantlib.time.api import (
+ Schedule,
+ Actual360,
+ Period,
+ Months,
+ Calendar,
+ Unadjusted,
+ ModifiedFollowing,
+ Date,
+)
from quantlib.util.converter import pydate_to_qldate, qldate_to_pydate
from quantlib.settings import Settings
from yieldcurve import YC
@@ -17,11 +26,12 @@ from optimization import KLfit
import rpy2.robjects as ro
import sys
+
def sanitize_float(string):
try:
- string = string.replace(",","")
+ string = string.replace(",", "")
if "(" in string:
- return - float(string[1:-1])
+ return -float(string[1:-1])
else:
return float(string)
except AttributeError:
@@ -29,9 +39,12 @@ def sanitize_float(string):
def processzipfiles(tradedate=datetime.date.today()):
- pricesdir = os.path.join(os.environ['BASE_DIR'], "Scenarios", "Prices_" + str(tradedate))
- zipfiles = [os.path.join(pricesdir, f) for f in os.listdir(pricesdir) \
- if f.endswith(".zip")]
+ pricesdir = os.path.join(
+ os.environ["BASE_DIR"], "Scenarios", "Prices_" + str(tradedate)
+ )
+ zipfiles = [
+ os.path.join(pricesdir, f) for f in os.listdir(pricesdir) if f.endswith(".zip")
+ ]
zipfiles = sorted(zipfiles, key=lambda x: os.stat(x).st_ctime)
cusips_dict = {}
dealnames_dict = {}
@@ -39,18 +52,28 @@ def processzipfiles(tradedate=datetime.date.today()):
with ZipFile(zf) as myzip:
allfiles = set([f.filename.split("-")[0] for f in myzip.infolist()])
allfiles = allfiles - {"Total"}
- dealnames = set([f.filename.split("-")[0] for f in myzip.infolist() \
- if re.search("COLLAT.*Scen100", f.filename)])
+ dealnames = set(
+ [
+ f.filename.split("-")[0]
+ for f in myzip.infolist()
+ if re.search("COLLAT.*Scen100", f.filename)
+ ]
+ )
cusips = allfiles - dealnames
dealnames = [d.lower() for d in dealnames]
cusips_dict.update((cusip, i) for cusip in cusips)
dealnames_dict.update((dealname, i) for dealname in dealnames)
- return {"dealnames": dealnames_dict, "cusips":cusips_dict, "zipfiles": zipfiles}
+ return {"dealnames": dealnames_dict, "cusips": cusips_dict, "zipfiles": zipfiles}
def get_configfile(dealname, tradedate):
- configfile = os.path.join(os.environ['BASE_DIR'], "Scenarios", "Intex curves_" + str(tradedate),
- "csv", dealname + ".config")
+ configfile = os.path.join(
+ os.environ["BASE_DIR"],
+ "Scenarios",
+ "Intex curves_" + str(tradedate),
+ "csv",
+ dealname + ".config",
+ )
try:
with open(configfile) as fh:
config = yaml.load(fh, Loader=yaml.FullLoader)
@@ -58,31 +81,53 @@ def get_configfile(dealname, tradedate):
config = {"reinvflag": True}
return config
+
def get_dist(date):
- distfile = os.path.join(os.environ['BASE_DIR'], "Scenarios", "Calibration",
- "marketdata-{0:%Y-%m-%d}.RData".format(date))
+ distfile = os.path.join(
+ os.environ["BASE_DIR"],
+ "Scenarios",
+ "Calibration",
+ "marketdata-{0:%Y-%m-%d}.RData".format(date),
+ )
dist = ro.r.load(distfile)
- return {"L": np.array(dist[0]),"R": np.array(dist[1])}
+ return {"L": np.array(dist[0]), "R": np.array(dist[1])}
+
def get_dealdata(dealname, tradedate):
- sqlstr = "select \"Curr Collat Bal\" AS currbal, reinv_end_date, first_pay_date," \
- "maturity, \"Principal Bal\", pay_day from historical_clo_universe(%s, %s)"
+ sqlstr = (
+ 'select "Curr Collat Bal" AS currbal, reinv_end_date, first_pay_date,'
+ 'maturity, "Principal Bal", pay_day from historical_clo_universe(%s, %s)'
+ )
data = {k: v for k, v in query_db(sqlstr, (dealname, tradedate)).items()}
- data["mv"] = query_db("select marketvalue from latest_deal_model_numbers where dealname = %s",
- (dealname,))[0]
+ data["mv"] = query_db(
+ "select marketvalue from latest_deal_model_numbers where dealname = %s",
+ (dealname,),
+ )[0]
return data
+
def get_cusipdata(cusip, tradedate):
sqlstring = "select curr_balance, spread from historical_cusip_universe(%s, %s)"
sqldata = query_db(sqlstr, (cusip, str(tradedate)))
return sqldata
-def get_dealschedule(dealdata, freq='1Mo', adj=Unadjusted):
- us_cal = Calendar.from_name('USA')
- if not dealdata["Pay Day"] or abs((dealdata["Deal Next Pay Date"]- dealdata["Pay Day"]).days - 90)>10:
+
+def get_dealschedule(dealdata, freq="1Mo", adj=Unadjusted):
+ us_cal = Calendar.from_name("USA")
+ if (
+ not dealdata["Pay Day"]
+ or abs((dealdata["Deal Next Pay Date"] - dealdata["Pay Day"]).days - 90) > 10
+ ):
dealdata["Pay Day"] = dealdata["Deal Next Pay Date"] - DateOffset(months=3)
- return Schedule(pydate_to_qldate(dealdata["Pay Day"]), pydate_to_qldate(dealdata["maturity"]),
- Period(freq), us_cal, adj, adj)
+ return Schedule(
+ pydate_to_qldate(dealdata["Pay Day"]),
+ pydate_to_qldate(dealdata["maturity"]),
+ Period(freq),
+ us_cal,
+ adj,
+ adj,
+ )
+
def dealname_from_cusip(conn, cusips):
with conn.cursor() as c:
@@ -90,21 +135,25 @@ def dealname_from_cusip(conn, cusips):
dealnames = [d[0] for d in c]
return dealnames
+
def discounts(tradedate):
- calibration_date = tradedate-BDay(1)
+ calibration_date = tradedate - BDay(1)
m = YC(calibration_date)
- alldates = pd.date_range(pd.to_datetime(m.reference_date, format='%d/%m/%Y'),
- pd.to_datetime(m.max_date, format='%d/%m/%Y'))
+ alldates = pd.date_range(
+ pd.to_datetime(m.reference_date, format="%d/%m/%Y"),
+ pd.to_datetime(m.max_date, format="%d/%m/%Y"),
+ )
alldates_ql = [Date(d.day, d.month, d.year) for d in alldates]
df = [m.discount(day) for day in alldates_ql]
- yearfrac = alldates.to_series()-alldates[0]
- yearfrac = yearfrac.astype('timedelta64[D]')/365
- return pd.DataFrame({'yearfrac':yearfrac, 'df': df}, index=alldates)
+ yearfrac = alldates.to_series() - alldates[0]
+ yearfrac = yearfrac.astype("timedelta64[D]") / 365
+ return pd.DataFrame({"yearfrac": yearfrac, "df": df}, index=alldates)
+
-def getdealcf(dealnames, zipfiles, tradedate = datetime.date.today()):
+def getdealcf(dealnames, zipfiles, tradedate=datetime.date.today()):
n_scenarios = 100
cfdata = {}
- discounts_table= discounts(tradedate)
+ discounts_table = discounts(tradedate)
fields = ["Cashflow", "Principal", "Interest"]
for dealname, i in dealnames.items():
zipfile = zipfiles[i]
@@ -112,42 +161,59 @@ def getdealcf(dealnames, zipfiles, tradedate = datetime.date.today()):
cfdata[dealname] = {k: dealdata[k] for k in ["mv", "currbal"]}
config = get_configfile(dealname, tradedate)
- if (not dealdata["reinv_end_date"] or not config["reinvflag"]):
- tranches = ["COLLAT"]
+ if not dealdata["reinv_end_date"] or not config["reinvflag"]:
+ tranches = ["COLLAT"]
else:
tranches = ["COLLAT_INITIAL", "COLLAT_REINVEST"]
cf = {}
with ZipFile(zipfile) as myzip:
for tranche in tranches:
- scen = np.zeros((100,3))
+ scen = np.zeros((100, 3))
for j in range(n_scenarios):
- filename = "{0}-{1}-CF-Scen{2}.txt".format(dealname.upper(), tranche, j+1)
- data = pd.read_table(myzip.open(filename), skiprows=[1, 2], parse_dates=[0],
- thousands=",",
- date_parser = lambda x: datetime.datetime.strptime(x, "%b %d, %Y"),
- index_col=0)
+ filename = "{0}-{1}-CF-Scen{2}.txt".format(
+ dealname.upper(), tranche, j + 1
+ )
+ data = pd.read_table(
+ myzip.open(filename),
+ skiprows=[1, 2],
+ parse_dates=[0],
+ thousands=",",
+ date_parser=lambda x: datetime.datetime.strptime(
+ x, "%b %d, %Y"
+ ),
+ index_col=0,
+ )
for c in fields:
- if data.dtypes[c] != np.dtype('float64'):
+ if data.dtypes[c] != np.dtype("float64"):
data[c] = data[c].apply(sanitize_float)
data = data[fields].join(discounts_table)
- scen[j,:] = np.dot(data.df, data[fields])
+ scen[j, :] = np.dot(data.df, data[fields])
cf[tranche] = pd.DataFrame(scen, columns=fields)
cfdata[dealname]["panel"] = pd.concat(cf)
cf = cfdata[dealname]["panel"].Cashflow.sum(level=1)
- cfdata[dealname]["wapbasis"] = (cf.mean() - dealdata["mv"])/dealdata["mv"]
- program = KLfit(cf.values/1e8, np.ones(n_scenarios)/n_scenarios, dealdata["mv"]/1e8)
+ cfdata[dealname]["wapbasis"] = (cf.mean() - dealdata["mv"]) / dealdata["mv"]
+ program = KLfit(
+ cf.values / 1e8, np.ones(n_scenarios) / n_scenarios, dealdata["mv"] / 1e8
+ )
cfdata[dealname]["weight"] = program["weight"]
print(dealname)
return cfdata
+
def getcusipcf(params, cfdata, tradedate):
- calibration_date = tradedate-BDay(1)
+ calibration_date = tradedate - BDay(1)
dist = get_dist(calibration_date)
cusipdata = {}
n_scenarios = 100
- intexfields = ["Cashflow", "Principal", "Interest", "Balance", "Accum Interest Shortfall"]
+ intexfields = [
+ "Cashflow",
+ "Principal",
+ "Interest",
+ "Balance",
+ "Accum Interest Shortfall",
+ ]
fields = ["Cashflow", "Principal", "Interest"]
cusips = list(params["cusips"].keys())
dealnames = dealname_from_cusip(conn, cusips)
@@ -160,32 +226,50 @@ def getcusipcf(params, cfdata, tradedate):
with ZipFile(zipfile) as myzip:
scen = np.zeros((n_scenarios, 5))
for j in range(n_scenarios):
- filename = "{0}-CF-Scen{1}.txt".format(cusip, j+1)
- data = pd.read_table(myzip.open(filename), skiprows=[1, 2], parse_dates=[0],
- thousands = ",",
- date_parser = lambda x: datetime.datetime.strptime(x, "%b %d, %Y"),
- index_col = 0,
- usecols=range(6))
+ filename = "{0}-CF-Scen{1}.txt".format(cusip, j + 1)
+ data = pd.read_table(
+ myzip.open(filename),
+ skiprows=[1, 2],
+ parse_dates=[0],
+ thousands=",",
+ date_parser=lambda x: datetime.datetime.strptime(x, "%b %d, %Y"),
+ index_col=0,
+ usecols=range(6),
+ )
for c in intexfields:
- if data.dtypes[c] != np.dtype('float64'):
+ if data.dtypes[c] != np.dtype("float64"):
data[c] = data[c].apply(sanitize_float)
- data["Balance"] = np.maximum(data.Balance - data["Accum Interest Shortfall"], 0)
+ data["Balance"] = np.maximum(
+ data.Balance - data["Accum Interest Shortfall"], 0
+ )
data = data.join(discounts_table)
- scen[j,:3] = data.df.dot(data[fields])
- scen[j,3] = data.Balance.diff()[1:].dot(data.yearfrac.shift()[1:])/dealdata["currbal"]
- scen[j,4] = np.dot(data.Cashflow, data.df * data.yearfrac)/scen[j,0] if scen[j,0] else 0
+ scen[j, :3] = data.df.dot(data[fields])
+ scen[j, 3] = (
+ data.Balance.diff()[1:].dot(data.yearfrac.shift()[1:])
+ / dealdata["currbal"]
+ )
+ scen[j, 4] = (
+ np.dot(data.Cashflow, data.df * data.yearfrac) / scen[j, 0]
+ if scen[j, 0]
+ else 0
+ )
+
-def compute_delta(dist, dealweight, cusip_pv, tradedate, K1 = 0, K2 = 1):
+def compute_delta(dist, dealweight, cusip_pv, tradedate, K1=0, K2=1):
Ngrid, nT = dist["L"].shape
scenariosl = np.zeros((dealweight.size, nT))
scenariosr = np.zeros((dealweight.size, nT))
for t in range(nT):
- scenariosl[:,t] = interpvalues(dist["L"][:,t], np.linspace(0, 1, Ngrid), dealweight)
- scenariosr[:,t] = interpvalues(dist["R"][:,t], np.linspace(0, 1, Ngrid), dealweight)
+ scenariosl[:, t] = interpvalues(
+ dist["L"][:, t], np.linspace(0, 1, Ngrid), dealweight
+ )
+ scenariosr[:, t] = interpvalues(
+ dist["R"][:, t], np.linspace(0, 1, Ngrid), dealweight
+ )
-if __name__=="__main__":
- if len(sys.argv)>1:
+if __name__ == "__main__":
+ if len(sys.argv) > 1:
tradedate = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d").date()
else:
tradedate = datetime.date.today()