diff options
Diffstat (limited to 'python/analytics/tranche_functions.py')
| -rw-r--r-- | python/analytics/tranche_functions.py | 354 |
1 files changed, 222 insertions, 132 deletions
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py index cfd047f8..00fd0ec1 100644 --- a/python/analytics/tranche_functions.py +++ b/python/analytics/tranche_functions.py @@ -2,8 +2,13 @@ import numpy as np from ctypes import POINTER, c_int, c_double, byref from numpy.ctypeslib import ndpointer from quantlib.time.schedule import Schedule, CDS2015 -from quantlib.time.api import (Actual360, Period, WeekendsOnly, - ModifiedFollowing, Unadjusted) +from quantlib.time.api import ( + Actual360, + Period, + WeekendsOnly, + ModifiedFollowing, + Unadjusted, +) from quantlib.util.converter import pydate_to_qldate import pandas as pd from scipy.special import h_roots @@ -17,158 +22,199 @@ def wrapped_ndpointer(*args, **kwargs): if obj is None: return obj return base.from_param(obj) - return type(base.__name__, (base,), {'from_param': classmethod(from_param)}) -libloss = np.ctypeslib.load_library("lossdistrib", os.path.join( - os.environ['CODE_DIR'], "python", "analytics")) + return type(base.__name__, (base,), {"from_param": classmethod(from_param)}) + + +libloss = np.ctypeslib.load_library( + "lossdistrib", os.path.join(os.environ["CODE_DIR"], "python", "analytics") +) libloss.fitprob.restype = None libloss.fitprob.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] + ndpointer("double", ndim=1, flags="F,writeable"), +] libloss.stochasticrecov.restype = None libloss.stochasticrecov.argtypes = [ POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=2, flags='F'), - ndpointer('double', ndim=2, flags='F'), + ndpointer("double", ndim=2, flags="F"), + ndpointer("double", ndim=2, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] + ndpointer("double", ndim=1, flags="F,writeable"), +] libloss.BCloss_recov_dist.restype = None libloss.BCloss_recov_dist.argtypes = [ - ndpointer('double', ndim=2, flags='F'),# defaultprob - POINTER(c_int),# nrow(defaultprob) - POINTER(c_int),# ncol(defaultprob) - ndpointer('double', ndim=1, flags='F'),# issuerweights - ndpointer('double', ndim=1, flags='F'),# recovery - ndpointer('double', ndim=1, flags='F'),# Z - ndpointer('double', ndim=1, flags='F'),# w - POINTER(c_int), # len(Z) = len(w) - ndpointer('double', ndim=1, flags='F'), # rho - POINTER(c_int), # Ngrid - POINTER(c_int), #defaultflag - ndpointer('double', ndim=2, flags='F,writeable'),# output L - ndpointer('double', ndim=2, flags='F,writeable')# output R + ndpointer("double", ndim=2, flags="F"), # defaultprob + POINTER(c_int), # nrow(defaultprob) + POINTER(c_int), # ncol(defaultprob) + ndpointer("double", ndim=1, flags="F"), # issuerweights + ndpointer("double", ndim=1, flags="F"), # recovery + ndpointer("double", ndim=1, flags="F"), # Z + ndpointer("double", ndim=1, flags="F"), # w + POINTER(c_int), # len(Z) = len(w) + ndpointer("double", ndim=1, flags="F"), # rho + POINTER(c_int), # Ngrid + POINTER(c_int), # defaultflag + ndpointer("double", ndim=2, flags="F,writeable"), # output L + ndpointer("double", ndim=2, flags="F,writeable"), # output R ] libloss.BCloss_recov_trunc.restype = None libloss.BCloss_recov_trunc.argtypes = [ - ndpointer('double', ndim=2, flags='F'),# defaultprob - POINTER(c_int),# nrow(defaultprob) - POINTER(c_int),# ncol(defaultprob) - ndpointer('double', ndim=1, flags='F'),# issuerweights - ndpointer('double', ndim=1, flags='F'),# recovery - ndpointer('double', ndim=1, flags='F'),# Z - ndpointer('double', ndim=1, flags='F'),# w - POINTER(c_int), # len(Z) = len(w) - ndpointer('double', ndim=1, flags='F'), # rho - POINTER(c_int), # Ngrid - POINTER(c_double), #K - POINTER(c_int), #defaultflag - ndpointer('double', ndim=1, flags='F,writeable'),# output EL - ndpointer('double', ndim=1, flags='F,writeable')# output ER + ndpointer("double", ndim=2, flags="F"), # defaultprob + POINTER(c_int), # nrow(defaultprob) + POINTER(c_int), # ncol(defaultprob) + ndpointer("double", ndim=1, flags="F"), # issuerweights + ndpointer("double", ndim=1, flags="F"), # recovery + ndpointer("double", ndim=1, flags="F"), # Z + ndpointer("double", ndim=1, flags="F"), # w + POINTER(c_int), # len(Z) = len(w) + ndpointer("double", ndim=1, flags="F"), # rho + POINTER(c_int), # Ngrid + POINTER(c_double), # K + POINTER(c_int), # defaultflag + ndpointer("double", ndim=1, flags="F,writeable"), # output EL + ndpointer("double", ndim=1, flags="F,writeable"), # output ER ] libloss.lossdistrib_joint.restype = None libloss.lossdistrib_joint.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.lossdistrib_joint_Z.restype = None libloss.lossdistrib_joint_Z.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.joint_default_averagerecov_distrib.restype = None libloss.joint_default_averagerecov_distrib.argtypes = [ - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), + ndpointer("double", ndim=1, flags="F"), POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') + ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.shockprob.restype = c_double -libloss.shockprob.argtypes = [ - c_double, - c_double, - c_double, - c_int] +libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int] libloss.shockseverity.restype = c_double -libloss.shockseverity.argtypes = [ - c_double, - c_double, - c_double, - c_double] +libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double] + def GHquad(n): Z, w = h_roots(n) - return Z*np.sqrt(2), w/np.sqrt(np.pi) + return Z * np.sqrt(2), w / np.sqrt(np.pi) + def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): q = np.zeros_like(Z) - libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)), - byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q) + libloss.stochasticrecov( + byref(c_double(R)), + byref(c_double(Rtilde)), + Z, + w, + byref(c_int(Z.size)), + byref(c_double(rho)), + byref(c_double(porig)), + byref(c_double(pmod)), + q, + ) return q + def fitprob(Z, w, rho, p0): result = np.empty_like(Z) - libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result) + libloss.fitprob( + Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result + ) return result + def shockprob(p, rho, Z, give_log): return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log)) + def shockseverity(S, rho, Z, p): return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p)) -def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False): - L = np.zeros((Ngrid, defaultprob.shape[1]), order='F') + +def BCloss_recov_dist( + defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False +): + L = np.zeros((Ngrid, defaultprob.shape[1]), order="F") R = np.zeros_like(L) rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, - byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R) + libloss.BCloss_recov_dist( + defaultprob, + byref(c_int(defaultprob.shape[0])), + byref(c_int(defaultprob.shape[1])), + issuerweights, + recov, + Z, + w, + byref(c_int(Z.size)), + rho, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + L, + R, + ) return L, R -def BCloss_recov_trunc(defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False): + +def BCloss_recov_trunc( + defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False +): ELt = np.zeros(defaultprob.shape[1]) ERt = np.zeros_like(ELt) rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_trunc(defaultprob, byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, recov, Z, w, byref(c_int(Z.size)), - rho, byref(c_int(Ngrid)), byref(c_double(K)), - byref(c_int(defaultflag)), - ELt, ERt) + libloss.BCloss_recov_trunc( + defaultprob, + byref(c_int(defaultprob.shape[0])), + byref(c_int(defaultprob.shape[1])), + issuerweights, + recov, + Z, + w, + byref(c_int(Z.size)), + rho, + byref(c_int(Ngrid)), + byref(c_double(K)), + byref(c_int(defaultflag)), + ELt, + ERt, + ) return ELt, ERt + def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): """Joint loss-recovery distribution recursive algorithm. @@ -200,15 +246,23 @@ def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): np.sum(q, axis=1) is the loss (or default) distribution marginal """ - q = np.zeros((Ngrid, Ngrid), order='F') + q = np.zeros((Ngrid, Ngrid), order="F") if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) - libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), q) + assert p.shape == pp.shape + assert w.shape == S.shape + libloss.lossdistrib_joint( + p, + pp, + byref(c_int(p.shape[0])), + w, + S, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + q, + ) return q + def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """Joint loss-recovery distribution recursive algorithm. @@ -254,18 +308,29 @@ def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """ Z, wZ = GHquad(nZ) - q = np.zeros((Ngrid, Ngrid), order='F') + q = np.zeros((Ngrid, Ngrid), order="F") rho = rho * np.ones(p.shape[0]) if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) + assert p.shape == pp.shape + assert w.shape == S.shape - libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), rho, Z, wZ, - byref(c_int(nZ)), q) + libloss.lossdistrib_joint_Z( + p, + pp, + byref(c_int(p.shape[0])), + w, + S, + byref(c_int(Ngrid)), + byref(c_int(defaultflag)), + rho, + Z, + wZ, + byref(c_int(nZ)), + q, + ) return q + def joint_default_averagerecov_distrib(p, S, Ngrid=101): """Joint defaut-average recovery distribution recursive algorithm. @@ -291,41 +356,51 @@ def joint_default_averagerecov_distrib(p, S, Ngrid=101): np.sum(q, axis=1) is the loss (or default) distribution marginal """ - q = np.zeros((Ngrid, p.shape[0]+1), order='F') - assert(p.shape == S.shape) - libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])), - S, byref(c_int(Ngrid)), q) + q = np.zeros((Ngrid, p.shape[0] + 1), order="F") + assert p.shape == S.shape + libloss.joint_default_averagerecov_distrib( + p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q + ) return q.T + def adjust_attachments(K, losstodate, factor): """ computes the attachments adjusted for losses on current notional """ - return np.minimum(np.maximum((K-losstodate)/factor, 0), 1) + return np.minimum(np.maximum((K - losstodate) / factor, 0), 1) + def trancheloss(L, K1, K2): return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) + def trancherecov(R, K1, K2): return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) + def tranche_cl(L, R, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: support = np.linspace(0, 1, L.shape[0]) - size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \ - np.dot(trancherecov(support, K1, K2), R) - sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1]))) + size = ( + K2 + - K1 + - np.dot(trancheloss(support, K1, K2), L) + - np.dot(trancherecov(support, K1, K2), R) + ) + sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1]))) if scaled: - return 1 / (K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) + return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) else: return np.dot(sizeadj * cs["coupons"], cs["df"]) + def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): - if(K1 == K2): - return 0. + if K1 == K2: + return 0.0 else: size = EL - ER dK = K2 - K1 @@ -335,8 +410,9 @@ def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): else: return np.dot(sizeadj * cs["coupons"], cs["df"]) + def tranche_pl(L, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: dK = K2 - K1 @@ -348,8 +424,9 @@ def tranche_pl(L, cs, K1, K2, scaled=False): else: return np.dot(np.diff(cf), cs["df"]) + def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): - if(K1 == K2): + if K1 == K2: return 0 else: dK = K2 - K1 @@ -359,9 +436,11 @@ def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): else: return np.dot(np.diff(cf), cs["df"]) + def tranche_pv(L, R, cs, K1, K2): return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) + def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): tradedate = pydate_to_qldate(tradedate) if enddate is None: @@ -371,13 +450,17 @@ def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): cal = WeekendsOnly() DC = Actual360() start_date = tradedate + 1 - sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal, - ModifiedFollowing, Unadjusted, CDS2015) + sched = Schedule.from_rule( + tradedate, enddate, Period("3M"), cal, ModifiedFollowing, Unadjusted, CDS2015 + ) dates = sched.to_npdates() - pydates = dates.astype('O') + pydates = dates.astype("O") df = [yc.discount_factor(d) for d in pydates if d > start_date] - coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1]) - if d2 > start_date] + coupons = [ + DC.year_fraction(d1, d2) * coupon + for d1, d2 in zip(sched[:-2], sched[1:-1]) + if d2 > start_date + ] coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon) if dates[1] <= start_date: dates = dates[2:] @@ -391,16 +474,18 @@ def cds_accrued(tradedate, coupon): TODO: fix for when trade_date + 1 = IMM date""" tradedate = pydate_to_qldate(tradedate) - end = tradedate + Period('3M') + end = tradedate + Period("3M") start_protection = tradedate + 1 DC = Actual360() cal = WeekendsOnly() - sched = Schedule.from_rule(tradedate, end, Period('3M'), cal, - date_generation_rule=CDS2015) + sched = Schedule.from_rule( + tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015 + ) prevpaydate = sched.previous_date(start_protection) return DC.year_fraction(prevpaydate, start_protection) * coupon + def dist_transform(q): """computes the joint (D, R) distribution from the (L, R) distribution using D = L+R @@ -411,50 +496,54 @@ def dist_transform(q): for j in range(Ngrid): index = i + j if index < Ngrid: - distDR[index,j] += q[i,j] + distDR[index, j] += q[i, j] else: - distDR[Ngrid-1,j] += q[i,j] + distDR[Ngrid - 1, j] += q[i, j] return distDR + def dist_transform2(q): """computes the joint (D, R/D) distribution from the (D, R) distribution """ Ngrid = q.shape[0] - distDR = np.empty(Ngrid, dtype='object') + distDR = np.empty(Ngrid, dtype="object") for i in range(Ngrid): distDR[i] = {} for i in range(1, Ngrid): - for j in range(i+1): - index = (j / i) - distDR[i][index] = distDR[i].get(index, 0) + q[i,j] + for j in range(i + 1): + index = j / i + distDR[i][index] = distDR[i].get(index, 0) + q[i, j] return distDR + def compute_pv(q, strike): r""" compute E(1_{R^\bar \leq strike} * D)""" for i in range(q.shape): val += sum(v for k, v in q[i].items() if k < strike) * 1 / Ngrid return val + def average_recov(p, R, Ngrid): q = np.zeros((p.shape[0] + 1, Ngrid)) q[0, 0] = 1 - lu = 1 / (Ngrid-1) + lu = 1 / (Ngrid - 1) weights = np.empty(Ngrid) index = np.empty(Ngrid) grid = np.linspace(0, 1, Ngrid) for i, prob in enumerate(p): - for j in range(i+1, 0, -1): - newrecov = ((j-1) * grid + R[i])/j - np.modf(newrecov * ( Ngrid - 1), weights, index) - q[j] *= (1-prob) + for j in range(i + 1, 0, -1): + newrecov = ((j - 1) * grid + R[i]) / j + np.modf(newrecov * (Ngrid - 1), weights, index) + q[j] *= 1 - prob for k in range(Ngrid): - q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k] - q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k] - q[0] *= (1-prob) + q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k] + q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k] + q[0] *= 1 - prob return q -if __name__=="__main__": + +if __name__ == "__main__": # n_issuers = 100 # p = np.random.rand(n_issuers) # pp = np.random.rand(n_issuers) @@ -464,8 +553,9 @@ if __name__=="__main__": # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) import numpy as np + n_issuers = 100 p = np.random.rand(n_issuers) R = np.random.rand(n_issuers) - Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001) + Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001) Rbar_slow = average_recov(p, R, 1001) |
