aboutsummaryrefslogtreecommitdiffstats
path: root/python/risk/indices.py
blob: a8181ab73588716e012007ba9dcd5d0af31affcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import datetime
import pandas as pd
from analytics import Portfolio, CreditIndex
from analytics.curve_trades import on_the_run
from analytics.index_data import index_returns
from math import sqrt
from psycopg2.extensions import connection
from typing import Tuple, Union


def get_index_portfolio(
    d: datetime.date,
    conn: connection,
    strategies: Union[str, Tuple[str, ...], None] = None,
    exclude_redcode=None,
    **kwargs
):
    """Build and mark a portfolio of the CDS index positions held on ``d``.

    Positions are read from ``list_cds_positions_by_strat(d)`` and their
    notionals summed per (security_id, maturity). Every resulting row is
    turned into a ``CreditIndex`` with ``value_date=d``; the trades are
    wrapped in a ``Portfolio`` on which ``mark()`` is called before it is
    returned.

    Args:
        d: Position date, also used as the value date of each trade.
        conn: Database connection. Rows are read by attribute
            (``rec.redcode``), so its cursors must yield attribute-accessible
            records -- presumably a NamedTupleCursor configured by the
            caller; TODO confirm.
        strategies: ``None`` for no folder filter, a tuple to match any of
            several folders (``folder in ...``), or any other value (e.g. a
            single string) to match one folder (``folder = ...``).
        exclude_redcode: Collection of redcodes to leave out of the
            portfolio. ``None`` (the default) excludes nothing.
        **kwargs: Accepted for call-site compatibility and ignored.

    Returns:
        The marked ``Portfolio``.
    """
    # None instead of a mutable ``[]`` default; membership semantics for a
    # caller-supplied collection are unchanged.
    excluded = () if exclude_redcode is None else exclude_redcode

    sql_str = (
        "SELECT security_id AS redcode, sum(notional) AS notional, maturity "
        "FROM list_cds_positions_by_strat(%s) "
    )
    params = (d,)
    if strategies is not None:
        # Trailing spaces keep the WHERE clause separated from the GROUP BY
        # appended below.
        # NOTE: an empty tuple would render as ``IN ()``, which is not valid
        # SQL; callers are expected to pass at least one strategy.
        if isinstance(strategies, tuple):
            sql_str += "WHERE folder in %s "
        else:
            sql_str += "WHERE folder = %s "
        params += (strategies,)
    sql_str += "GROUP BY security_id, maturity"

    with conn.cursor() as c:
        c.execute(sql_str, params)
        trades = [
            CreditIndex(
                redcode=rec.redcode,
                maturity=rec.maturity,
                notional=rec.notional,
                value_date=d,
            )
            for rec in c
            if rec.redcode not in excluded
        ]
    portf = Portfolio(trades)
    portf.mark()
    return portf


def VaR(portf: Portfolio, quantile=0.05, years: int = 5, period="monthly"):
    """Estimate the portfolio's value-at-risk by replaying past spread returns.

    For every date in the return history, that date's spread returns are
    applied as relative shocks to the portfolio's current spreads, the
    portfolio pnl is recorded, and the requested quantile of those pnls is
    returned.

    Args:
        portf: Portfolio of credit indices. It is mutated: ``reset_pv()`` is
            called and ``portf.spread`` is overwritten for each scenario and
            left at the last shocked value on return.
        quantile: Quantile of the simulated pnl distribution to report.
        years: Length of history requested from ``index_returns``.
        period: ``"daily"`` returns the quantile as is; ``"monthly"``
            multiplies it by ``sqrt(20)`` (presumably square-root-of-time
            scaling over ~20 trading days -- TODO confirm).

    Returns:
        The pnl quantile as a float, scaled according to ``period``.

    Raises:
        ValueError: If ``period`` is neither ``"daily"`` nor ``"monthly"``.
    """
    # Fail fast: reject a bad period before fetching data or mutating portf.
    if period not in ("daily", "monthly"):
        raise ValueError("period needs to be either 'daily' or 'monthly'")

    index_types = tuple(set(t.index_type for t in portf))
    df = index_returns(
        index=index_types,
        years=years,
        end_date=portf.value_date,
        tenor=["3yr", "5yr", "7yr", "10yr"],
    )
    df = df.reorder_levels(["date", "index", "series", "tenor"])
    returns = df.spread_return.dropna().reset_index("series")
    # Replace the absolute series number by its distance from the highest
    # series quoted for that (date, index): 0 for the newest, 1 for the one
    # before, and so on.
    returns["dist_on_the_run"] = returns.groupby(["date", "index"])["series"].transform(
        lambda x: x.max() - x
    )
    del returns["series"]
    # One row per (date, index, dist_on_the_run), one column per tenor.
    returns = returns.set_index("dist_on_the_run", append=True).unstack("tenor")
    returns.columns = returns.columns.droplevel(0)

    portf.reset_pv()

    # Current spreads keyed the same way as the shocks so they can be joined.
    spreads = pd.DataFrame(
        {
            "spread": portf.spread,
            "tenor": [ind.tenor for ind in portf.indices],
            "index": [ind.index_type for ind in portf.indices],
            "dist_on_the_run": [
                on_the_run(ind.index_type, portf.value_date) - ind.series
                for ind in portf.indices
            ],
        }
    )
    spreads = spreads.set_index(["index", "dist_on_the_run", "tenor"])
    r = []
    for k, g in returns.groupby(level="date", as_index=False):
        # Shocks for this date, indexed by (index, dist_on_the_run, tenor).
        shocks = g.reset_index("date", drop=True).stack(["tenor"])
        shocks.name = "shocks"
        # Left join keeps the portfolio's row order; shock each spread
        # multiplicatively and read the resulting pnl.
        portf.spread = spreads.spread * (1 + spreads.join(shocks).shocks).values
        r.append((k, portf.pnl))
    pnl = pd.DataFrame.from_records(r, columns=["date", "pnl"], index=["date"])

    # Take the quantile of the column (a scalar) rather than of the frame (a
    # one-element Series): float() on a Series is deprecated in pandas >= 2.1.
    var = float(pnl["pnl"].quantile(quantile))
    if period == "monthly":
        var *= sqrt(20)
    return var


def insert_curve_risk(
    d: datetime.date, conn: connection, strategies: Tuple[str] = ("SER_IGCURVE",)
):
    """Compute daily VaR per strategy and upsert it into ``curve_risk``.

    A row is written for each entry of ``strategies`` and one more, under
    the strategy name ``"*"``, for the portfolio combining all of them.
    Strategies whose portfolio is falsy are skipped. Existing rows for the
    same (date, strategy) get their ``"VaR"`` and ``currency`` overwritten.
    The transaction is committed once all rows have been written.
    """
    upsert_sql = (
        "INSERT INTO curve_risk VALUES(%s, %s, %s, %s) "
        "ON CONFLICT (date, strategy) DO UPDATE SET "
        '"VaR"=excluded."VaR", currency=excluded.currency'
    )
    # Each strategy on its own, followed by the full collection as one bucket.
    buckets = [*strategies, strategies]
    with conn.cursor() as cur:
        for bucket in buckets:
            portfolio = get_index_portfolio(
                d, conn, bucket, exclude_redcode=["2I65BYDU6"]
            )
            if not portfolio:
                continue
            # The combined bucket is stored under the wildcard name.
            label = "*" if isinstance(bucket, tuple) else bucket
            cur.execute(
                upsert_sql, (d, label, VaR(portfolio, period="daily"), "USD")
            )
    conn.commit()