diff options
Diffstat (limited to 'python/analytics/tranche_functions.py')
| -rw-r--r-- | python/analytics/tranche_functions.py | 608 |
1 files changed, 0 insertions, 608 deletions
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py deleted file mode 100644 index 1b3792b7..00000000 --- a/python/analytics/tranche_functions.py +++ /dev/null @@ -1,608 +0,0 @@ -import numpy as np -from ctypes import POINTER, c_int, c_double, byref -from numpy.ctypeslib import ndpointer -from pathlib import Path -from pyisda.legs import FeeLeg -from pyisda.date import previous_twentieth -from quantlib.time.schedule import Schedule, CDS2015, OldCDS -from quantlib.time.api import ( - Actual360, - Date, - Period, - WeekendsOnly, - ModifiedFollowing, - Unadjusted, - pydate_from_qldate, -) -import pandas as pd -from scipy.special import h_roots -from .utils import next_twentieth - - -def wrapped_ndpointer(*args, **kwargs): - base = ndpointer(*args, **kwargs) - - def from_param(cls, obj): - if obj is None: - return obj - return base.from_param(obj) - - return type(base.__name__, (base,), {"from_param": classmethod(from_param)}) - - -libloss = np.ctypeslib.load_library("lossdistrib", Path(__file__).parent) - -libloss.fitprob.restype = None -libloss.fitprob.argtypes = [ - ndpointer("double", ndim=1, flags="F"), - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - POINTER(c_double), - POINTER(c_double), - ndpointer("double", ndim=1, flags="F,writeable"), -] -libloss.stochasticrecov.restype = None -libloss.stochasticrecov.argtypes = [ - POINTER(c_double), - POINTER(c_double), - ndpointer("double", ndim=2, flags="F"), - ndpointer("double", ndim=2, flags="F"), - POINTER(c_int), - POINTER(c_double), - POINTER(c_double), - POINTER(c_double), - ndpointer("double", ndim=1, flags="F,writeable"), -] -libloss.BCloss_recov_dist.restype = None -libloss.BCloss_recov_dist.argtypes = [ - ndpointer("double", ndim=2, flags="F"), # defaultprob - POINTER(c_int), # nrow(defaultprob) - POINTER(c_int), # ncol(defaultprob) - ndpointer("double", ndim=1, flags="F"), # issuerweights - ndpointer("double", ndim=1, flags="F"), # recovery - ndpointer("double", ndim=1, flags="F"), # Z - ndpointer("double", ndim=1, flags="F"), # w - POINTER(c_int), # len(Z) = len(w) - ndpointer("double", ndim=1, flags="F"), # rho - POINTER(c_int), # Ngrid - POINTER(c_int), # defaultflag - ndpointer("double", ndim=2, flags="F,writeable"), # output L - ndpointer("double", ndim=2, flags="F,writeable"), # output R -] -libloss.BCloss_recov_trunc.restype = None -libloss.BCloss_recov_trunc.argtypes = [ - ndpointer("double", ndim=2, flags="F"), # defaultprob - POINTER(c_int), # nrow(defaultprob) - POINTER(c_int), # ncol(defaultprob) - ndpointer("double", ndim=1, flags="F"), # issuerweights - ndpointer("double", ndim=1, flags="F"), # recovery - ndpointer("double", ndim=1, flags="F"), # Z - ndpointer("double", ndim=1, flags="F"), # w - POINTER(c_int), # len(Z) = len(w) - ndpointer("double", ndim=1, flags="F"), # rho - POINTER(c_int), # Ngrid - POINTER(c_double), # K - POINTER(c_int), # defaultflag - ndpointer("double", ndim=1, flags="F,writeable"), # output EL - ndpointer("double", ndim=1, flags="F,writeable"), # output ER -] - -libloss.lossdistrib_joint.restype = None -libloss.lossdistrib_joint.argtypes = [ - ndpointer("double", ndim=1, flags="F"), - wrapped_ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - ndpointer("double", ndim=1, flags="F"), - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - POINTER(c_int), - ndpointer("double", ndim=2, flags="F,writeable"), -] - -libloss.lossdistrib_joint_Z.restype = None -libloss.lossdistrib_joint_Z.argtypes = [ - ndpointer("double", ndim=1, flags="F"), - wrapped_ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - ndpointer("double", ndim=1, flags="F"), - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - POINTER(c_int), - ndpointer("double", ndim=1, flags="F"), - ndpointer("double", ndim=1, flags="F"), - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - ndpointer("double", ndim=2, flags="F,writeable"), -] - -libloss.joint_default_averagerecov_distrib.restype = None -libloss.joint_default_averagerecov_distrib.argtypes = [ - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - ndpointer("double", ndim=1, flags="F"), - POINTER(c_int), - ndpointer("double", ndim=2, flags="F,writeable"), -] - -libloss.shockprob.restype = c_double -libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int] - -libloss.shockseverity.restype = c_double -libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double] - - -def GHquad(n): - Z, w = h_roots(n) - return Z * np.sqrt(2), w / np.sqrt(np.pi) - - -def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod): - q = np.zeros_like(Z) - libloss.stochasticrecov( - byref(c_double(R)), - byref(c_double(Rtilde)), - Z, - w, - byref(c_int(Z.size)), - byref(c_double(rho)), - byref(c_double(porig)), - byref(c_double(pmod)), - q, - ) - return q - - -def fitprob(Z, w, rho, p0): - result = np.empty_like(Z) - libloss.fitprob( - Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result - ) - return result - - -def shockprob(p, rho, Z, give_log): - return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log)) - - -def shockseverity(S, rho, Z, p): - return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p)) - - -def BCloss_recov_dist( - defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False -): - L = np.zeros((Ngrid, defaultprob.shape[1]), order="F") - R = np.zeros_like(L) - if rho > 1.0: - rho = np.ones(issuerweights.size) - else: - rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_dist( - defaultprob, - byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, - recov, - Z, - w, - byref(c_int(Z.size)), - rho, - byref(c_int(Ngrid)), - byref(c_int(defaultflag)), - L, - R, - ) - return L, R - - -def BCloss_recov_trunc( - defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False -): - ELt = np.zeros(defaultprob.shape[1]) - ERt = np.zeros_like(ELt) - if rho > 1.0: - rho = np.ones(issuerweights.size) - else: - rho = np.full(issuerweights.size, rho) - libloss.BCloss_recov_trunc( - defaultprob, - byref(c_int(defaultprob.shape[0])), - byref(c_int(defaultprob.shape[1])), - issuerweights, - recov, - Z, - w, - byref(c_int(Z.size)), - rho, - byref(c_int(Ngrid)), - byref(c_double(K)), - byref(c_int(defaultflag)), - ELt, - ERt, - ) - return ELt, ERt - - -def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False): - """Joint loss-recovery distribution recursive algorithm. - - This computes the joint loss/recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - pp : (N,) array_like or None - Vector of prepayments. - w : (N,) array_like - Issuer weights. - S : (N,) array_like - Vector of severities. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - defaultflag : bool, optional - If True computes the default distribution instead. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - """ - - q = np.zeros((Ngrid, Ngrid), order="F") - if pp is not None: - assert p.shape == pp.shape - assert w.shape == S.shape - libloss.lossdistrib_joint( - p, - pp, - byref(c_int(p.shape[0])), - w, - S, - byref(c_int(Ngrid)), - byref(c_int(defaultflag)), - q, - ) - return q - - -def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500): - """Joint loss-recovery distribution recursive algorithm. - - This computes the joint loss/recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - pp : (N,) array_like or None - Vector of prepayments. - w : (N,) array_like - Issuer weights. - S : (N,) array_like - Vector of severities. - rho : float - Correlation. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - defaultflag : bool, optional - If True computes the default distribution instead. - nZ : int, optional - Size of stochastic factor. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - - Examples - -------- - >>> import numpy as np - >>> p = np.random.rand(100) - >>> pp = np.zeros(100) - >>> w = 1/100 * np.ones(100) - >>> S = np.random.rand(100) - >>> q = lossdistrib_joint_Z(p, pp, S, 0.5) - - """ - Z, wZ = GHquad(nZ) - q = np.zeros((Ngrid, Ngrid), order="F") - rho = rho * np.ones(p.shape[0]) - if pp is not None: - assert p.shape == pp.shape - assert w.shape == S.shape - - libloss.lossdistrib_joint_Z( - p, - pp, - byref(c_int(p.shape[0])), - w, - S, - byref(c_int(Ngrid)), - byref(c_int(defaultflag)), - rho, - Z, - wZ, - byref(c_int(nZ)), - q, - ) - return q - - -def joint_default_averagerecov_distrib(p, S, Ngrid=101): - """Joint defaut-average recovery distribution recursive algorithm. - - This computes the joint default/average recovery distribution using a first order - correction. - - Parameters - ---------- - p : (N,) array_like - Vector of default probabilities. - S : (N,) array_like - Vector of severities. - Ngrid : integer, optional - Number of ticks on the grid, default 101. - - Returns - ------- - q : (N, N) ndarray - - Notes - ----- - np.sum(q, axis=0) is the recovery distribution marginal - np.sum(q, axis=1) is the loss (or default) distribution marginal - """ - - q = np.zeros((Ngrid, p.shape[0] + 1), order="F") - assert p.shape == S.shape - libloss.joint_default_averagerecov_distrib( - p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q - ) - return q.T - - -def adjust_attachments(K, losstodate, factor): - """ - computes the attachments adjusted for losses - on current notional - """ - return np.minimum(np.maximum((K - losstodate) / factor, 0), 1) - - -def trancheloss(L, K1, K2): - return np.maximum(L - K1, 0) - np.maximum(L - K2, 0) - - -def trancherecov(R, K1, K2): - return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0) - - -def tranche_cl(L, R, cs, K1, K2, scaled=False): - if K1 == K2: - return 0 - else: - support = np.linspace(0, 1, L.shape[0]) - size = ( - K2 - - K1 - - np.dot(trancheloss(support, K1, K2), L) - - np.dot(trancherecov(support, K1, K2), R) - ) - sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1]))) - if scaled: - return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"]) - else: - return np.dot(sizeadj * cs["coupons"], cs["df"]) - - -def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False): - if K1 == K2: - return 0.0 - else: - size = EL - ER - dK = K2 - K1 - sizeadj = 0.5 * (size + np.hstack((dK, size[:-1]))) - if scaled: - return 1 / dK * np.dot(sizeadj * cs["coupons"], cs["df"]) - else: - return np.dot(sizeadj * cs["coupons"], cs["df"]) - - -def tranche_pl(L, cs, K1, K2, scaled=False): - if K1 == K2: - return 0 - else: - dK = K2 - K1 - support = np.linspace(0, 1, L.shape[0]) - cf = dK - np.dot(trancheloss(support, K1, K2), L) - cf = np.hstack((dK, cf)) - if scaled: - return 1 / dK * np.dot(np.diff(cf), cs["df"]) - else: - return np.dot(np.diff(cf), cs["df"]) - - -def tranche_pl_trunc(EL, cs, K1, K2, scaled=False): - if K1 == K2: - return 0 - else: - dK = K2 - K1 - cf = np.hstack((dK, EL)) - if scaled: - return 1 / dK * np.dot(np.diff(cf), cs["df"]) - else: - return np.dot(np.diff(cf), cs["df"]) - - -def tranche_pv(L, R, cs, K1, K2): - return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2) - - -def credit_schedule(tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015): - tradedate = Date.from_datetime(tradedate) - if enddate is None: - enddate = tradedate + Period(tenor) - else: - enddate = Date.from_datetime(enddate) - cal = WeekendsOnly() - DC = Actual360() - start_date = tradedate + 1 - # if start_date falls on a week-end and is an imm date, our schedule is too short - # so use trade_date instead - sched = Schedule.from_rule( - tradedate if rule == CDS2015 else start_date, - enddate, - Period("3M"), - cal, - ModifiedFollowing, - Unadjusted, - rule, - ) - if sched[1] == start_date: # we need to skip one date - sched = sched.after(start_date) - payment_dates = [pydate_from_qldate(cal.adjust(d)) for d in sched if d > start_date] - df = [yc.discount_factor(d) for d in payment_dates] - coupons = [ - DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1]) - ] - coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon) - dates = sched.to_npdates() - start_dates = dates[:-1] - end_dates = dates[1:] - return pd.DataFrame( - { - "df": df, - "coupons": coupons, - "start_dates": start_dates, - "payment_dates": payment_dates, - }, - index=end_dates, - ) - - -def credit_schedule_pyisda( - tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015 -): - tradedate = Date.from_datetime(tradedate) - if enddate is None: - enddate = tradedate + Period(tenor) - else: - enddate = Date.from_datetime(enddate) - start_date = pydate_from_qldate(tradedate + 1) - if (next_twentieth(start_date) - start_date).days < 30: - stub = "f/l" - else: - stub = "f/s" - if rule is CDS2015: - start_date = previous_twentieth(pydate_from_qldate(tradedate)) - fl = FeeLeg(start_date, pydate_from_qldate(enddate), True, 1.0, coupon, stub=stub) - df = pd.DataFrame({"coupons": [t[1] for t in fl.cashflows], **fl.inspect()}) - df["df"] = [yc.discount_factor(d) for d in df.pay_dates] - - df = df.rename( - columns={"acc_start_dates": "start_dates", "pay_dates": "payment_dates"} - ).set_index("acc_end_dates") - return df - - -def cds_accrued(tradedate, coupon): - """computes accrued for a standard CDS - - TODO: fix for when trade_date + 1 = IMM date""" - tradedate = Date.from_datetime(tradedate) - end = tradedate + Period("3M") - - start_protection = tradedate + 1 - DC = Actual360() - cal = WeekendsOnly() - sched = Schedule.from_rule( - tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015 - ) - prevpaydate = sched.previous_date(start_protection) - return DC.year_fraction(prevpaydate, start_protection) * coupon - - -def dist_transform(q): - """computes the joint (D, R) distribution - from the (L, R) distribution using D = L+R - """ - Ngrid = q.shape[0] - distDR = np.zeros_like(q) - for i in range(Ngrid): - for j in range(Ngrid): - index = i + j - if index < Ngrid: - distDR[index, j] += q[i, j] - else: - distDR[Ngrid - 1, j] += q[i, j] - return distDR - - -def dist_transform2(q): - """computes the joint (D, R/D) distribution - from the (D, R) distribution - """ - Ngrid = q.shape[0] - distDR = np.empty(Ngrid, dtype="object") - for i in range(Ngrid): - distDR[i] = {} - for i in range(1, Ngrid): - for j in range(i + 1): - index = j / i - distDR[i][index] = distDR[i].get(index, 0) + q[i, j] - return distDR - - -def compute_pv(q, strike): - r""" compute E(1_{R^\bar \leq strike} * D)""" - for i in range(q.shape): - val += sum(v for k, v in q[i].items() if k < strike) * 1 / Ngrid - return val - - -def average_recov(p, R, Ngrid): - q = np.zeros((p.shape[0] + 1, Ngrid)) - q[0, 0] = 1 - lu = 1 / (Ngrid - 1) - weights = np.empty(Ngrid) - index = np.empty(Ngrid) - grid = np.linspace(0, 1, Ngrid) - for i, prob in enumerate(p): - for j in range(i + 1, 0, -1): - newrecov = ((j - 1) * grid + R[i]) / j - np.modf(newrecov * (Ngrid - 1), weights, index) - q[j] *= 1 - prob - for k in range(Ngrid): - q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k] - q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k] - q[0] *= 1 - prob - return q - - -if __name__ == "__main__": - # n_issuers = 100 - # p = np.random.rand(n_issuers) - # pp = np.random.rand(n_issuers) - # w = 1/n_issuers * np.ones(n_issuers) - # S = np.random.rand(n_issuers) - # rho = 0.5 - # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True) - # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True) - import numpy as np - - n_issuers = 100 - p = np.random.rand(n_issuers) - R = np.random.rand(n_issuers) - Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001) - Rbar_slow = average_recov(p, R, 1001) |
