aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/utils.py
blob: 51061e794b5cf541462417491205d3336bdf0e9b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import numpy as np
import pandas as pd
from scipy.special import h_roots
from dateutil.relativedelta import relativedelta, FR
import datetime
from pyisda.date import pydate_to_TDate
from pandas.api.types import CategoricalDtype
from quantlib.time.date import nth_weekday, Wednesday, Date

tenor_t = CategoricalDtype(['1m', '3m', '6m', '1yr', '2yr', '3yr', '4yr',
                            '5yr', '7yr', '10yr', '15yr', '20yr', '25yr',
                            '30yr'],
                           ordered=True)


def GHquad(n):
    """Gauss-Hermite quadrature weights"""
    Z, w = h_roots(n)
    return Z*np.sqrt(2), w/np.sqrt(np.pi)


def next_twentieth(d):
    r = d + relativedelta(day=20)
    if(r < d):
        r += relativedelta(months=1)
    mod = r.month % 3
    if mod != 0:
        r += relativedelta(months=3 - mod)
    return r


def third_wednesday(d):
    if isinstance(d, datetime.date):
        return d + relativedelta(day=1, weekday=FR(3))
    elif isinstance(d, Date):
        return nth_weekday(3, Wednesday, d.month, d.year)


def next_third_wed(d):
    y = third_wednesday(d)
    if y < d:
        return third_wednesday(d + relativedelta(months=1))
    else:
        return y


def roll_date(d, tenor, nd_array=False):
    """ roll date d to the next CDS maturity"""
    cutoff = pd.Timestamp('2015-09-20')

    def kwargs(t):
        if abs(t) == 0.5:
            return {'months': int(12 * t)}
        else:
            return {'years': int(t)}

    if not isinstance(d, pd.Timestamp):
        cutoff = cutoff.date()
    if d <= cutoff:
        if isinstance(tenor, (int, float)):
            d_rolled = d + relativedelta(**kwargs(tenor), days=1)
            return next_twentieth(d_rolled)
        elif hasattr(tenor, '__iter__'):
            v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor]
            if nd_array:
                return np.array([pydate_to_TDate(d) for d in v])
            else:
                return v
        else:
            raise TypeError('tenor is not a number nor an iterable')
    else:  # semi-annual rolling starting 2015-12-20
        if isinstance(tenor, (int, float)):
            d_rolled = d + relativedelta(**kwargs(tenor))
        elif hasattr(tenor, '__iter__'):
            d_rolled = d + relativedelta(years=1)
        else:
            raise TypeError('tenor is not a number nor an iterable')

        if((d >= d + relativedelta(month=9, day=20)) or
           (d < d + relativedelta(month=3, day=20))):
            d_rolled += relativedelta(month=12, day=20)
            if d.month <= 3:
                d_rolled -= relativedelta(years=1)
        else:
            d_rolled += relativedelta(month=6, day=20)
        if isinstance(tenor, (int, float)):
            return d_rolled
        else:
            v = [d_rolled + relativedelta(**kwargs(t-1)) for t in tenor]
            if nd_array:
                return np.array([pydate_to_TDate(d) for d in v])
            else:
                return v


def build_table(rows, format_strings, row_format):
    r = []
    for row, format_string in zip(rows, format_strings):
        row = [f.format(r) if f else r for r, f in zip(row, format_string)]
        r.append(row_format.format(*row))
    return r