"""ctypes bindings to the ``lossdistrib`` / ``GHquad`` shared libraries and
numpy helpers for tranche cash-flow and CDS schedule calculations.

NOTE(review): both shared libraries are loaded at import time from fixed
locations, so importing this module fails if they are not present.
"""
from ctypes import *

import numpy as np
import pandas as pd

from quantlib.time.schedule import Schedule, CDS
from quantlib.time.api import Actual360, Period, UnitedStates, Following, today
from quantlib.util.converter import qldate_to_pydate, pydate_to_qldate

# Argument specifications shared by the foreign-function prototypes below:
# Fortran-ordered arrays of doubles, read-only or writeable.
_DVEC = np.ctypeslib.ndpointer('double', ndim=1, flags='F')
_DMAT = np.ctypeslib.ndpointer('double', ndim=2, flags='F')
_DVEC_OUT = np.ctypeslib.ndpointer('double', ndim=1, flags='F,writeable')
_DMAT_OUT = np.ctypeslib.ndpointer('double', ndim=2, flags='F,writeable')

libloss = np.ctypeslib.load_library("lossdistrib", "/home/share/CorpCDOs/code/R")

libloss.fitprob.restype = None
libloss.fitprob.argtypes = [
    _DVEC,              # Z
    _DVEC,              # w
    POINTER(c_int),     # len(Z) = len(w)
    POINTER(c_double),  # rho
    POINTER(c_double),  # p0
    _DVEC_OUT,          # output
]

libloss.stochasticrecov.restype = None
libloss.stochasticrecov.argtypes = [
    POINTER(c_double),  # R
    POINTER(c_double),  # Rtilde
    # Z and w are 1-D: the wrapper sizes the 1-D output with zeros_like(Z),
    # so a 2-D declaration here could never be satisfied together with the
    # 1-D output slot.
    _DVEC,              # Z
    _DVEC,              # w
    POINTER(c_int),     # len(Z) = len(w)
    POINTER(c_double),  # rho
    POINTER(c_double),  # porig
    POINTER(c_double),  # pmod
    _DVEC_OUT,          # output q
]

libloss.BClossdist.restype = None
libloss.BClossdist.argtypes = [
    _DMAT,           # defaultprob
    POINTER(c_int),  # nrow(defaultprob)
    POINTER(c_int),  # ncol(defaultprob)
    _DVEC,           # issuerweights
    _DVEC,           # recovery
    _DVEC,           # Z
    _DVEC,           # w
    POINTER(c_int),  # len(Z) = len(w)
    _DVEC,           # rho
    POINTER(c_int),  # Ngrid
    POINTER(c_int),  # defaultflag
    _DMAT_OUT,       # output L
    _DMAT_OUT,       # output R
]

libgq = np.ctypeslib.load_library("GHquad", ".")
libgq.GHquad.restype = None
libgq.GHquad.argtypes = [c_int, _DVEC, _DVEC]


def GHquad(n):
    """Call the library routine ``GHquad`` and return ``(Z, w)``.

    Two length-``n`` double arrays are allocated and filled in place by the
    library (presumably Gauss-Hermite nodes and weights -- confirm against
    the library source).
    """
    Z = np.zeros(n, dtype='double')
    w = np.zeros(n, dtype='double')
    libgq.GHquad(n, Z, w)
    return Z, w


def stochasticrecov(R, Rtilde, Z, w, rho, porig, pmod):
    """Call ``libloss.stochasticrecov`` and return its output array.

    ``R``, ``Rtilde``, ``rho``, ``porig`` and ``pmod`` are passed as scalar
    doubles; ``Z`` and ``w`` are 1-D double arrays of equal length.  The
    result has the same shape as ``Z``.
    """
    q = np.zeros_like(Z)
    libloss.stochasticrecov(byref(c_double(R)),
                            byref(c_double(Rtilde)),
                            Z, w,
                            byref(c_int(Z.size)),
                            byref(c_double(rho)),
                            byref(c_double(porig)),
                            byref(c_double(pmod)),
                            q)
    return q


def fitprob(Z, w, rho, p0):
    """Call ``libloss.fitprob`` and return its output array.

    ``Z`` and ``w`` are 1-D double arrays of equal length; ``rho`` and ``p0``
    are scalars.  The result has the same shape as ``Z``.
    """
    result = np.empty_like(Z)
    libloss.fitprob(Z, w,
                    byref(c_int(Z.size)),
                    byref(c_double(rho)),
                    byref(c_double(p0)),
                    result)
    return result


def BClossdist(defaultprob, issuerweights, recov, rho, Z, w,
               Ngrid=101, defaultflag=False):
    """Call ``libloss.BClossdist`` and return the two output matrices.

    ``defaultprob`` is a Fortran-ordered 2-D double array; ``issuerweights``,
    ``recov``, ``Z`` and ``w`` are 1-D double arrays.  ``rho`` is repeated
    ``issuerweights.size`` times before being handed to the library.

    Returns ``(L, R)``, each of shape ``(Ngrid, defaultprob.shape[1])`` in
    Fortran order, filled in place by the library.
    """
    L = np.zeros((Ngrid, defaultprob.shape[1]), order='F')
    R = np.zeros_like(L)
    rho = np.repeat(rho, issuerweights.size)
    libloss.BClossdist(defaultprob,
                       byref(c_int(defaultprob.shape[0])),
                       byref(c_int(defaultprob.shape[1])),
                       issuerweights, recov, Z, w,
                       byref(c_int(Z.size)),
                       rho,
                       byref(c_int(Ngrid)),
                       byref(c_int(defaultflag)),
                       L, R)
    return L, R


def adjust_attachments(K, losstodate, factor):
    """
    computes the attachments adjusted for losses
    on current notional
    """
    return np.minimum(np.maximum((K - losstodate) / factor, 0), 1)


def trancheloss(L, K1, K2):
    """Return ``max(L - K1, 0) - max(L - K2, 0)`` element-wise."""
    return np.maximum(L - K1, 0) - np.maximum(L - K2, 0)


def trancherecov(R, K1, K2):
    """Return ``max(R - 1 + K2, 0) - max(R - 1 + K1, 0)`` element-wise."""
    return np.maximum(R - 1 + K2, 0) - np.maximum(R - 1 + K1, 0)


def tranche_cl(L, R, cs, K1, K2, scaled=False):
    """Discounted coupon cash flows on the ``[K1, K2]`` tranche.

    (Presumably the coupon/premium leg -- name-based, confirm.)

    ``L`` and ``R`` are matrices whose rows correspond to an evenly spaced
    grid on ``[0, 1]``; ``cs`` must provide ``"coupons"`` and ``"df"``.
    Returns 0 when ``K1 == K2``.  With ``scaled=True`` the result is divided
    by the tranche width ``K2 - K1``.
    """
    if K1 == K2:
        return 0
    support = np.linspace(0, 1, L.shape[0])
    size = K2 - K1 - np.dot(trancheloss(support, K1, K2), L) - \
        np.dot(trancherecov(support, K1, K2), R)
    # Average each period's remaining size with the previous period's,
    # starting from the full width K2 - K1.
    sizeadj = 0.5 * (size + np.hstack((K2 - K1, size[:-1])))
    value = np.dot(sizeadj * cs["coupons"], cs["df"])
    if scaled:
        return 1 / (K2 - K1) * value
    return value


def tranche_pl(L, cs, K1, K2, scaled=False):
    """Discounted period-to-period changes in ``[K1, K2]`` tranche size.

    (Presumably the protection leg -- name-based, confirm.)

    ``L`` is a matrix whose rows correspond to an evenly spaced grid on
    ``[0, 1]``; ``cs`` must provide ``"df"``.  Returns 0 when ``K1 == K2``.
    With ``scaled=True`` the result is divided by ``K2 - K1``.
    """
    if K1 == K2:
        return 0
    support = np.linspace(0, 1, L.shape[0])
    cf = K2 - K1 - np.dot(trancheloss(support, K1, K2), L)
    # Prepend the initial width so np.diff yields one change per period.
    cf = np.hstack((K2 - K1, cf))
    value = np.dot(np.diff(cf), cs["df"])
    if scaled:
        return 1 / (K2 - K1) * value
    return value


def tranche_pv(L, R, cs, K1, K2):
    """Return ``tranche_pl + tranche_cl`` for the ``[K1, K2]`` tranche."""
    # Both legs use the same attachment points; passing (K2, K2) to
    # tranche_cl would always yield 0 and silently drop that leg.
    return tranche_pl(L, cs, K1, K2) + tranche_cl(L, R, cs, K1, K2)


def creditSchedule(tradedate, tenor, coupon, yc, enddate=None):
    """Build a quarterly schedule of discount factors and coupons.

    The schedule is generated with the CDS date rule on the UnitedStates
    calendar, starting four months before ``tradedate`` and ending at
    ``enddate`` (or ``tradedate + tenor`` when ``enddate`` is None).

    Returns a DataFrame indexed by the schedule dates after ``tradedate``
    with columns ``"df"`` (``yc.discount`` at each date) and ``"coupons"``
    (Actual/360 accrual fraction of each period times ``coupon``).
    """
    tradedate = pydate_to_qldate(tradedate)
    start = tradedate - Period('4Mo')
    # Resolve the default before converting: None is not a date and must not
    # be handed to the converter.
    if enddate is None:
        enddate = tradedate + Period(tenor)
    else:
        enddate = pydate_to_qldate(enddate)
    cal = UnitedStates()
    DC = Actual360()
    sched = Schedule(start, enddate, Period('3Mo'), cal,
                     date_generation_rule=CDS)
    prevpaydate = sched.previous_date(tradedate)
    sched = [d for d in sched if d >= prevpaydate]
    future = [d for d in sched if d > tradedate]
    df = [yc.discount(d) for d in future]
    # Convert the dates directly rather than via strings: the second
    # positional argument of pd.to_datetime is ``errors``, not ``format``,
    # so a format string must not be passed there.
    dates = pd.to_datetime([qldate_to_pydate(d) for d in future])
    coupons = np.diff([DC.year_fraction(prevpaydate, d) * coupon
                       for d in sched])
    return pd.DataFrame({"df": df, "coupons": coupons}, dates)


def cdsAccrued(tradedate, coupon):
    """Return the coupon accrued from the previous schedule date.

    Accrual is the Actual/360 year fraction between the schedule date
    preceding ``tradedate`` and ``tradedate + 1``, multiplied by ``coupon``.
    """
    tradedate = pydate_to_qldate(tradedate)
    start = tradedate - Period('3Mo')
    end = tradedate + Period('3Mo')
    start_protection = tradedate + 1
    DC = Actual360()
    cal = UnitedStates()
    sched = Schedule(start, end, Period('3Mo'), cal, date_generation_rule=CDS)
    prevpaydate = sched.previous_date(tradedate)
    return DC.year_fraction(prevpaydate, start_protection) * coupon