import numpy as np from ctypes import * from numpy.ctypeslib import ndpointer from quantlib.time.schedule import Schedule, CDS from quantlib.time.api import Actual360, Period, UnitedStates, Following, today from quantlib.util.converter import qldate_to_pydate, pydate_to_qldate import pandas as pd from scipy.special import h_roots import os def wrapped_ndpointer(*args, **kwargs): base = ndpointer(*args, **kwargs) def from_param(cls, obj): if obj is None: return obj return base.from_param(obj) return type(base.__name__, (base,), {'from_param': classmethod(from_param)}) libloss = np.ctypeslib.load_library("lossdistrib", "../R/lossdistrib/src/") libloss.fitprob.restype = None libloss.fitprob.argtypes = [ ndpointer('double', ndim=1, flags='F'), ndpointer('double', ndim=1, flags='F'), POINTER(c_int), POINTER(c_double), POINTER(c_double), ndpointer('double', ndim=1, flags='F,writeable')] libloss.stochasticrecov.restype = None libloss.stochasticrecov.argtypes = [ POINTER(c_double), POINTER(c_double), ndpointer('double', ndim=2, flags='F'), ndpointer('double', ndim=2, flags='F'), POINTER(c_int), POINTER(c_double), POINTER(c_double), POINTER(c_double), ndpointer('double', ndim=1, flags='F,writeable')] libloss.BCloss_recov_dist.restype = None libloss.BCloss_recov_dist.argtypes = [ ndpointer('double', ndim=2, flags='F'),# defaultprob POINTER(c_int),# nrow(defaultprob) POINTER(c_int),# ncol(defaultprob) ndpointer('double', ndim=1, flags='F'),# issuerweights ndpointer('double', ndim=1, flags='F'),# recovery ndpointer('double', ndim=1, flags='F'),# Z ndpointer('double', ndim=1, flags='F'),# w POINTER(c_int), # len(Z) = len(w) ndpointer('double', ndim=1, flags='F'), # rho POINTER(c_int), # Ngrid POINTER(c_int), #defaultflag ndpointer('double', ndim=2, flags='F,writeable'),# output L ndpointer('double', ndim=2, flags='F,writeable')# output R ] libloss.lossdistrib_joint.restype = None libloss.lossdistrib_joint.argtypes = [ ndpointer('double', ndim=1, flags='F'), wrapped_ndpointer('double', ndim=1, flags='F'), POINTER(c_int), ndpointer('double', ndim=1, flags='F'), ndpointer('double', ndim=1, flags='F'), POINTER(c_int), POINTER(c_int), ndpointer('double', ndim=2, flags='F,writeable') ] libloss.lossdistrib_joint_Z.restype = None libloss.lossdistrib_joint_Z.argtypes = [ ndpointer('double', ndim=1, flags='F'), wrapped_ndpointer('double', ndim=1, flags='F'), POINTER(c_int), ndpointer('double', ndim=1, flags='F'), ndpointer('double', ndim=1, flags='F'), POINTER(c_int), POINTER(c_int), ndpointer('double', ndim=1, flags='F'), ndpointer('double', ndim=1, flags='F'), ndpointer('double', ndim=1, flags='F'), POINTER(c_int), ndpointer('double', ndim=2, flags='F,writeable') ] libloss.joint_default_averagerecov_distrib.restype = None libloss.joint_default_averagerecov_distrib.argtypes = [ ndpointer('double', ndim=1, flags='F'), POINTER(c_int), ndpointer('double', ndim=1, flags='F'), POINTER(c_int), ndpointer('double', ndim=2, flags='F,writeable') ] def GHquad(n): Z, w = h_roots(n) return Z*np.sqrt(2), w/np.sqrt(np.pi) def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): q = np.zeros_like(Z) libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q) return q def fitprob(Z, w, rho, p0): result = np.empty_like(Z) libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result) return result def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False): L = np.zeros((Ngrid, defaultprob.shape[1]), order='F') R = np.zeros_like(L) rho = np.repeat(rho, issuerweights.size) libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])), byref(c_int(defaultprob.shape[1])), issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R) return L, R def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): """Joint loss-recovery distribution recursive algorithm. This computes the joint loss/recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. pp : (N,) array_like or None Vector of prepayments. w : (N,) array_like Issuer weights. S : (N,) array_like Vector of severities. Ngrid : integer, optional Number of ticks on the grid, default 101. defaultflag : bool, optional If True computes the default distribution instead. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal """ q = np.zeros((Ngrid, Ngrid), order='F') if pp is not None: assert(p.shape == pp.shape) assert(w.shape == S.shape) libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])), w, S, byref(c_int(Ngrid)), byref(c_int(defaultflag)), q) return q def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """Joint loss-recovery distribution recursive algorithm. This computes the joint loss/recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. pp : (N,) array_like or None Vector of prepayments. w : (N,) array_like Issuer weights. S : (N,) array_like Vector of severities. rho : float Correlation. Ngrid : integer, optional Number of ticks on the grid, default 101. defaultflag : bool, optional If True computes the default distribution instead. nZ : int, optional Size of stochastic factor. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal Examples -------- >>> import numpy as np >>> p = np.random.rand(100) >>> pp = np.zeros(100) >>> w = 1/100 * np.ones(100) >>> S = np.random.rand(100) >>> q = lossdistrib_joint_Z(p, pp, S, 0.5) """ Z, wZ = GHquad(nZ) q = np.zeros((Ngrid, Ngrid), order='F') rho = rho * np.ones(p.shape[0]) if pp is not None: assert(p.shape == pp.shape) assert(w.shape == S.shape) libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])), w, S, byref(c_int(Ngrid)), byref(c_int(defaultflag)), rho, Z, wZ, byref(c_int(nZ)), q) return q def joint_default_averagerecov_distrib(p, S, Ngrid=101): """Joint defaut-average recovery distribution recursive algorithm. This computes the joint default/average recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. S : (N,) array_like Vector of severities. Ngrid : integer, optional Number of ticks on the grid, default 101. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal """ q = np.zeros((Ngrid, p.shape[0]+1), order='F') assert(p.shape == S.shape) libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q) return q.T def adjust_attachments(K, losstodate, factor): """ computes the attachments adjusted for losses on current notional """ return np.minimum(np.maximum((K-losstodate)/factor, 0), 1) def trancheloss(L, K1, K2): return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) def trancherecov(R, K1, K2): return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) def tranche_cl(L, R, cs, K1, K2, scaled = False): if(K1 == K2): return 0 else: support = np.linspace(0, 1, L.shape[0]) size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \ np.dot(trancherecov(support, K1, K2), R) sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1]))) if scaled: return 1/(K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) else: return np.dot(sizeadj * cs["coupons"], cs["df"]) def tranche_pl(L, cs, K1, K2, scaled=False): if(K1 == K2): return 0 else: support = np.linspace(0, 1, L.shape[0]) cf = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) cf = np.hstack((K2-K1, cf)) if scaled: return 1/(K2-K1) * np.dot(np.diff(cf), cs["df"]) else: return np.dot(np.diff(cf), cs["df"]) def tranche_pv(L, R, cs, K1, K2): return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) def creditSchedule(tradedate, tenor, coupon, yc, enddate = None): tradedate = pydate_to_qldate(tradedate) start = tradedate - Period('4Mo') enddate = pydate_to_qldate(enddate) if enddate is None: enddate = tradedate + Period(tenor) cal = UnitedStates() DC = Actual360() sched = Schedule(start, enddate, Period('3Mo'), cal, date_generation_rule=CDS) prevpaydate = sched.previous_date(tradedate) sched = [d for d in sched if d>=prevpaydate] df = [yc.discount(d) for d in sched if d > tradedate] dates = pd.to_datetime([str(d) for d in sched if d > tradedate], "%m/%d/%Y") coupons = np.diff([DC.year_fraction(prevpaydate, d) * coupon for d in sched]) return pd.DataFrame({"df":df, "coupons":coupons}, dates) def cdsAccrued(tradedate, coupon): tradedate = pydate_to_qldate(tradedate) start = tradedate - Period('3Mo') end = tradedate + Period('3Mo') start_protection = tradedate + 1 DC = Actual360() cal = UnitedStates() sched = Schedule(start, end, Period('3Mo'), cal, date_generation_rule=CDS) prevpaydate = sched.previous_date(tradedate) return DC.year_fraction(prevpaydate, start_protection) * coupon def dist_transform(q): """computes the joint (D, R) distribution from the (L, R) distribution using D = L+R """ Ngrid = q.shape[0] distDR = np.zeros_like(q) for i in range(Ngrid): for j in range(Ngrid): index = i + j if index < Ngrid: distDR[index,j] += q[i,j] else: distDR[Ngrid-1,j] += q[i,j] return distDR def dist_transform2(q): """computes the joint (D, R/D) distribution from the (D, R) distribution """ Ngrid = q.shape[0] distDR = np.empty(Ngrid, dtype='object') for i in range(Ngrid): distDR[i] = {} for i in range(1, Ngrid): for j in range(i+1): index = (j / i) distDR[i][index] = distDR[i].get(index, 0) + q[i,j] return distDR def compute_pv(q, strike): """ compute E(1_{R^\bar \leq strike} * D)""" D = 0 for i in range(q.shape): val += sum(v for k, v in q[i].items() if k < strike) * 1/Ngrid return val def average_recov(p, R, Ngrid): q = np.zeros((p.shape[0]+1, Ngrid)) q[0,0] = 1 lu = 1 / (Ngrid-1) weights = np.empty(Ngrid) index = np.empty(Ngrid) grid = np.linspace(0, 1, Ngrid) for i, prob in enumerate(p): for j in range(i+1, 0, -1): newrecov = ((j-1) * grid + R[i])/j np.modf(newrecov * ( Ngrid - 1), weights, index) q[j] *= (1-prob) for k in range(Ngrid): q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k] q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k] q[0] *= (1-prob) return q if __name__=="__main__": # n_issuers = 100 # p = np.random.rand(n_issuers) # pp = np.random.rand(n_issuers) # w = 1/n_issuers * np.ones(n_issuers) # S = np.random.rand(n_issuers) # rho = 0.5 # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) import numpy as np n_issuers = 100 p = np.random.rand(n_issuers) R = np.random.rand(n_issuers) Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001) Rbar_slow = average_recov(p, R, 1001)