import numpy as np from ctypes import POINTER, c_int, c_double, byref from numpy.ctypeslib import ndpointer from quantlib.time.schedule import Schedule, CDS2015 from quantlib.time.api import ( Actual360, Period, WeekendsOnly, ModifiedFollowing, Unadjusted, ) from quantlib.util.converter import pydate_to_qldate import pandas as pd from scipy.special import h_roots import os def wrapped_ndpointer(*args, **kwargs): base = ndpointer(*args, **kwargs) def from_param(cls, obj): if obj is None: return obj return base.from_param(obj) return type(base.__name__, (base,), {"from_param": classmethod(from_param)}) libloss = np.ctypeslib.load_library( "lossdistrib", os.path.join(os.environ["CODE_DIR"], "python", "analytics") ) libloss.fitprob.restype = None libloss.fitprob.argtypes = [ ndpointer("double", ndim=1, flags="F"), ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), ndpointer("double", ndim=1, flags="F,writeable"), ] libloss.stochasticrecov.restype = None libloss.stochasticrecov.argtypes = [ POINTER(c_double), POINTER(c_double), ndpointer("double", ndim=2, flags="F"), ndpointer("double", ndim=2, flags="F"), POINTER(c_int), POINTER(c_double), POINTER(c_double), POINTER(c_double), ndpointer("double", ndim=1, flags="F,writeable"), ] libloss.BCloss_recov_dist.restype = None libloss.BCloss_recov_dist.argtypes = [ ndpointer("double", ndim=2, flags="F"), # defaultprob POINTER(c_int), # nrow(defaultprob) POINTER(c_int), # ncol(defaultprob) ndpointer("double", ndim=1, flags="F"), # issuerweights ndpointer("double", ndim=1, flags="F"), # recovery ndpointer("double", ndim=1, flags="F"), # Z ndpointer("double", ndim=1, flags="F"), # w POINTER(c_int), # len(Z) = len(w) ndpointer("double", ndim=1, flags="F"), # rho POINTER(c_int), # Ngrid POINTER(c_int), # defaultflag ndpointer("double", ndim=2, flags="F,writeable"), # output L ndpointer("double", ndim=2, flags="F,writeable"), # output R ] libloss.BCloss_recov_trunc.restype = None libloss.BCloss_recov_trunc.argtypes = [ ndpointer("double", ndim=2, flags="F"), # defaultprob POINTER(c_int), # nrow(defaultprob) POINTER(c_int), # ncol(defaultprob) ndpointer("double", ndim=1, flags="F"), # issuerweights ndpointer("double", ndim=1, flags="F"), # recovery ndpointer("double", ndim=1, flags="F"), # Z ndpointer("double", ndim=1, flags="F"), # w POINTER(c_int), # len(Z) = len(w) ndpointer("double", ndim=1, flags="F"), # rho POINTER(c_int), # Ngrid POINTER(c_double), # K POINTER(c_int), # defaultflag ndpointer("double", ndim=1, flags="F,writeable"), # output EL ndpointer("double", ndim=1, flags="F,writeable"), # output ER ] libloss.lossdistrib_joint.restype = None libloss.lossdistrib_joint.argtypes = [ ndpointer("double", ndim=1, flags="F"), wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), ndpointer("double", ndim=1, flags="F"), ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.lossdistrib_joint_Z.restype = None libloss.lossdistrib_joint_Z.argtypes = [ ndpointer("double", ndim=1, flags="F"), wrapped_ndpointer("double", ndim=1, flags="F"), POINTER(c_int), ndpointer("double", ndim=1, flags="F"), ndpointer("double", ndim=1, flags="F"), POINTER(c_int), POINTER(c_int), ndpointer("double", ndim=1, flags="F"), ndpointer("double", ndim=1, flags="F"), ndpointer("double", ndim=1, flags="F"), POINTER(c_int), ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.joint_default_averagerecov_distrib.restype = None libloss.joint_default_averagerecov_distrib.argtypes = [ ndpointer("double", ndim=1, flags="F"), POINTER(c_int), ndpointer("double", ndim=1, flags="F"), POINTER(c_int), ndpointer("double", ndim=2, flags="F,writeable"), ] libloss.shockprob.restype = c_double libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int] libloss.shockseverity.restype = c_double libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double] def GHquad(n): Z, w = h_roots(n) return Z * np.sqrt(2), w / np.sqrt(np.pi) def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): q = np.zeros_like(Z) libloss.stochasticrecov( byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q, ) return q def fitprob(Z, w, rho, p0): result = np.empty_like(Z) libloss.fitprob( Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result ) return result def shockprob(p, rho, Z, give_log): return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log)) def shockseverity(S, rho, Z, p): return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p)) def BCloss_recov_dist( defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False ): L = np.zeros((Ngrid, defaultprob.shape[1]), order="F") R = np.zeros_like(L) rho = np.full(issuerweights.size, rho) libloss.BCloss_recov_dist( defaultprob, byref(c_int(defaultprob.shape[0])), byref(c_int(defaultprob.shape[1])), issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R, ) return L, R def BCloss_recov_trunc( defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False ): ELt = np.zeros(defaultprob.shape[1]) ERt = np.zeros_like(ELt) rho = np.full(issuerweights.size, rho) libloss.BCloss_recov_trunc( defaultprob, byref(c_int(defaultprob.shape[0])), byref(c_int(defaultprob.shape[1])), issuerweights, recov, Z, w, byref(c_int(Z.size)), rho, byref(c_int(Ngrid)), byref(c_double(K)), byref(c_int(defaultflag)), ELt, ERt, ) return ELt, ERt def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): """Joint loss-recovery distribution recursive algorithm. This computes the joint loss/recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. pp : (N,) array_like or None Vector of prepayments. w : (N,) array_like Issuer weights. S : (N,) array_like Vector of severities. Ngrid : integer, optional Number of ticks on the grid, default 101. defaultflag : bool, optional If True computes the default distribution instead. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal """ q = np.zeros((Ngrid, Ngrid), order="F") if pp is not None: assert p.shape == pp.shape assert w.shape == S.shape libloss.lossdistrib_joint( p, pp, byref(c_int(p.shape[0])), w, S, byref(c_int(Ngrid)), byref(c_int(defaultflag)), q, ) return q def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): """Joint loss-recovery distribution recursive algorithm. This computes the joint loss/recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. pp : (N,) array_like or None Vector of prepayments. w : (N,) array_like Issuer weights. S : (N,) array_like Vector of severities. rho : float Correlation. Ngrid : integer, optional Number of ticks on the grid, default 101. defaultflag : bool, optional If True computes the default distribution instead. nZ : int, optional Size of stochastic factor. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal Examples -------- >>> import numpy as np >>> p = np.random.rand(100) >>> pp = np.zeros(100) >>> w = 1/100 * np.ones(100) >>> S = np.random.rand(100) >>> q = lossdistrib_joint_Z(p, pp, S, 0.5) """ Z, wZ = GHquad(nZ) q = np.zeros((Ngrid, Ngrid), order="F") rho = rho * np.ones(p.shape[0]) if pp is not None: assert p.shape == pp.shape assert w.shape == S.shape libloss.lossdistrib_joint_Z( p, pp, byref(c_int(p.shape[0])), w, S, byref(c_int(Ngrid)), byref(c_int(defaultflag)), rho, Z, wZ, byref(c_int(nZ)), q, ) return q def joint_default_averagerecov_distrib(p, S, Ngrid=101): """Joint defaut-average recovery distribution recursive algorithm. This computes the joint default/average recovery distribution using a first order correction. Parameters ---------- p : (N,) array_like Vector of default probabilities. S : (N,) array_like Vector of severities. Ngrid : integer, optional Number of ticks on the grid, default 101. Returns ------- q : (N, N) ndarray Notes ----- np.sum(q, axis=0) is the recovery distribution marginal np.sum(q, axis=1) is the loss (or default) distribution marginal """ q = np.zeros((Ngrid, p.shape[0] + 1), order="F") assert p.shape == S.shape libloss.joint_default_averagerecov_distrib( p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q ) return q.T def adjust_attachments(K, losstodate, factor): """ computes the attachments adjusted for losses on current notional """ return np.minimum(np.maximum((K - losstodate) / factor, 0), 1) def trancheloss(L, K1, K2): return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) def trancherecov(R, K1, K2): return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) def tranche_cl(L, R, cs, K1, K2, scaled=False): if K1 == K2: return 0 else: support = np.linspace(0, 1, L.shape[0]) size = ( K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - np.dot(trancherecov(support, K1, K2), R) ) sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1]))) if scaled: return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) else: return np.dot(sizeadj * cs["coupons"], cs["df"]) def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): if K1 == K2: return 0.0 else: size = EL - ER dK = K2 - K1 sizeadj = 0.5 * (size + np.hstack((dK, size[:-1]))) if scaled: return 1 / dK * np.dot(sizeadj * cs["coupons"], cs["df"]) else: return np.dot(sizeadj * cs["coupons"], cs["df"]) def tranche_pl(L, cs, K1, K2, scaled=False): if K1 == K2: return 0 else: dK = K2 - K1 support = np.linspace(0, 1, L.shape[0]) cf = dK - np.dot(trancheloss(support, K1, K2), L) cf = np.hstack((dK, cf)) if scaled: return 1 / dK * np.dot(np.diff(cf), cs["df"]) else: return np.dot(np.diff(cf), cs["df"]) def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): if K1 == K2: return 0 else: dK = K2 - K1 cf = np.hstack((dK, EL)) if scaled: return 1 / dK * np.dot(np.diff(cf), cs["df"]) else: return np.dot(np.diff(cf), cs["df"]) def tranche_pv(L, R, cs, K1, K2): return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) def credit_schedule(tradedate, tenor, coupon, yc, enddate=None): tradedate = pydate_to_qldate(tradedate) if enddate is None: enddate = tradedate + Period(tenor) else: enddate = pydate_to_qldate(enddate) cal = WeekendsOnly() DC = Actual360() start_date = tradedate + 1 sched = Schedule.from_rule( tradedate, enddate, Period("3M"), cal, ModifiedFollowing, Unadjusted, CDS2015 ) dates = sched.to_npdates() pydates = dates.astype("O") df = [yc.discount_factor(d) for d in pydates if d > start_date] coupons = [ DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1]) if d2 > start_date ] coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon) if dates[1] <= start_date: dates = dates[2:] else: dates = dates[1:] return pd.DataFrame({"df": df, "coupons": coupons}, index=dates) def cds_accrued(tradedate, coupon): """ computes accrued for a standard CDS TODO: fix for when trade_date + 1 = IMM date""" tradedate = pydate_to_qldate(tradedate) end = tradedate + Period("3M") start_protection = tradedate + 1 DC = Actual360() cal = WeekendsOnly() sched = Schedule.from_rule( tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015 ) prevpaydate = sched.previous_date(start_protection) return DC.year_fraction(prevpaydate, start_protection) * coupon def dist_transform(q): """computes the joint (D, R) distribution from the (L, R) distribution using D = L+R """ Ngrid = q.shape[0] distDR = np.zeros_like(q) for i in range(Ngrid): for j in range(Ngrid): index = i + j if index < Ngrid: distDR[index, j] += q[i, j] else: distDR[Ngrid - 1, j] += q[i, j] return distDR def dist_transform2(q): """computes the joint (D, R/D) distribution from the (D, R) distribution """ Ngrid = q.shape[0] distDR = np.empty(Ngrid, dtype="object") for i in range(Ngrid): distDR[i] = {} for i in range(1, Ngrid): for j in range(i + 1): index = j / i distDR[i][index] = distDR[i].get(index, 0) + q[i, j] return distDR def compute_pv(q, strike): r""" compute E(1_{R^\bar \leq strike} * D)""" for i in range(q.shape): val += sum(v for k, v in q[i].items() if k < strike) * 1 / Ngrid return val def average_recov(p, R, Ngrid): q = np.zeros((p.shape[0] + 1, Ngrid)) q[0, 0] = 1 lu = 1 / (Ngrid - 1) weights = np.empty(Ngrid) index = np.empty(Ngrid) grid = np.linspace(0, 1, Ngrid) for i, prob in enumerate(p): for j in range(i + 1, 0, -1): newrecov = ((j - 1) * grid + R[i]) / j np.modf(newrecov * (Ngrid - 1), weights, index) q[j] *= 1 - prob for k in range(Ngrid): q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k] q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k] q[0] *= 1 - prob return q if __name__ == "__main__": # n_issuers = 100 # p = np.random.rand(n_issuers) # pp = np.random.rand(n_issuers) # w = 1/n_issuers * np.ones(n_issuers) # S = np.random.rand(n_issuers) # rho = 0.5 # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) import numpy as np n_issuers = 100 p = np.random.rand(n_issuers) R = np.random.rand(n_issuers) Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001) Rbar_slow = average_recov(p, R, 1001)