"""Date-rolling, quadrature, table-formatting and memoization helpers."""
import datetime
import numpy as np
import pandas as pd
from scipy.special import h_roots
from dateutil.relativedelta import relativedelta, WE
from functools import partial, wraps
from pyisda.date import pydate_to_TDate
from pandas.api.types import CategoricalDtype
from pandas.tseries.offsets import CustomBusinessDay, Day, QuarterBegin
from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday

# Business-day offset built on the US federal holiday calendar plus Good Friday.
fed_cal = get_calendar("USFederalHolidayCalendar")
bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday)
bus_day = CustomBusinessDay(calendar=bond_cal())

from quantlib.time.date import nth_weekday, Wednesday, Date

# Ordered categorical dtype for tenor labels, shortest to longest.
tenor_t = CategoricalDtype(
    [
        "1m",
        "3m",
        "6m",
        "1yr",
        "2yr",
        "3yr",
        "4yr",
        "5yr",
        "7yr",
        "10yr",
        "15yr",
        "20yr",
        "25yr",
        "30yr",
    ],
    ordered=True,
)


def GHquad(n):
    """Return rescaled Gauss-Hermite quadrature nodes and weights.

    The ``n`` roots and weights returned by ``scipy.special.h_roots`` are
    rescaled: the nodes are multiplied by ``sqrt(2)`` and the weights divided
    by ``sqrt(pi)``.

    Returns:
        A tuple ``(nodes, weights)`` of two arrays of length ``n``.
    """
    Z, w = h_roots(n)
    return Z * np.sqrt(2), w / np.sqrt(np.pi)


def next_twentieth(d):
    """Return the first 20th of March/June/September/December on or after ``d``.

    ``d`` must support addition with ``relativedelta`` and expose ``month``
    (e.g. ``datetime.date``, ``datetime.datetime`` or ``pd.Timestamp``).
    """
    # Move to the 20th of d's month; if that is already past, use next month.
    r = d + relativedelta(day=20)
    if r < d:
        r += relativedelta(months=1)
    # Advance to the next month that is a multiple of 3.
    mod = r.month % 3
    if mod != 0:
        r += relativedelta(months=3 - mod)
    return r


def third_wednesday(d):
    """Return the third Wednesday of the month containing ``d``.

    Accepts a ``datetime.date`` (or subclass) or a quantlib ``Date``; the
    result has the same flavour as the input.  For any other type the function
    returns ``None``.
    """
    if isinstance(d, datetime.date):
        # day=1 rewinds to the first of the month, WE(3) then jumps to the
        # third Wednesday on or after that day.
        return d + relativedelta(day=1, weekday=WE(3))
    elif isinstance(d, Date):
        return nth_weekday(3, Wednesday, d.month, d.year)
    return None


def next_third_wed(d):
    """Return the first third-Wednesday-of-a-month on or after ``d``."""
    y = third_wednesday(d)
    if y < d:
        # This month's third Wednesday is already past: use next month's.
        return third_wednesday(d + relativedelta(months=1))
    return y


def roll_date(d, tenor, nd_array=False):
    """Roll date ``d`` to the next CDS maturity for the given tenor(s).

    For ``d`` on or before 2015-09-20 the maturity is the next 20th of
    Mar/Jun/Sep/Dec on or after ``d + tenor + 1 day``.  For later dates the
    roll is semi-annual: the maturity falls on June 20 when ``d`` lies in
    [March 20, September 20) of its year, and on December 20 otherwise (of the
    previous year when ``d`` is before March 20).

    Args:
        d: ``datetime.date``, ``datetime.datetime`` or ``pd.Timestamp``.
        tenor: tenor in years as an ``int``/``float`` (``0.5`` means six
            months; other values are truncated to whole years), or an
            iterable of such numbers.
        nd_array: when ``tenor`` is an iterable, return a numpy array of the
            values produced by ``pydate_to_TDate`` instead of a list of
            dates.  Ignored for a scalar tenor.

    Returns:
        A single date for a scalar tenor, otherwise a list of dates (or a
        numpy array when ``nd_array`` is true).

    Raises:
        TypeError: if ``tenor`` is neither a number nor an iterable.
    """
    cutoff = pd.Timestamp("2015-09-20")

    def offset(t):
        # Half-year tenors are expressed in months, everything else in years.
        if abs(t) == 0.5:
            return {"months": int(12 * t)}
        return {"years": int(t)}

    # pd.Timestamp subclasses datetime.datetime and both compare fine with a
    # Timestamp cutoff; only plain dates need a plain-date cutoff.  (Testing
    # for pd.Timestamp alone made plain datetimes fail on the comparison.)
    if not isinstance(d, datetime.datetime):
        cutoff = cutoff.date()

    is_scalar = isinstance(tenor, (int, float))
    if not is_scalar and not hasattr(tenor, "__iter__"):
        raise TypeError("tenor is not a number nor an iterable")

    if d <= cutoff:
        # quarterly rolling
        if is_scalar:
            return next_twentieth(d + relativedelta(**offset(tenor), days=1))
        rolled = [
            next_twentieth(d + relativedelta(**offset(t), days=1)) for t in tenor
        ]
    else:
        # semi-annual rolling starting 2015-12-20
        if is_scalar:
            base = d + relativedelta(**offset(tenor))
        else:
            # For several tenors, build the one-year maturity and shift it.
            base = d + relativedelta(years=1)

        before_march_or_after_sept = (d >= d + relativedelta(month=9, day=20)) or (
            d < d + relativedelta(month=3, day=20)
        )
        if before_march_or_after_sept:
            base += relativedelta(month=12, day=20)
            if d.month <= 3:
                base -= relativedelta(years=1)
        else:
            base += relativedelta(month=6, day=20)

        if is_scalar:
            return base
        rolled = [base + relativedelta(**offset(t - 1)) for t in tenor]

    if nd_array:
        return np.array([pydate_to_TDate(x) for x in rolled])
    return rolled


def build_table(rows, format_strings, row_format):
    """Format rows of cells into a list of strings.

    Args:
        rows: iterable of rows, each an iterable of cell values.
        format_strings: iterable parallel to ``rows``; each item is an
            iterable of per-cell format specs.  A spec is ``None`` (cell is
            passed through), a callable (called with the cell) or a ``str``
            template (``str.format`` is applied; tuple cells are unpacked as
            positional arguments).
        row_format: ``str`` template receiving the formatted cells of one row
            as positional arguments.

    Returns:
        A list with one formatted string per row.
    """

    def apply_format(row, format_string):
        for r, f in zip(row, format_string):
            if f is None:
                yield r
            elif callable(f):
                yield f(r)
            elif isinstance(f, str):
                yield f.format(*r) if isinstance(r, tuple) else f.format(r)
            # NOTE(review): any other spec type yields nothing, so the cell is
            # silently dropped from the row -- confirm this is intended.

    return [
        row_format.format(*apply_format(row, format_string))
        for row, format_string in zip(rows, format_strings)
    ]


def memoize(f=None, *, hasher=lambda args: (hash(args),)):
    """Cache a method's results in the instance's ``_cache`` mapping.

    Usable bare (``@memoize``) or with options (``@memoize(hasher=...)``).
    The decorated function must be called with the instance as first
    positional argument, and that instance must expose a dict-like
    ``_cache`` attribute.

    Args:
        f: the function to wrap; ``None`` when called with options only.
        hasher: callable mapping the positional-argument tuple (instance
            included) to an iterable of hashable key components.

    Returns:
        The caching wrapper, or a decorator when ``f`` is ``None``.

    NOTE(review): the default hasher stores only ``hash(args)``, so two
    distinct argument tuples with equal hashes share one cache entry.
    """
    if f is None:
        return partial(memoize, hasher=hasher)

    @wraps(f)
    def cached_f(*args, **kwargs):
        self = args[0]
        key = (f.__name__, *hasher(args))
        if kwargs:
            # Keyword arguments must be part of the key, otherwise calls that
            # differ only in their keywords would return a stale result.
            kw_key = tuple(sorted(kwargs.items()))
            try:
                hash(kw_key)
            except TypeError:
                # Unhashable keyword value: compute without caching.
                return f(*args, **kwargs)
            key += (kw_key,)
        cache = self._cache
        if key in cache:
            return cache[key]
        v = f(*args, **kwargs)
        cache[key] = v
        return v

    return cached_f