aboutsummaryrefslogtreecommitdiffstats
path: root/python/tranche_functions.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tranche_functions.py')
-rw-r--r--python/tranche_functions.py218
1 files changed, 197 insertions, 21 deletions
diff --git a/python/tranche_functions.py b/python/tranche_functions.py
index f6c5ab35..1a3e04e5 100644
--- a/python/tranche_functions.py
+++ b/python/tranche_functions.py
@@ -1,5 +1,6 @@
import numpy as np
from ctypes import *
+from numpy.ctypeslib import ndpointer
from quantlib.time.schedule import Schedule, CDS
from quantlib.time.api import Actual360, Period, UnitedStates, Following, today
from quantlib.util.converter import qldate_to_pydate, pydate_to_qldate
@@ -8,48 +9,79 @@ from scipy.special import h_roots
from common import root
import os
-libloss = np.ctypeslib.load_library("lossdistrib", os.path.join(root, "code/R/lossdistrib/src"))
+def wrapped_ndpointer(*args, **kwargs):
+ base = ndpointer(*args, **kwargs)
+ def from_param(cls, obj):
+ if obj is None:
+ return obj
+ return base.from_param(obj)
+ return type(base.__name__, (base,), {'from_param': classmethod(from_param)})
+
+libloss = np.ctypeslib.load_library("lossdistrib", "../R/lossdistrib/src/")
libloss.fitprob.restype = None
libloss.fitprob.argtypes = [
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
- np.ctypeslib.ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer('double', ndim=1, flags='F,writeable')]
libloss.stochasticrecov.restype = None
libloss.stochasticrecov.argtypes = [
POINTER(c_double),
POINTER(c_double),
- np.ctypeslib.ndpointer('double', ndim=2, flags='F'),
- np.ctypeslib.ndpointer('double', ndim=2, flags='F'),
+ ndpointer('double', ndim=2, flags='F'),
+ ndpointer('double', ndim=2, flags='F'),
POINTER(c_int),
POINTER(c_double),
POINTER(c_double),
POINTER(c_double),
- np.ctypeslib.ndpointer('double', ndim=1, flags='F,writeable')]
+ ndpointer('double', ndim=1, flags='F,writeable')]
libloss.BCloss_recov_dist.restype = None
libloss.BCloss_recov_dist.argtypes = [
- np.ctypeslib.ndpointer('double', ndim=2, flags='F'),# defaultprob
+ ndpointer('double', ndim=2, flags='F'),# defaultprob
POINTER(c_int),# nrow(defaultprob)
POINTER(c_int),# ncol(defaultprob)
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),# issuerweights
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),# recovery
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),# Z
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),# w
+ ndpointer('double', ndim=1, flags='F'),# issuerweights
+ ndpointer('double', ndim=1, flags='F'),# recovery
+ ndpointer('double', ndim=1, flags='F'),# Z
+ ndpointer('double', ndim=1, flags='F'),# w
POINTER(c_int), # len(Z) = len(w)
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'), # rho
+ ndpointer('double', ndim=1, flags='F'), # rho
POINTER(c_int), # Ngrid
POINTER(c_int), #defaultflag
- np.ctypeslib.ndpointer('double', ndim=2, flags='F,writeable'),# output L
- np.ctypeslib.ndpointer('double', ndim=2, flags='F,writeable')# output R
+ ndpointer('double', ndim=2, flags='F,writeable'),# output L
+ ndpointer('double', ndim=2, flags='F,writeable')# output R
+]
+
+libloss.lossdistrib_joint.restype = None
+libloss.lossdistrib_joint.argtypes = [
+ ndpointer('double', ndim=1, flags='F'),
+ wrapped_ndpointer('double', ndim=1, flags='F'),
+ POINTER(c_int),
+ ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
+ POINTER(c_int),
+ POINTER(c_int),
+ ndpointer('double', ndim=2, flags='F,writeable')
+]
+
+libloss.lossdistrib_joint_Z.restype = None
+libloss.lossdistrib_joint_Z.argtypes = [
+ ndpointer('double', ndim=1, flags='F'),
+ wrapped_ndpointer('double', ndim=1, flags='F'),
+ POINTER(c_int),
+ ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
+ POINTER(c_int),
+ POINTER(c_int),
+ ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
+ ndpointer('double', ndim=1, flags='F'),
+ POINTER(c_int),
+ ndpointer('double', ndim=2, flags='F,writeable')
]
-libgq = np.ctypeslib.load_library("GHquad", os.path.join(root, "code", "python"))
-libgq.GHquad.restype = None
-libgq.GHquad.argtypes = [c_int,
- np.ctypeslib.ndpointer('double', ndim=1, flags='F'),
- np.ctypeslib.ndpointer('double', ndim=1, flags='F')]
def GHquad(n):
Z, w = h_roots(n)
return Z*np.sqrt(2), w/np.sqrt(np.pi)
@@ -65,7 +97,7 @@ def fitprob(Z, w, rho, p0):
libloss.fitprob(Z, w, byref(c_int(Z.size)), byref(c_double(rho)), byref(c_double(p0)), result)
return result
-def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid = 101, defaultflag = False):
+def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid=101, defaultflag=False):
L = np.zeros((Ngrid, defaultprob.shape[1]), order='F')
R = np.zeros_like(L)
rho = np.repeat(rho, issuerweights.size)
@@ -75,6 +107,103 @@ def BCloss_recov_dist(defaultprob, issuerweights, recov, rho, Z, w, Ngrid = 101,
byref(c_int(Ngrid)), byref(c_int(defaultflag)), L, R)
return L, R
+def lossdistrib_joint(p, pp, w, S, Ngrid=101, defaultflag=False):
+ """Joint loss-recovery distribution recursive algorithm.
+
+ This computes the joint loss/recovery distribution using a first order
+ correction.
+
+ Parameters
+ ----------
+ p : (N,) array_like
+ Vector of default probabilities.
+ pp : (N,) array_like or None
+ Vector of prepayments.
+ w : (N,) array_like
+ Issuer weights.
+ S : (N,) array_like
+ Vector of severities.
+ Ngrid : integer, optional
+ Number of ticks on the grid, default 101.
+ defaultflag : bool, optional
+ If True computes the default distribution instead.
+
+ Returns
+ -------
+ q : (N, N) ndarray
+
+ Notes
+ -----
+ np.sum(q, axis=0) is the recovery distribution marginal
+ np.sum(q, axis=1) is the loss (or default) distribution marginal
+ """
+
+ q = np.zeros((Ngrid, Ngrid), order='F')
+ if pp is not None:
+ assert(p.shape == pp.shape)
+ assert(w.shape == S.shape)
+ libloss.lossdistrib_joint(p, pp, byref(c_int(p.shape[0])),
+ w, S, byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)), q)
+ return q
+
+def lossdistrib_joint_Z(p, pp, w, S, rho, Ngrid=101, defaultflag=False, nZ=500):
+ """Joint loss-recovery distribution recursive algorithm.
+
+ This computes the joint loss/recovery distribution using a first order
+ correction.
+
+ Parameters
+ ----------
+ p : (N,) array_like
+ Vector of default probabilities.
+ pp : (N,) array_like or None
+ Vector of prepayments.
+ w : (N,) array_like
+ Issuer weights.
+ S : (N,) array_like
+ Vector of severities.
+ rho : float
+ Correlation.
+ Ngrid : integer, optional
+ Number of ticks on the grid, default 101.
+ defaultflag : bool, optional
+ If True computes the default distribution instead.
+ nZ : int, optional
+ Size of stochastic factor.
+
+ Returns
+ -------
+ q : (N, N) ndarray
+
+ Notes
+ -----
+ np.sum(q, axis=0) is the recovery distribution marginal
+ np.sum(q, axis=1) is the loss (or default) distribution marginal
+
+ Examples
+ --------
+ >>> import numpy as np
+ >>> p = np.random.rand(100)
+ >>> pp = np.zeros(100)
+ >>> w = 1/100 * np.ones(100)
+ >>> S = np.random.rand(100)
+ >>> q = lossdistrib_joint_Z(p, pp, S, 0.5)
+
+ """
+ Z, wZ = GHquad(nZ)
+ q = np.zeros((Ngrid, Ngrid), order='F')
+ rho = rho * np.ones(p.shape[0])
+ if pp is not None:
+ assert(p.shape == pp.shape)
+ assert(w.shape == S.shape)
+
+ libloss.lossdistrib_joint_Z(p, pp, byref(c_int(p.shape[0])),
+ w, S, byref(c_int(Ngrid)),
+ byref(c_int(defaultflag)), rho, Z, wZ,
+ byref(c_int(nZ)), q)
+ return q
+
def adjust_attachments(K, losstodate, factor):
"""
computes the attachments adjusted for losses
@@ -145,3 +274,50 @@ def cdsAccrued(tradedate, coupon):
sched = Schedule(start, end, Period('3Mo'), cal, date_generation_rule=CDS)
prevpaydate = sched.previous_date(tradedate)
return DC.year_fraction(prevpaydate, start_protection) * coupon
+
+def dist_transform(q):
+ """computes the joint (D, R) distribution
+ from the (L, R) distribution using D = L+R
+ """
+ Ngrid = q.shape[0]
+ distDR = np.zeros_like(q)
+ for i in range(Ngrid):
+ for j in range(Ngrid):
+ index = i + j
+ if index < Ngrid:
+ distDR[index,j] += q[i,j]
+ else:
+ distDR[Ngrid-1,j] += q[i,j]
+ return distDR
+
+def dist_transform2(q):
+ """computes the joint (D, R/D) distribution
+ from the (D, R) distribution
+ """
+ Ngrid = q.shape[0]
+ distDR = np.empty(Ngrid, dtype='object')
+ for i in range(Ngrid):
+ distDR[i] = {}
+ for i in range(1, Ngrid):
+ for j in range(i+1):
+ index = (j / i)
+ distDR[i][index] = distDR[i].get(index, 0) + q[i,j]
+ return distDR
+
+def compute_pv(q, strike):
+ """ compute E(1_{R^\bar \leq strike} * D)"""
+ D = 0
+ for i in range(q.shape):
+ val += sum(v for k, v in q[i].items() if k < strike) * 1/Ngrid
+ return val
+
+
+if __name__=="__main__":
+ n_issuers = 100
+ p = np.random.rand(n_issuers)
+ pp = np.random.rand(n_issuers)
+ w = 1/n_issuers * np.ones(n_issuers)
+ S = np.random.rand(n_issuers)
+ rho = 0.5
+ pomme = lossdistrib_joint_Z(p, None, w, S, rho, defaultflag=True)
+ poire = lossdistrib_joint_Z(p, pp, w, S, rho, defaultflag=True)