Diffstat (limited to 'python/analytics/tranche_functions.py')
-rw-r--r--  python/analytics/tranche_functions.py | 354
1 file changed, 222 insertions(+), 132 deletions(-)
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
index cfd047f8..00fd0ec1 100644
--- a/python/analytics/tranche_functions.py
+++ b/python/analytics/tranche_functions.py
@@ -2,8 +2,13 @@ import numpy as np
from ctypes import POINTER, c_int, c_double, byref
from numpy.ctypeslib import ndpointer
from quantlib.time.schedule import Schedule, CDS2015
-from quantlib.time.api import (Actual360, Period, WeekendsOnly,
- ModifiedFollowing, Unadjusted)
+from quantlib.time.api import (
+ Actual360,
+ Period,
+ WeekendsOnly,
+ ModifiedFollowing,
+ Unadjusted,
+)
from quantlib.util.converter import pydate_to_qldate
import pandas as pd
from scipy.special import h_roots
@@ -17,158 +22,199 @@ def wrapped_ndpointer(*args, **kwargs):
if obj is None:
return obj
return base.from_param(obj)
- return type(base.__name__, (base,), {'from_param': classmethod(from_param)})
-libloss = np.ctypeslib.load_library("lossdistrib", os.path.join(
- os.environ['CODE_DIR'], "python", "analytics"))
+ return type(base.__name__, (base,), {"from_param": classmethod(from_param)})
+
+
+libloss = np.ctypeslib.load_library(
+ "lossdistrib", os.path.join(os.environ["CODE_DIR"], "python", "analytics")
+)
libloss.fitprob.restype = None
libloss.fitprob.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer("double", ndim=1, flags="F,writeable"),
+]
libloss.stochasticrecov.restype = None
libloss.stochasticrecov.argtypes = [
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=2, flags='F'),
- ndpointer('double', ndim=2, flags='F'),
+ ndpointer("double", ndim=2, flags="F"),
+ ndpointer("double", ndim=2, flags="F"),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer("double", ndim=1, flags="F,writeable"),
+]
libloss.BCloss_recov_dist.restype = None
libloss.BCloss_recov_dist.argtypes = [
- ndpointer('double', ndim=2, flags='F'),# defaultprob
- POINTER(c_int),# nrow(defaultprob)
- POINTER(c_int),# ncol(defaultprob)
- ndpointer('double', ndim=1, flags='F'),# issuerweights
- ndpointer('double', ndim=1, flags='F'),# recovery
- ndpointer('double', ndim=1, flags='F'),# Z
- ndpointer('double', ndim=1, flags='F'),# w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer('double', ndim=1, flags='F'), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_int), #defaultflag
- ndpointer('double', ndim=2, flags='F,writeable'),# output L
- ndpointer('double', ndim=2, flags='F,writeable')# output R
+ ndpointer("double", ndim=2, flags="F"), # defaultprob
+ POINTER(c_int), # nrow(defaultprob)
+ POINTER(c_int), # ncol(defaultprob)
+ ndpointer("double", ndim=1, flags="F"), # issuerweights
+ ndpointer("double", ndim=1, flags="F"), # recovery
+ ndpointer("double", ndim=1, flags="F"), # Z
+ ndpointer("double", ndim=1, flags="F"), # w
+ POINTER(c_int), # len(Z) = len(w)
+ ndpointer("double", ndim=1, flags="F"), # rho
+ POINTER(c_int), # Ngrid
+ POINTER(c_int), # defaultflag
+ ndpointer("double", ndim=2, flags="F,writeable"), # output L
+ ndpointer("double", ndim=2, flags="F,writeable"), # output R
]
libloss.BCloss_recov_trunc.restype = None
libloss.BCloss_recov_trunc.argtypes = [
- ndpointer('double', ndim=2, flags='F'),# defaultprob
- POINTER(c_int),# nrow(defaultprob)
- POINTER(c_int),# ncol(defaultprob)
- ndpointer('double', ndim=1, flags='F'),# issuerweights
- ndpointer('double', ndim=1, flags='F'),# recovery
- ndpointer('double', ndim=1, flags='F'),# Z
- ndpointer('double', ndim=1, flags='F'),# w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer('double', ndim=1, flags='F'), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_double), #K
- POINTER(c_int), #defaultflag
- ndpointer('double', ndim=1, flags='F,writeable'),# output EL
- ndpointer('double', ndim=1, flags='F,writeable')# output ER
+ ndpointer("double", ndim=2, flags="F"), # defaultprob
+ POINTER(c_int), # nrow(defaultprob)
+ POINTER(c_int), # ncol(defaultprob)
+ ndpointer("double", ndim=1, flags="F"), # issuerweights
+ ndpointer("double", ndim=1, flags="F"), # recovery
+ ndpointer("double", ndim=1, flags="F"), # Z
+ ndpointer("double", ndim=1, flags="F"), # w
+ POINTER(c_int), # len(Z) = len(w)
+ ndpointer("double", ndim=1, flags="F"), # rho
+ POINTER(c_int), # Ngrid
+ POINTER(c_double), # K
+ POINTER(c_int), # defaultflag
+ ndpointer("double", ndim=1, flags="F,writeable"), # output EL
+ ndpointer("double", ndim=1, flags="F,writeable"), # output ER
]
libloss.lossdistrib_joint.restype = None
libloss.lossdistrib_joint.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ wrapped_ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.lossdistrib_joint_Z.restype = None
libloss.lossdistrib_joint_Z.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ wrapped_ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.joint_default_averagerecov_distrib.restype = None
libloss.joint_default_averagerecov_distrib.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
+ ndpointer("double", ndim=1, flags="F"),
POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
+ ndpointer("double", ndim=2, flags="F,writeable"),
]
libloss.shockprob.restype = c_double
-libloss.shockprob.argtypes = [
- c_double,
- c_double,
- c_double,
- c_int]
+libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int]
libloss.shockseverity.restype = c_double
-libloss.shockseverity.argtypes = [
- c_double,
- c_double,
- c_double,
- c_double]
+libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double]
+
def GHquad(n):
Z, w = h_roots(n)
- return Z*np.sqrt(2), w/np.sqrt(np.pi)
+ return Z * np.sqrt(2), w / np.sqrt(np.pi)
+
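# Hedged sanity check for GHquad: h_roots returns Gauss-Hermite nodes and
# weights for the weight function exp(-x^2); the sqrt(2) and sqrt(pi)
# rescaling turns them into a quadrature rule for a standard normal, so the
# weights sum to 1 and the rule reproduces E[Z^2] = 1. Commented example, in
# the style of the block at the bottom of this file:
#
#   Z, w = GHquad(64)
#   assert abs(w.sum() - 1.0) < 1e-12
#   assert abs((w * Z ** 2).sum() - 1.0) < 1e-8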
def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
q = np.zeros_like(Z)
- libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)),
- byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q)
+ libloss.stochasticrecov(
+ byref(c_double(R)),
+ byref(c_double(Rtilde)),
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ byref(c_double(rho)),
+ byref(c_double(porig)),
+ byref(c_double(pmod)),
+ q,
+ )
return q
+
def fitprob(Z, w, rho, p0):
result = np.empty_like(Z)
- libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result)
+ libloss.fitprob(
+ Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result
+ )
return result
+
def shockprob(p, rho, Z, give_log):
return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log))
+
def shockseverity(S, rho, Z, p):
return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p))
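
# Pure-Python sketch of what shockprob presumably evaluates in the C library:
# the one-factor Gaussian-copula default probability conditional on the common
# factor Z. The name shockprob_py, the omission of give_log, and the sign
# convention on Z are assumptions for cross-checking, not the exact C
# implementation:
def shockprob_py(p, rho, Z):
    from scipy.stats import norm  # local import keeps the sketch self-contained

    return norm.cdf((norm.ppf(p) - np.sqrt(rho) * Z) / np.sqrt(1.0 - rho))
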
-def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False):
- L = np.zeros((Ngrid, defaultprob.shape[1]), order='F')
+
+def BCloss_recov_dist(
+ defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False
+):
+ L = np.zeros((Ngrid, defaultprob.shape[1]), order="F")
R = np.zeros_like(L)
rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights, recov, Z, w, byref(c_int(Z.size)), rho,
- byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R)
+ libloss.BCloss_recov_dist(
+ defaultprob,
+ byref(c_int(defaultprob.shape[0])),
+ byref(c_int(defaultprob.shape[1])),
+ issuerweights,
+ recov,
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ rho,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ L,
+ R,
+ )
return L, R
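
# Hedged usage sketch (assumes the lossdistrib shared library is built and
# found under $CODE_DIR/python/analytics). The flags="F" signatures above
# require Fortran-ordered inputs, hence asfortranarray on the 2-d matrix:
#
#   n_issuers, n_dates = 100, 20
#   defaultprob = np.asfortranarray(np.random.rand(n_issuers, n_dates) * 0.1)
#   issuerweights = np.full(n_issuers, 1.0 / n_issuers)
#   recov = np.full(n_issuers, 0.4)
#   Z, w = GHquad(64)
#   L, R = BCloss_recov_dist(defaultprob, issuerweights, recov, 0.3, Z, w)
#   # each column of L is a loss distribution on a 101-point grid for one date
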
-def BCloss_recov_trunc(defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False):
+
+def BCloss_recov_trunc(
+ defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False
+):
ELt = np.zeros(defaultprob.shape[1])
ERt = np.zeros_like(ELt)
rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_trunc(defaultprob, byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights, recov, Z, w, byref(c_int(Z.size)),
- rho, byref(c_int(Ngrid)), byref(c_double(K)),
- byref(c_int(defaultflag)),
- ELt, ERt)
+ libloss.BCloss_recov_trunc(
+ defaultprob,
+ byref(c_int(defaultprob.shape[0])),
+ byref(c_int(defaultprob.shape[1])),
+ issuerweights,
+ recov,
+ Z,
+ w,
+ byref(c_int(Z.size)),
+ rho,
+ byref(c_int(Ngrid)),
+ byref(c_double(K)),
+ byref(c_int(defaultflag)),
+ ELt,
+ ERt,
+ )
return ELt, ERt
+
def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
"""Joint loss-recovery distribution recursive algorithm.
@@ -200,15 +246,23 @@ def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
np.sum(q, axis=1) is the loss (or default) distribution marginal
"""
- q = np.zeros((Ngrid, Ngrid), order='F')
+ q = np.zeros((Ngrid, Ngrid), order="F")
if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
- libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), q)
+ assert p.shape == pp.shape
+ assert w.shape == S.shape
+ libloss.lossdistrib_joint(
+ p,
+ pp,
+ byref(c_int(p.shape[0])),
+ w,
+ S,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ q,
+ )
return q
+
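# Illustrative pure-Python sketch of the single-name recursion that the C
# routine generalizes to the joint (L, R) grid: for each issuer, mass moves
# from "no default" to "default, loss shifted by w_i * S_i", linearly
# interpolated onto the grid and clipped at the top bucket. A sketch under
# those assumptions, not the exact C implementation:
def lossdistrib_py(p, w, S, Ngrid=101):
    q = np.zeros(Ngrid)
    q[0] = 1.0  # all mass at zero loss before any name is added
    for pi, wi, si in zip(p, w, S):
        frac, idx = np.modf(wi * si * (Ngrid - 1))
        idx = int(idx)
        shifted = np.zeros_like(q)
        for j in range(Ngrid):
            shifted[min(j + idx, Ngrid - 1)] += (1.0 - frac) * q[j]
            shifted[min(j + idx + 1, Ngrid - 1)] += frac * q[j]
        q = (1.0 - pi) * q + pi * shifted
    return q
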
def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
"""Joint loss-recovery distribution recursive algorithm.
@@ -254,18 +308,29 @@ def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
"""
Z, wZ = GHquad(nZ)
- q = np.zeros((Ngrid, Ngrid), order='F')
+ q = np.zeros((Ngrid, Ngrid), order="F")
rho = rho * np.ones(p.shape[0])
if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
+ assert p.shape == pp.shape
+ assert w.shape == S.shape
- libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), rho, Z, wZ,
- byref(c_int(nZ)), q)
+ libloss.lossdistrib_joint_Z(
+ p,
+ pp,
+ byref(c_int(p.shape[0])),
+ w,
+ S,
+ byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)),
+ rho,
+ Z,
+ wZ,
+ byref(c_int(nZ)),
+ q,
+ )
return q
+
def joint_default_averagerecov_distrib(p, S, Ngrid=101):
"""Joint defaut-average recovery distribution recursive algorithm.
@@ -291,41 +356,51 @@ def joint_default_averagerecov_distrib(p, S, Ngrid=101):
np.sum(q, axis=1) is the loss (or default) distribution marginal
"""
- q = np.zeros((Ngrid, p.shape[0]+1), order='F')
- assert(p.shape == S.shape)
- libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])),
- S, byref(c_int(Ngrid)), q)
+ q = np.zeros((Ngrid, p.shape[0] + 1), order="F")
+ assert p.shape == S.shape
+ libloss.joint_default_averagerecov_distrib(
+ p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q
+ )
return q.T
+
def adjust_attachments(K, losstodate, factor):
"""
computes the attachments adjusted for losses
on current notional
"""
- return np.minimum(np.maximum((K-losstodate)/factor, 0), 1)
+ return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)
+
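# Worked example: a 3%-7% tranche with 1% of index losses already realized and
# a current factor of 0.9 has attachments (K - 0.01) / 0.9 on current
# notional, clipped to [0, 1]:
#
#   adjust_attachments(np.array([0.03, 0.07]), 0.01, 0.9)
#   # -> array([0.02222222, 0.06666667])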
def trancheloss(L, K1, K2):
return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
+
def trancherecov(R, K1, K2):
return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
+
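# trancheloss is the call-spread payoff min(max(L - K1, 0), K2 - K1) written
# as a difference of calls; trancherecov measures recovered notional eroding
# the tranche from the top. For K1 = 0.03, K2 = 0.07:
#
#   trancheloss(np.array([0.0, 0.05, 0.10]), 0.03, 0.07)   # -> [0.0, 0.02, 0.04]
#   trancherecov(np.array([0.94, 0.96, 1.0]), 0.03, 0.07)  # -> [0.01, 0.03, 0.04]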
def tranche_cl(L, R, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
support = np.linspace(0, 1, L.shape[0])
- size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \
- np.dot(trancherecov(support, K1, K2), R)
- sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1])))
+ size = (
+ K2
+ - K1
+ - np.dot(trancheloss(support, K1, K2), L)
+ - np.dot(trancherecov(support, K1, K2), R)
+ )
+ sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
if scaled:
- return 1 / (K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
+ return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
else:
return np.dot(sizeadj * cs["coupons"], cs["df"])
+
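# tranche_cl above discounts coupons on the expected outstanding tranche size;
# sizeadj is the trapezoidal average of start- and end-of-period size, the
# first period starting at the full width K2 - K1. Commented toy usage with
# per-period L/R columns on a 3-point support {0, 0.5, 1} (made-up numbers):
#
#   L = np.array([[0.9, 0.8], [0.08, 0.15], [0.02, 0.05]])
#   R = np.zeros_like(L)
#   cs = {"coupons": np.array([0.0025, 0.0025]), "df": np.array([0.99, 0.98])}
#   tranche_cl(L, R, cs, 0.0, 0.03)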
def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
- if(K1 == K2):
- return 0.
+ if K1 == K2:
+ return 0.0
else:
size = EL - ER
dK = K2 - K1
@@ -335,8 +410,9 @@ def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
else:
return np.dot(sizeadj * cs["coupons"], cs["df"])
+
def tranche_pl(L, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
dK = K2 - K1
@@ -348,8 +424,9 @@ def tranche_pl(L, cs, K1, K2, scaled=False):
else:
return np.dot(np.diff(cf), cs["df"])
+
def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
- if(K1 == K2):
+ if K1 == K2:
return 0
else:
dK = K2 - K1
@@ -359,9 +436,11 @@ def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
else:
return np.dot(np.diff(cf), cs["df"])
+
def tranche_pv(L, R, cs, K1, K2):
    return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)
+
def credit_schedule(tradedate, tenor, coupon, yc, enddate=None):
tradedate = pydate_to_qldate(tradedate)
if enddate is None:
@@ -371,13 +450,17 @@ def credit_schedule(tradedate, tenor, coupon, yc, enddate=None):
cal = WeekendsOnly()
DC = Actual360()
start_date = tradedate + 1
- sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal,
- ModifiedFollowing, Unadjusted, CDS2015)
+ sched = Schedule.from_rule(
+ tradedate, enddate, Period("3M"), cal, ModifiedFollowing, Unadjusted, CDS2015
+ )
dates = sched.to_npdates()
- pydates = dates.astype('O')
+ pydates = dates.astype("O")
df = [yc.discount_factor(d) for d in pydates if d > start_date]
- coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])
- if d2 > start_date]
+ coupons = [
+ DC.year_fraction(d1, d2) * coupon
+ for d1, d2 in zip(sched[:-2], sched[1:-1])
+ if d2 > start_date
+ ]
coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
if dates[1] <= start_date:
dates = dates[2:]
@@ -391,16 +474,18 @@ def cds_accrued(tradedate, coupon):
TODO: fix for when trade_date + 1 = IMM date"""
tradedate = pydate_to_qldate(tradedate)
- end = tradedate + Period('3M')
+ end = tradedate + Period("3M")
start_protection = tradedate + 1
DC = Actual360()
cal = WeekendsOnly()
- sched = Schedule.from_rule(tradedate, end, Period('3M'), cal,
- date_generation_rule=CDS2015)
+ sched = Schedule.from_rule(
+ tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015
+ )
prevpaydate = sched.previous_date(start_protection)
return DC.year_fraction(prevpaydate, start_protection) * coupon
+
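# Worked example (hedged: exact dates come from the CDS2015 schedule rule):
# for a trade on 2024-07-15 the previous coupon date is 2024-06-20 and
# protection starts 2024-07-16, so on an Actual360 basis
# accrued = 26/360 * coupon:
#
#   from datetime import date
#   cds_accrued(date(2024, 7, 15), 0.01)  # ~= 26 / 360 * 0.01 ~= 0.000722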
def dist_transform(q):
"""computes the joint (D, R) distribution
from the (L, R) distribution using D = L+R
@@ -411,50 +496,54 @@ def dist_transform(q):
for j in range(Ngrid):
index = i + j
if index < Ngrid:
- distDR[index,j] += q[i,j]
+ distDR[index, j] += q[i, j]
else:
- distDR[Ngrid-1,j] += q[i,j]
+ distDR[Ngrid - 1, j] += q[i, j]
return distDR
+
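# Tiny check of the D = L + R change of variables: mass at (L, R) cell (i, j)
# moves to row i + j, and anything past the top of the grid piles up in the
# last row, e.g. with Ngrid = 3 the cell (2, 2) lands in row 2 rather than 4:
#
#   q = np.zeros((3, 3))
#   q[1, 1], q[2, 2] = 0.7, 0.3
#   dist_transform(q)[2, 1], dist_transform(q)[2, 2]  # -> (0.7, 0.3)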
def dist_transform2(q):
"""computes the joint (D, R/D) distribution
from the (D, R) distribution
"""
Ngrid = q.shape[0]
- distDR = np.empty(Ngrid, dtype='object')
+ distDR = np.empty(Ngrid, dtype="object")
for i in range(Ngrid):
distDR[i] = {}
for i in range(1, Ngrid):
- for j in range(i+1):
- index = (j / i)
- distDR[i][index] = distDR[i].get(index, 0) + q[i,j]
+ for j in range(i + 1):
+ index = j / i
+ distDR[i][index] = distDR[i].get(index, 0) + q[i, j]
return distDR
+
def compute_pv(q, strike):
    r"""compute E(1_{\bar{R} < strike} \, D) from a dist_transform2-style
    output, where row i of q holds the average-recovery distribution at
    default level i / (Ngrid - 1)."""
    Ngrid = q.shape[0]
    val = 0.0
    for i in range(Ngrid):
        val += sum(v for k, v in q[i].items() if k < strike) * i / (Ngrid - 1)
    return val
+
def average_recov(p, R, Ngrid):
q = np.zeros((p.shape[0] + 1, Ngrid))
q[0, 0] = 1
- lu = 1 / (Ngrid-1)
+ lu = 1 / (Ngrid - 1)
weights = np.empty(Ngrid)
index = np.empty(Ngrid)
grid = np.linspace(0, 1, Ngrid)
for i, prob in enumerate(p):
- for j in range(i+1, 0, -1):
- newrecov = ((j-1) * grid + R[i])/j
- np.modf(newrecov * ( Ngrid - 1), weights, index)
- q[j] *= (1-prob)
+ for j in range(i + 1, 0, -1):
+ newrecov = ((j - 1) * grid + R[i]) / j
+ np.modf(newrecov * (Ngrid - 1), weights, index)
+ q[j] *= 1 - prob
for k in range(Ngrid):
- q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k]
- q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k]
- q[0] *= (1-prob)
+ q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k]
+ q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k]
+ q[0] *= 1 - prob
return q
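
# In average_recov above, np.modf splits each regridded recovery into an
# integer bucket index and a fractional interpolation weight in one call,
# e.g. np.modf(2.7) -> (0.7, 2.0): weight 0.7 goes to bucket 3 and 0.3 stays
# in bucket 2, mirroring the linear regridding done in the C routines.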
-if __name__=="__main__":
+
+if __name__ == "__main__":
# n_issuers = 100
# p = np.random.rand(n_issuers)
# pp = np.random.rand(n_issuers)
@@ -464,8 +553,9 @@ if __name__=="__main__":
# pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
# poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
import numpy as np
+
n_issuers = 100
p = np.random.rand(n_issuers)
R = np.random.rand(n_issuers)
- Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001)
+ Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001)
Rbar_slow = average_recov(p, R, 1001)