aboutsummaryrefslogtreecommitdiffstats
path: root/python/tranche_functions.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tranche_functions.py')
-rw-r--r--python/tranche_functions.py384
1 files changed, 0 insertions, 384 deletions
diff --git a/python/tranche_functions.py b/python/tranche_functions.py
deleted file mode 100644
index a92044e3..00000000
--- a/python/tranche_functions.py
+++ /dev/null
@@ -1,384 +0,0 @@
-import numpy as np
-from ctypes import *
-from numpy.ctypeslib import ndpointer
-from quantlib.time.schedule import Schedule, CDS2015
-from quantlib.time.api import Actual360, Period, WeekendsOnly, ModifiedFollowing, Unadjusted, today
-from quantlib.util.converter import qldate_to_pydate, pydate_to_qldate
-import pandas as pd
-from scipy.special import h_roots
-import os
-
-def wrapped_ndpointer(*args, **kwargs):
- base = ndpointer(*args, **kwargs)
- def from_param(cls, obj):
- if obj is None:
- return obj
- return base.from_param(obj)
- return type(base.__name__, (base,), {'from_param': classmethod(from_param)})
-
-libloss = np.ctypeslib.load_library("lossdistrib", "../R/lossdistrib/src/")
-libloss.fitprob.restype = None
-libloss.fitprob.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
-libloss.stochasticrecov.restype = None
-libloss.stochasticrecov.argtypes = [
- POINTER(c_double),
- POINTER(c_double),
- ndpointer('double', ndim=2, flags='F'),
- ndpointer('double', ndim=2, flags='F'),
- POINTER(c_int),
- POINTER(c_double),
- POINTER(c_double),
- POINTER(c_double),
- ndpointer('double', ndim=1, flags='F,writeable')]
-libloss.BCloss_recov_dist.restype = None
-libloss.BCloss_recov_dist.argtypes = [
- ndpointer('double', ndim=2, flags='F'),# defaultprob
- POINTER(c_int),# nrow(defaultprob)
- POINTER(c_int),# ncol(defaultprob)
- ndpointer('double', ndim=1, flags='F'),# issuerweights
- ndpointer('double', ndim=1, flags='F'),# recovery
- ndpointer('double', ndim=1, flags='F'),# Z
- ndpointer('double', ndim=1, flags='F'),# w
- POINTER(c_int), # len(Z) = len(w)
- ndpointer('double', ndim=1, flags='F'), # rho
- POINTER(c_int), # Ngrid
- POINTER(c_int), #defaultflag
- ndpointer('double', ndim=2, flags='F,writeable'),# output L
- ndpointer('double', ndim=2, flags='F,writeable')# output R
-]
-
-libloss.lossdistrib_joint.restype = None
-libloss.lossdistrib_joint.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
-]
-
-libloss.lossdistrib_joint_Z.restype = None
-libloss.lossdistrib_joint_Z.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- wrapped_ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
-]
-
-libloss.joint_default_averagerecov_distrib.restype = None
-libloss.joint_default_averagerecov_distrib.argtypes = [
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- ndpointer('double', ndim=1, flags='F'),
- POINTER(c_int),
- ndpointer('double', ndim=2, flags='F,writeable')
-]
-
-def GHquad(n):
- Z, w = h_roots(n)
- return Z*np.sqrt(2), w/np.sqrt(np.pi)
-
-def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
- q = np.zeros_like(Z)
- libloss.stochasticrecov(byref(c_double(R)), byref(c_double(Rtilde)), Z, w, byref(c_int(Z.size)),
- byref(c_double(rho)), byref(c_double(porig)), byref(c_double(pmod)), q)
- return q
-
-def fitprob(Z, w, rho, p0):
- result = np.empty_like(Z)
- libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result)
- return result
-
-def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False):
- L = np.zeros((Ngrid, defaultprob.shape[1]), order='F')
- R = np.zeros_like(L)
- rho = np.repeat(rho, issuerweights.size)
- libloss.BCloss_recov_dist(defaultprob, byref(c_int(defaultprob.shape[0])),
- byref(c_int(defaultprob.shape[1])),
- issuerweights, recov, Z, w, byref(c_int(Z.size)), rho,
- byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R)
- return L, R
-
-def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
-
- Returns
- -------
- q : (N, N) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
- """
-
- q = np.zeros((Ngrid, Ngrid), order='F')
- if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
- libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), q)
- return q
-
-def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
- """Joint loss-recovery distribution recursive algorithm.
-
- This computes the joint loss/recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- pp : (N,) array_like or None
- Vector of prepayments.
- w : (N,) array_like
- Issuer weights.
- S : (N,) array_like
- Vector of severities.
- rho : float
- Correlation.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
- defaultflag : bool, optional
- If True computes the default distribution instead.
- nZ : int, optional
- Size of stochastic factor.
-
- Returns
- -------
- q : (N, N) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
-
- Examples
- --------
- >>> import numpy as np
- >>> p = np.random.rand(100)
- >>> pp = np.zeros(100)
- >>> w = 1/100 * np.ones(100)
- >>> S = np.random.rand(100)
- >>> q = lossdistrib_joint_Z(p, pp, S, 0.5)
-
- """
- Z, wZ = GHquad(nZ)
- q = np.zeros((Ngrid, Ngrid), order='F')
- rho = rho * np.ones(p.shape[0])
- if pp is not None:
- assert(p.shape == pp.shape)
- assert(w.shape == S.shape)
-
- libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])),
- w, S, byref(c_int(Ngrid)),
- byref(c_int(defaultflag)), rho, Z, wZ,
- byref(c_int(nZ)), q)
- return q
-
-def joint_default_averagerecov_distrib(p, S, Ngrid=101):
- """Joint defaut-average recovery distribution recursive algorithm.
-
- This computes the joint default/average recovery distribution using a first order
- correction.
-
- Parameters
- ----------
- p : (N,) array_like
- Vector of default probabilities.
- S : (N,) array_like
- Vector of severities.
- Ngrid : integer, optional
- Number of ticks on the grid, default 101.
-
- Returns
- -------
- q : (N, N) ndarray
-
- Notes
- -----
- np.sum(q, axis=0) is the recovery distribution marginal
- np.sum(q, axis=1) is the loss (or default) distribution marginal
- """
-
- q = np.zeros((Ngrid, p.shape[0]+1), order='F')
- assert(p.shape == S.shape)
- libloss.joint_default_averagerecov_distrib(p, byref(c_int(p.shape[0])),
- S, byref(c_int(Ngrid)), q)
- return q.T
-
-def adjust_attachments(K, losstodate, factor):
- """
- computes the attachments adjusted for losses
- on current notional
- """
- return np.minimum(np.maximum((K-losstodate)/factor, 0), 1)
-
-def trancheloss(L, K1, K2):
- return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)
-
-def trancherecov(R, K1, K2):
- return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)
-
-def tranche_cl(L, R, cs, K1, K2, scaled = False):
- if(K1 == K2):
- return 0
- else:
- support = np.linspace(0, 1, L.shape[0])
- size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \
- np.dot(trancherecov(support, K1, K2), R)
- sizeadj = 0.5 * (size + np.hstack((K2-K1, size[:-1])))
- if scaled:
- return 1/(K2-K1) * np.dot(sizeadj * cs["coupons"], cs["df"])
- else:
- return np.dot(sizeadj * cs["coupons"], cs["df"])
-
-def tranche_pl(L, cs, K1, K2, scaled=False):
- if(K1 == K2):
- return 0
- else:
- support = np.linspace(0, 1, L.shape[0])
- cf = K2 - K1 - np.dot(trancheloss(support, K1, K2), L)
- cf = np.hstack((K2-K1, cf))
- if scaled:
- return 1/(K2-K1) * np.dot(np.diff(cf), cs["df"])
- else:
- return np.dot(np.diff(cf), cs["df"])
-
-def tranche_pv(L, R, cs, K1, K2):
- return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K2, K2)
-
-def credit_schedule(tradedate, tenor, coupon, yc, enddate=None):
- tradedate = pydate_to_qldate(tradedate)
- if enddate is None:
- enddate = tradedate + Period(tenor)
- else:
- enddate = pydate_to_qldate(enddate)
- cal = WeekendsOnly()
- DC = Actual360()
- sched = Schedule.from_rule(tradedate, enddate, Period('3M'), cal,
- ModifiedFollowing, Unadjusted, CDS2015)
- dates = sched.to_npdates()
- pydates = dates.astype('O')
- df = [yc.discount_factor(d) for d in pydates if d > tradedate]
- coupons = [DC.year_fraction(d1, d2) * coupon for d1, d2 in zip(sched[:-2], sched[1:-1])]
- coupons.append(Actual360(True).year_fraction(sched[-2], sched[-1]) * coupon)
-
- return pydates[0], pd.DataFrame({"df": df, "coupons": coupons}, dates[1:])
-
-def cdsAccrued(tradedate, coupon):
- tradedate = pydate_to_qldate(tradedate)
- end = tradedate + Period('3Mo')
-
- start_protection = tradedate + 1
- DC = Actual360()
- cal = UnitedStates()
- sched = Schedule(start, end, Period('3Mo'), cal, date_generation_rule=CDS2015)
- prevpaydate = sched.previous_date(tradedate)
- return DC.year_fraction(prevpaydate, start_protection) * coupon
-
-def dist_transform(q):
- """computes the joint (D, R) distribution
- from the (L, R) distribution using D = L+R
- """
- Ngrid = q.shape[0]
- distDR = np.zeros_like(q)
- for i in range(Ngrid):
- for j in range(Ngrid):
- index = i + j
- if index < Ngrid:
- distDR[index,j] += q[i,j]
- else:
- distDR[Ngrid-1,j] += q[i,j]
- return distDR
-
-def dist_transform2(q):
- """computes the joint (D, R/D) distribution
- from the (D, R) distribution
- """
- Ngrid = q.shape[0]
- distDR = np.empty(Ngrid, dtype='object')
- for i in range(Ngrid):
- distDR[i] = {}
- for i in range(1, Ngrid):
- for j in range(i+1):
- index = (j / i)
- distDR[i][index] = distDR[i].get(index, 0) + q[i,j]
- return distDR
-
-def compute_pv(q, strike):
- """ compute E(1_{R^\bar \leq strike} * D)"""
- D = 0
- for i in range(q.shape):
- val += sum(v for k, v in q[i].items() if k < strike) * 1/Ngrid
- return val
-
-def average_recov(p, R, Ngrid):
- q = np.zeros((p.shape[0]+1, Ngrid))
- q[0,0] = 1
- lu = 1 / (Ngrid-1)
- weights = np.empty(Ngrid)
- index = np.empty(Ngrid)
- grid = np.linspace(0, 1, Ngrid)
- for i, prob in enumerate(p):
- for j in range(i+1, 0, -1):
- newrecov = ((j-1) * grid + R[i])/j
- np.modf(newrecov * ( Ngrid - 1), weights, index)
- q[j] *= (1-prob)
- for k in range(Ngrid):
- q[j,int(index[k])+1] += weights[k] * prob * q[j-1, k]
- q[j,int(index[k])] += (1-weights[k]) * prob * q[j-1, k]
- q[0] *= (1-prob)
- return q
-
-if __name__=="__main__":
- # n_issuers = 100
- # p = np.random.rand(n_issuers)
- # pp = np.random.rand(n_issuers)
- # w = 1/n_issuers * np.ones(n_issuers)
- # S = np.random.rand(n_issuers)
- # rho = 0.5
- # pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
- # poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)
- import numpy as np
- n_issuers = 100
- p = np.random.rand(n_issuers)
- R = np.random.rand(n_issuers)
- Rbar = joint_default_averagerecov_distrib(p, 1-R, 1001)
- Rbar_slow = average_recov(p, R, 1001)