Diffstat (limited to 'python/analytics/tranche_functions.py')
-rw-r--r--  python/analytics/tranche_functions.py  608
1 file changed, 0 insertions(+), 608 deletions(-)
diff --git a/python/analytics/tranche_functions.py b/python/analytics/tranche_functions.py
deleted file mode 100644
index 1b3792b7..00000000
--- a/python/analytics/tranche_functions.py
+++ /dev/null
@@ -1,608 +0,0 @@
-import numpy as np
-from ctypes import POINTER, c_int, c_double, byref
-from numpy.ctypeslib import ndpointer
-from pathlib import Path
-from pyisda.legs import FeeLeg
-from pyisda.date import previous_twentieth
-from quantlib.time.schedule import Schedule, CDS2015, OldCDS
-from quantlib.time.api import (
- Actual360,
- Date,
- Period,
- WeekendsOnly,
- ModifiedFollowing,
- Unadjusted,
- pydate_from_qldate,
-)
-import pandas as pd
-from scipy.special import h_roots
-from .utils import next_twentieth
-
-
-def wrapped_ndpointer(*args, **kwargs):
-    """ndpointer subclass whose from_param also accepts None (passed through
-    as a NULL pointer), so optional array arguments can be omitted."""
-    base = ndpointer(*args, **kwargs)
-
- def from_param(cls, obj):
- if obj is None:
- return obj
- return base.from_param(obj)
-
- return type(base.__name__, (base,), {"from_param": classmethod(from_param)})
-
-
-libloss = np.ctypeslib.load_library("lossdistrib", Path(__file__).parent)
-
-libloss.fitprob.restype = None
-libloss.fitprob.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=1, flags="F,writeable"),
-]
-libloss.stochasticrecov.restype = None
-libloss.stochasticrecov.argtypes = [
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=2, flags="F"),
- ndpointer("double", ndim=2, flags="F"),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer("double", ndim=1, flags="F,writeable"),
-]
-libloss.BCloss_recov_dist.restype = None
-libloss.BCloss_recov_dist.argtypes = [
- ndpointer("double", ndim=2, flags="F"), # defaultprob
- POINTER(c_int), # nrow(defaultprob)
- POINTER(c_int), # ncol(defaultprob)
- ndpointer("double", ndim=1, flags="F"), # issuerweights
- ndpointer("double", ndim=1, flags="F"), # recovery
- ndpointer("double", ndim=1, flags="F"), # Z
- ndpointer("double", ndim=1, flags="F"), # w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer("double", ndim=1, flags="F"), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_int), # defaultflag
- ndpointer("double", ndim=2, flags="F,writeable"), # output L
- ndpointer("double", ndim=2, flags="F,writeable"), # output R
-]
-libloss.BCloss_recov_trunc.restype = None
-libloss.BCloss_recov_trunc.argtypes = [
- ndpointer("double", ndim=2, flags="F"), # defaultprob
- POINTER(c_int), # nrow(defaultprob)
- POINTER(c_int), # ncol(defaultprob)
- ndpointer("double", ndim=1, flags="F"), # issuerweights
- ndpointer("double", ndim=1, flags="F"), # recovery
- ndpointer("double", ndim=1, flags="F"), # Z
- ndpointer("double", ndim=1, flags="F"), # w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer("double", ndim=1, flags="F"), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_double), # K
- POINTER(c_int), # defaultflag
- ndpointer("double", ndim=1, flags="F,writeable"), # output EL
- ndpointer("double", ndim=1, flags="F,writeable"), # output ER
-]
-
-libloss.lossdistrib_joint.restype = None
-libloss.lossdistrib_joint.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- wrapped_ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.lossdistrib_joint_Z.restype = None
-libloss.lossdistrib_joint_Z.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- wrapped_ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.joint_default_averagerecov_distrib.restype = None
-libloss.joint_default_averagerecov_distrib.argtypes = [
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=1, flags="F"),
- POINTER(c_int),
- ndpointer("double", ndim=2, flags="F,writeable"),
-]
-
-libloss.shockprob.restype = c_double
-libloss.shockprob.argtypes = [c_double, c_double, c_double, c_int]
-
-libloss.shockseverity.restype = c_double
-libloss.shockseverity.argtypes = [c_double, c_double, c_double, c_double]
-
-
-def GHquad(n):
-    """Gauss-Hermite nodes and weights rescaled to the standard normal
-    measure, so that np.dot(w, f(Z)) approximates E[f(Z)] for Z ~ N(0, 1)."""
-    Z, w = h_roots(n)
-    return Z * np.sqrt(2), w / np.sqrt(np.pi)
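-
-
-# Quick sanity check for GHquad (illustrative): the rescaled weights sum to
-# one and reproduce standard normal moments, e.g.
-#   Z, w = GHquad(64)
-#   np.dot(w, Z ** 2)  # ~ 1.0 == E[Z**2]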
-
-
-def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
- q = np.zeros_like(Z)
- libloss.stochasticrecov(
- byref(c_double(R)),
- byref(c_double(Rtilde)),
- Z,
- w,
- byref(c_int(Z.size)),
- byref(c_double(rho)),
- byref(c_double(porig)),
- byref(c_double(pmod)),
- q,
- )
- return q
-
-
-def fitprob(Z, w, rho, p0):
- result = np.empty_like(Z)
- libloss.fitprob(
- Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result
- )
- return result
-
-
-def shockprob(p, rho, Z, give_log):
- return libloss.shockprob(c_double(p), c_double(rho), c_double(Z), c_int(give_log))
-
-
-def shockseverity(S, rho, Z, p):
- return libloss.shockseverity(c_double(S), c_double(rho), c_double(Z), c_double(p))
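-
-
-# Call-pattern sketch for the wrappers above (illustrative values only): the
-# factor grid always comes from GHquad, e.g.
-#   Z, w = GHquad(64)
-#   q = fitprob(Z, w, 0.5, 0.01)  # one fitted probability per quadrature node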
-
-
-def BCloss_recov_dist(
- defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False
-):
- L = np.zeros((Ngrid, defaultprob.shape[1]), order="F")
- R = np.zeros_like(L)
-    # rho is passed as a scalar; anything above 1.0 is treated as perfect
-    # correlation for every issuer
-    if rho > 1.0:
-        rho = np.ones(issuerweights.size)
-    else:
-        rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_dist(
- defaultprob,
- byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights,
- recov,
- Z,
- w,
- byref(c_int(Z.size)),
- rho,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- L,
- R,
- )
- return L, R
-
-
-def BCloss_recov_trunc(
- defaultprob, issuerweights, recov, rho, K, Z, w, Ngrid=101, defaultflag=False
-):
- ELt = np.zeros(defaultprob.shape[1])
- ERt = np.zeros_like(ELt)
-    # rho is passed as a scalar; anything above 1.0 is treated as perfect
-    # correlation for every issuer
-    if rho > 1.0:
-        rho = np.ones(issuerweights.size)
-    else:
-        rho = np.full(issuerweights.size, rho)
- libloss.BCloss_recov_trunc(
- defaultprob,
- byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights,
- recov,
- Z,
- w,
- byref(c_int(Z.size)),
- rho,
- byref(c_int(Ngrid)),
- byref(c_double(K)),
- byref(c_int(defaultflag)),
- ELt,
- ERt,
- )
- return ELt, ERt
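-
-
-# Call sketch for the BCloss wrappers (hypothetical shapes): defaultprob is a
-# Fortran-ordered (n_issuers, n_dates) matrix, e.g.
-#   Z, w = GHquad(64)
-#   L, R = BCloss_recov_dist(defaultprob, issuerweights, recov, 0.3, Z, w)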
-
-
-def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
-
- Returns
- -------
-    q : (Ngrid, Ngrid) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
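-
-    Examples
-    --------
-    A minimal sketch with random inputs (pp=None means no prepayments):
-
-    >>> import numpy as np
-    >>> p = np.random.rand(100)
-    >>> w = 1 / 100 * np.ones(100)
-    >>> S = np.random.rand(100)
-    >>> q = lossdistrib_joint(p, None, w, S)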
- """
-
- q = np.zeros((Ngrid, Ngrid), order="F")
- if pp is not None:
- assert p.shape == pp.shape
- assert w.shape == S.shape
- libloss.lossdistrib_joint(
- p,
- pp,
- byref(c_int(p.shape[0])),
- w,
- S,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- q,
- )
- return q
-
-
-def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- rho : float
- Correlation.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
- nZ : int, optional
- Size of stochastic factor.
-
- Returns
- -------
-    q : (Ngrid, Ngrid) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
-
- Examples
- --------
- >>> import numpy as np
- >>> p = np.random.rand(100)
- >>> pp = np.zeros(100)
- >>> w = 1/100 * np.ones(100)
- >>> S = np.random.rand(100)
-    >>> q = lossdistrib_joint_Z(p, pp, w, S, 0.5)
-
- """
- Z, wZ = GHquad(nZ)
- q = np.zeros((Ngrid, Ngrid), order="F")
- rho = rho * np.ones(p.shape[0])
- if pp is not None:
- assert p.shape == pp.shape
- assert w.shape == S.shape
-
- libloss.lossdistrib_joint_Z(
- p,
- pp,
- byref(c_int(p.shape[0])),
- w,
- S,
- byref(c_int(Ngrid)),
- byref(c_int(defaultflag)),
- rho,
- Z,
- wZ,
- byref(c_int(nZ)),
- q,
- )
- return q
-
-
-def joint_default_averagerecov_distrib(p, S, Ngrid=101):
- """Joint defaut-average recovery distribution recursive algorithm.
-
- This computes the joint default/average recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
-
- Returns
- -------
-    q : (N+1, Ngrid) ndarray
-
- Notes
- -----
-    np.sum(q, axis=0) is the average recovery distribution marginal
-    np.sum(q, axis=1) is the default count distribution marginal
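-
-    Examples
-    --------
-    A minimal sketch with random inputs (severities are 1 - recovery):
-
-    >>> import numpy as np
-    >>> p = np.random.rand(100)
-    >>> S = np.random.rand(100)
-    >>> q = joint_default_averagerecov_distrib(p, S)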
- """
-
- q = np.zeros((Ngrid, p.shape[0] + 1), order="F")
- assert p.shape == S.shape
- libloss.joint_default_averagerecov_distrib(
- p, byref(c_int(p.shape[0])), S, byref(c_int(Ngrid)), q
- )
- return q.T
-
-
-def adjust_attachments(K, losstodate, factor):
- """
-    Computes the attachment points adjusted for losses to date, expressed
-    on the current (factored-down) notional.
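-
-    Examples
-    --------
-    >>> import numpy as np
-    >>> adjust_attachments(np.array([0.03, 0.07]), 0.02, 0.9)
-    array([0.01111111, 0.05555556])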
- """
- return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)
-
-
-def trancheloss(L, K1, K2):
- return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
-
-
-def trancherecov(R, K1, K2):
- return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
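-
-
-# Payoff sketch for a [K1, K2] = [0.03, 0.07] tranche (illustrative numbers):
-#   trancheloss(0.05, 0.03, 0.07)   -> 0.02  (losses eat in from the bottom)
-#   trancherecov(0.96, 0.03, 0.07)  -> 0.03  (recoveries erode from the top)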
-
-
-def tranche_cl(L, R, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- support = np.linspace(0, 1, L.shape[0])
- size = (
- K2
- - K1
- - np.dot(trancheloss(support, K1, K2), L)
- - np.dot(trancherecov(support, K1, K2), R)
- )
- sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
- if scaled:
- return 1 / (K2 - K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
- else:
- return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_cl_trunc(EL, ER, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0.0
- else:
- size = EL - ER
- dK = K2 - K1
- sizeadj = 0.5 * (size + np.hstack((dK, size[:-1])))
- if scaled:
- return 1 / dK * np.dot(sizeadj * cs["coupons"], cs["df"])
- else:
- return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-
-def tranche_pl(L, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- dK = K2 - K1
- support = np.linspace(0, 1, L.shape[0])
- cf = dK - np.dot(trancheloss(support, K1, K2), L)
- cf = np.hstack((dK, cf))
- if scaled:
- return 1 / dK * np.dot(np.diff(cf), cs["df"])
- else:
- return np.dot(np.diff(cf), cs["df"])
-
-
-def tranche_pl_trunc(EL, cs, K1, K2, scaled=False):
- if K1 == K2:
- return 0
- else:
- dK = K2 - K1
- cf = np.hstack((dK, EL))
- if scaled:
- return 1 / dK * np.dot(np.diff(cf), cs["df"])
- else:
- return np.dot(np.diff(cf), cs["df"])
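-
-
-# Toy call sketch (hypothetical inputs): cs only needs "coupons" and "df"
-# aligned with the EL/ER time grid, e.g.
-#   cs = {"coupons": np.full(4, 0.25 * 0.01), "df": np.full(4, 0.99)}
-#   tranche_pl_trunc(EL, cs, 0.0, 0.03)  # with len(EL) == len(cs["df"])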
-
-
-def tranche_pv(L, R, cs, K1, K2):
-    return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)
-
-
-def credit_schedule(tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015):
- tradedate = Date.from_datetime(tradedate)
- if enddate is None:
- enddate = tradedate + Period(tenor)
- else:
- enddate = Date.from_datetime(enddate)
- cal = WeekendsOnly()
- DC = Actual360()
- start_date = tradedate + 1
-    # if start_date falls on a weekend and is an IMM date, our schedule is
-    # too short, so use tradedate instead
- sched = Schedule.from_rule(
- tradedate if rule == CDS2015 else start_date,
- enddate,
- Period("3M"),
- cal,
- ModifiedFollowing,
- Unadjusted,
- rule,
- )
- if sched[1] == start_date: # we need to skip one date
- sched = sched.after(start_date)
- payment_dates = [pydate_from_qldate(cal.adjust(d)) for d in sched if d > start_date]
- df = [yc.discount_factor(d) for d in payment_dates]
- coupons = [
- DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])
- ]
- coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
- dates = sched.to_npdates()
- start_dates = dates[:-1]
- end_dates = dates[1:]
- return pd.DataFrame(
- {
- "df": df,
- "coupons": coupons,
- "start_dates": start_dates,
- "payment_dates": payment_dates,
- },
- index=end_dates,
- )
-
-
-def credit_schedule_pyisda(
- tradedate, coupon, yc, enddate=None, tenor=None, rule=CDS2015
-):
- tradedate = Date.from_datetime(tradedate)
- if enddate is None:
- enddate = tradedate + Period(tenor)
- else:
- enddate = Date.from_datetime(enddate)
- start_date = pydate_from_qldate(tradedate + 1)
- if (next_twentieth(start_date) - start_date).days < 30:
- stub = "f/l"
- else:
- stub = "f/s"
- if rule is CDS2015:
- start_date = previous_twentieth(pydate_from_qldate(tradedate))
- fl = FeeLeg(start_date, pydate_from_qldate(enddate), True, 1.0, coupon, stub=stub)
- df = pd.DataFrame({"coupons": [t[1] for t in fl.cashflows], **fl.inspect()})
- df["df"] = [yc.discount_factor(d) for d in df.pay_dates]
-
- df = df.rename(
- columns={"acc_start_dates": "start_dates", "pay_dates": "payment_dates"}
- ).set_index("acc_end_dates")
- return df
-
-
-def cds_accrued(tradedate, coupon):
-    """Computes the accrued coupon for a standard CDS.
-
-    TODO: fix for when tradedate + 1 is an IMM date."""
- tradedate = Date.from_datetime(tradedate)
- end = tradedate + Period("3M")
-
- start_protection = tradedate + 1
- DC = Actual360()
- cal = WeekendsOnly()
- sched = Schedule.from_rule(
- tradedate, end, Period("3M"), cal, date_generation_rule=CDS2015
- )
- prevpaydate = sched.previous_date(start_protection)
- return DC.year_fraction(prevpaydate, start_protection) * coupon
-
-
-def dist_transform(q):
- """computes the joint (D, R) distribution
- from the (L, R) distribution using D = L+R
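-
-    Examples
-    --------
-    A minimal check on a 3-point grid: mass at one tick of loss and one tick
-    of recovery moves to two ticks of default.
-
-    >>> import numpy as np
-    >>> q = np.zeros((3, 3))
-    >>> q[1, 1] = 1.0
-    >>> dist_transform(q)[2, 1]
-    1.0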
- """
- Ngrid = q.shape[0]
- distDR = np.zeros_like(q)
- for i in range(Ngrid):
- for j in range(Ngrid):
- index = i + j
- if index < Ngrid:
- distDR[index, j] += q[i, j]
- else:
- distDR[Ngrid - 1, j] += q[i, j]
- return distDR
-
-
-def dist_transform2(q):
- """computes the joint (D, R/D) distribution
- from the (D, R) distribution
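-
-    Examples
-    --------
-    >>> import numpy as np
-    >>> q = np.zeros((3, 3))
-    >>> q[2, 1] = 1.0
-    >>> dist_transform2(q)[2][0.5]
-    1.0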
- """
- Ngrid = q.shape[0]
- distDR = np.empty(Ngrid, dtype="object")
- for i in range(Ngrid):
- distDR[i] = {}
-    # i = 0 (no defaults) has no defined ratio R/D, so distDR[0] stays empty
-    for i in range(1, Ngrid):
- for j in range(i + 1):
- index = j / i
- distDR[i][index] = distDR[i].get(index, 0) + q[i, j]
- return distDR
-
-
-def compute_pv(q, strike):
-    r"""compute :math:`E(1_{\bar{R} < \mathrm{strike}} \cdot D)` from the
-    output of dist_transform2"""
-    Ngrid = q.shape[0]
-    val = 0.0
-    for i in range(Ngrid):
-        # D takes the value i / (Ngrid - 1) on the grid
-        val += sum(v for k, v in q[i].items() if k < strike) * i / (Ngrid - 1)
-    return val
-
-
-def average_recov(p, R, Ngrid):
-    """Pure-Python reference for the joint default count / average recovery
-    distribution (cf. joint_default_averagerecov_distrib)."""
-    q = np.zeros((p.shape[0] + 1, Ngrid))
-    q[0, 0] = 1
-    weights = np.empty(Ngrid)
-    index = np.empty(Ngrid)
-    grid = np.linspace(0, 1, Ngrid)
-    for i, prob in enumerate(p):
-        # recurse top-down so that q[j - 1] still holds the state before
-        # issuer i is added
-        for j in range(i + 1, 0, -1):
-            # average recovery if issuer i is the j-th defaulter
-            newrecov = ((j - 1) * grid + R[i]) / j
-            # split into integral grid index and fractional weight
-            np.modf(newrecov * (Ngrid - 1), weights, index)
-            q[j] *= 1 - prob
-            for k in range(Ngrid):
-                q[j, int(index[k]) + 1] += weights[k] * prob * q[j - 1, k]
-                q[j, int(index[k])] += (1 - weights[k]) * prob * q[j - 1, k]
-        q[0] *= 1 - prob
-    return q
-
-
-if __name__ == "__main__":
- # n_issuers = 100
- # p = np.random.rand(n_issuers)
- # pp = np.random.rand(n_issuers)
- # w = 1/n_issuers * np.ones(n_issuers)
- # S = np.random.rand(n_issuers)
- # rho = 0.5
- # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
- # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
-
- n_issuers = 100
- p = np.random.rand(n_issuers)
- R = np.random.rand(n_issuers)
- Rbar = joint_default_averagerecov_distrib(p, 1 - R, 1001)
- Rbar_slow = average_recov(p, R, 1001)
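-
-    # Illustrative cross-check (assumption, not verified here): the default
-    # count marginal depends only on p, so the axis-1 sums of the two joint
-    # distributions should match.
-    print(np.allclose(Rbar.sum(axis=1), Rbar_slow.sum(axis=1)))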