diff options
Diffstat (limited to 'python/tranche_functions.py')
| -rw-r--r-- | python/tranche_functions.py | 384 |
1 files changed, 0 insertions, 384 deletions
diff --git a/python/tranche_functions.py b/python/tranche_functions.py deleted file mode 100644 index a92044e3..00000000 --- a/python/tranche_functions.py +++ /dev/null @@ -1,384 +0,0 @@ -import numpy as np -from ctypes import * -from numpy.ctypeslib import ndpointer -from quantlib.time.schedule import Schedule, CDS2015 -from quantlib.time.api import Actual360, Period, WeekendsOnly, ModifiedFollowing, Unadjusted, today -from quantlib.util.converter import qldate_to_pydate, pydate_to_qldate -import pandas as pd -from scipy.special import h_roots -import os - -def wrapped_ndpointer(*args, **kwargs): - base = ndpointer(*args, **kwargs) - def from_param(cls, obj): - if obj is None: - return obj - return base.from_param(obj) - return type(base.__name__, (base,), {'from_param': classmethod(from_param)}) - -libloss = np.ctypeslib.load_library("lossdistrib", "../R/lossdistrib/src/") -libloss.fitprob.restype = None -libloss.fitprob.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - POINTER(c_double), - POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] -libloss.stochasticrecov.restype = None -libloss.stochasticrecov.argtypes = [ - POINTER(c_double), - POINTER(c_double), - ndpointer('double', ndim=2, flags='F'), - ndpointer('double', ndim=2, flags='F'), - POINTER(c_int), - POINTER(c_double), - POINTER(c_double), - POINTER(c_double), - ndpointer('double', ndim=1, flags='F,writeable')] -libloss.BCloss_recov_dist.restype = None -libloss.BCloss_recov_dist.argtypes = [ - ndpointer('double', ndim=2, flags='F'),# defaultprob - POINTER(c_int),# nrow(defaultprob) - POINTER(c_int),# ncol(defaultprob) - ndpointer('double', ndim=1, flags='F'),# issuerweights - ndpointer('double', ndim=1, flags='F'),# recovery - ndpointer('double', ndim=1, flags='F'),# Z - ndpointer('double', ndim=1, flags='F'),# w - POINTER(c_int), # len(Z) = len(w) - ndpointer('double', ndim=1, flags='F'), # rho - POINTER(c_int), # Ngrid - POINTER(c_int), #defaultflag - ndpointer('double', ndim=2, flags='F,writeable'),# output L - ndpointer('double', ndim=2, flags='F,writeable')# output R -] - -libloss.lossdistrib_joint.restype = None -libloss.lossdistrib_joint.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') -] - -libloss.lossdistrib_joint_Z.restype = None -libloss.lossdistrib_joint_Z.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - wrapped_ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') -] - -libloss.joint_default_averagerecov_distrib.restype = None -libloss.joint_default_averagerecov_distrib.argtypes = [ - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - ndpointer('double', ndim=1, flags='F'), - POINTER(c_int), - ndpointer('double', ndim=2, flags='F,writeable') -] - -def GHquad(n): - Z, w = h_roots(n) - return Z*np.sqrt(2), w/np.sqrt(np.pi) - -def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): - q = np.zeros_like(Z) - libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)), - byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q) - return q - -def fitprob(Z, w, rho, p0): - result = np.empty_like(Z) - libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result) - return result - -def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False): - L = np.zeros((Ngrid, defaultprob.shape[1]), order='F') - R = np.zeros_like(L) - rho = np.repeat(rho, issuerweights.size) - libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, - byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R) - return L, R - -def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): - """Joint loss-recovery distribution recursive algorithm. - - This computes the joint loss/recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - pp : (N,) array_like or None - Vector of prepayments. - w : (N,) array_like - Issuer weights. - S : (N,) array_like - Vector of severities. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - defaultflag : bool, optional - If True computes the default distribution instead. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - """ - - q = np.zeros((Ngrid, Ngrid), order='F') - if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) - libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), q) - return q - -def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): - """Joint loss-recovery distribution recursive algorithm. - - This computes the joint loss/recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - pp : (N,) array_like or None - Vector of prepayments. - w : (N,) array_like - Issuer weights. - S : (N,) array_like - Vector of severities. - rho : float - Correlation. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - defaultflag : bool, optional - If True computes the default distribution instead. - nZ : int, optional - Size of stochastic factor. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - - Examples - -------- - >>> import numpy as np - >>> p = np.random.rand(100) - >>> pp = np.zeros(100) - >>> w = 1/100 * np.ones(100) - >>> S = np.random.rand(100) - >>> q = lossdistrib_joint_Z(p, pp, S, 0.5) - - """ - Z, wZ = GHquad(nZ) - q = np.zeros((Ngrid, Ngrid), order='F') - rho = rho * np.ones(p.shape[0]) - if pp is not None: - assert(p.shape == pp.shape) - assert(w.shape == S.shape) - - libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])), - w, S, byref(c_int(Ngrid)), - byref(c_int(defaultflag)), rho, Z, wZ, - byref(c_int(nZ)), q) - return q - -def joint_default_averagerecov_distrib(p, S, Ngrid=101): - """Joint defaut-average recovery distribution recursive algorithm. - - This computes the joint default/average recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - S : (N,) array_like - Vector of severities. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - """ - - q = np.zeros((Ngrid, p.shape[0]+1), order='F') - assert(p.shape == S.shape) - libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])), - S, byref(c_int(Ngrid)), q) - return q.T - -def adjust_attachments(K, losstodate, factor): - """ - computes the attachments adjusted for losses - on current notional - """ - return np.minimum(np.maximum((K-losstodate)/factor, 0), 1) - -def trancheloss(L, K1, K2): - return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) - -def trancherecov(R, K1, K2): - return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) - -def tranche_cl(L, R, cs, K1, K2, scaled = False): - if(K1 == K2): - return 0 - else: - support = np.linspace(0, 1, L.shape[0]) - size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \ - np.dot(trancherecov(support, K1, K2), R) - sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1]))) - if scaled: - return 1/(K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) - else: - return np.dot(sizeadj * cs["coupons"], cs["df"]) - -def tranche_pl(L, cs, K1, K2, scaled=False): - if(K1 == K2): - return 0 - else: - support = np.linspace(0, 1, L.shape[0]) - cf = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - cf = np.hstack((K2-K1, cf)) - if scaled: - return 1/(K2-K1) * np.dot(np.diff(cf), cs["df"]) - else: - return np.dot(np.diff(cf), cs["df"]) - -def tranche_pv(L, R, cs, K1, K2): - return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) - -def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): - tradedate = pydate_to_qldate(tradedate) - if enddate is None: - enddate = tradedate + Period(tenor) - else: - enddate = pydate_to_qldate(enddate) - cal = WeekendsOnly() - DC = Actual360() - sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal, - ModifiedFollowing, Unadjusted, CDS2015) - dates = sched.to_npdates() - pydates = dates.astype('O') - df = [yc.discount_factor(d) for d in pydates if d > tradedate] - coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])] - coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon) - - return pydates[0], pd.DataFrame({"df": df, "coupons": coupons}, dates[1:]) - -def cdsAccrued(tradedate, coupon): - tradedate = pydate_to_qldate(tradedate) - end = tradedate + Period('3Mo') - - start_protection = tradedate + 1 - DC = Actual360() - cal = UnitedStates() - sched = Schedule(start, end, Period('3Mo'), cal, date_generation_rule=CDS2015) - prevpaydate = sched.previous_date(tradedate) - return DC.year_fraction(prevpaydate, start_protection) * coupon - -def dist_transform(q): - """computes the joint (D, R) distribution - from the (L, R) distribution using D = L+R - """ - Ngrid = q.shape[0] - distDR = np.zeros_like(q) - for i in range(Ngrid): - for j in range(Ngrid): - index = i + j - if index < Ngrid: - distDR[index,j] += q[i,j] - else: - distDR[Ngrid-1,j] += q[i,j] - return distDR - -def dist_transform2(q): - """computes the joint (D, R/D) distribution - from the (D, R) distribution - """ - Ngrid = q.shape[0] - distDR = np.empty(Ngrid, dtype='object') - for i in range(Ngrid): - distDR[i] = {} - for i in range(1, Ngrid): - for j in range(i+1): - index = (j / i) - distDR[i][index] = distDR[i].get(index, 0) + q[i,j] - return distDR - -def compute_pv(q, strike): - """ compute E(1_{R^\bar \leq strike} * D)""" - D = 0 - for i in range(q.shape): - val += sum(v for k, v in q[i].items() if k < strike) * 1/Ngrid - return val - -def average_recov(p, R, Ngrid): - q = np.zeros((p.shape[0]+1, Ngrid)) - q[0,0] = 1 - lu = 1 / (Ngrid-1) - weights = np.empty(Ngrid) - index = np.empty(Ngrid) - grid = np.linspace(0, 1, Ngrid) - for i, prob in enumerate(p): - for j in range(i+1, 0, -1): - newrecov = ((j-1) * grid + R[i])/j - np.modf(newrecov * ( Ngrid - 1), weights, index) - q[j] *= (1-prob) - for k in range(Ngrid): - q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k] - q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k] - q[0] *= (1-prob) - return q - -if __name__=="__main__": - # n_issuers = 100 - # p = np.random.rand(n_issuers) - # pp = np.random.rand(n_issuers) - # w = 1/n_issuers * np.ones(n_issuers) - # S = np.random.rand(n_issuers) - # rho = 0.5 - # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) - # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) - import numpy as np - n_issuers = 100 - p = np.random.rand(n_issuers) - R = np.random.rand(n_issuers) - Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001) - Rbar_slow = average_recov(p, R, 1001) |
