diff options
Diffstat (limited to 'python/analytics/utils.py')
| -rw-r--r-- | python/analytics/utils.py | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/python/analytics/utils.py b/python/analytics/utils.py deleted file mode 100644 index 8c01d2ae..00000000 --- a/python/analytics/utils.py +++ /dev/null @@ -1,270 +0,0 @@ -import analytics -import datetime -import numpy as np -import pandas as pd -from . import dbconn -from .exceptions import MissingDataError -from scipy.special import h_roots -from dateutil.relativedelta import relativedelta, WE -from contextlib import contextmanager -from functools import partial, wraps, lru_cache -from pyisda.date import pydate_to_TDate -from pandas.api.types import CategoricalDtype -from pandas.tseries.offsets import CustomBusinessDay -from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday -from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session -from quantlib.time.date import nth_weekday, Wednesday, Date - -fed_cal = get_calendar("USFederalHolidayCalendar") -bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday) -bus_day = CustomBusinessDay(calendar=bond_cal()) - - -tenor_t = CategoricalDtype( - [ - "1m", - "3m", - "6m", - "1yr", - "2yr", - "3yr", - "4yr", - "5yr", - "7yr", - "10yr", - "15yr", - "20yr", - "25yr", - "30yr", - ], - ordered=True, -) - - -def GHquad(n): - """Gauss-Hermite quadrature weights""" - Z, w = h_roots(n) - return Z * np.sqrt(2), w / np.sqrt(np.pi) - - -def next_twentieth(d): - r = d + relativedelta(day=20) - if r < d: - r += relativedelta(months=1) - mod = r.month % 3 - if mod != 0: - r += relativedelta(months=3 - mod) - return r - - -def third_wednesday(d): - if isinstance(d, datetime.date): - return d + relativedelta(day=1, weekday=WE(3)) - elif isinstance(d, Date): - return nth_weekday(3, Wednesday, d.month, d.year) - - -def next_third_wed(d): - y = third_wednesday(d) - if y < d: - return third_wednesday(d + relativedelta(months=1)) - else: - return y - - -def prev_business_day(d: datetime.date): - if (offset := d.weekday() - 4) > 0: - return d - datetime.timedelta(days=offset) - elif offset == -4: - return d - datetime.timedelta(days=3) - else: - return d - datetime.timedelta(days=1) - - -def adjust_prev_business_day(d: datetime.date): - """ roll to the previous business day""" - if (offset := d.weekday() - 4) > 0: - return d - datetime.timedelta(days=offset) - else: - return d - - -def adjust_next_business_day(d: datetime.date): - if (offset := 7 - d.weekday()) >= 3: - return d - else: - return d + datetime.timedelta(days=offset) - - -def next_business_day(d: datetime.date): - if (offset := 7 - d.weekday()) > 3: - return d + datetime.timedelta(days=1) - else: - return d + datetime.timedelta(days=offset) - - -def tenor_to_float(t: str): - if t == "6m": - return 0.5 - else: - return float(t.rstrip("yr")) - - -def roll_date(d, tenor, nd_array=False): - """ roll date d to the next CDS maturity""" - cutoff = pd.Timestamp("2015-09-20") - - def kwargs(t): - if abs(t) == 0.5: - return {"months": int(12 * t)} - else: - return {"years": int(t)} - - if not isinstance(d, pd.Timestamp): - cutoff = cutoff.date() - if d <= cutoff: - if isinstance(tenor, (int, float)): - d_rolled = d + relativedelta(**kwargs(tenor), days=1) - return next_twentieth(d_rolled) - elif hasattr(tenor, "__iter__"): - v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor] - if nd_array: - return np.array([pydate_to_TDate(d) for d in v]) - else: - return v - else: - raise TypeError("tenor is not a number nor an iterable") - else: # semi-annual rolling starting 2015-12-20 - if isinstance(tenor, (int, float)): - d_rolled = d + relativedelta(**kwargs(tenor)) - elif hasattr(tenor, "__iter__"): - d_rolled = d + relativedelta(years=1) - else: - raise TypeError("tenor is not a number nor an iterable") - - if (d >= d + relativedelta(month=9, day=20)) or ( - d < d + relativedelta(month=3, day=20) - ): - d_rolled += relativedelta(month=12, day=20) - if d.month <= 3: - d_rolled -= relativedelta(years=1) - else: - d_rolled += relativedelta(month=6, day=20) - if isinstance(tenor, (int, float)): - return d_rolled - else: - v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor] - if nd_array: - return np.array([pydate_to_TDate(d) for d in v]) - else: - return v - - -def build_table(rows, format_strings, row_format): - def apply_format(row, format_string): - for r, f in zip(row, format_string): - if f is None: - yield r - else: - if callable(f): - yield f(r) - elif isinstance(f, str): - if isinstance(r, tuple): - yield f.format(*r) - else: - yield f.format(r) - - return [ - row_format.format(*apply_format(row, format_string)) - for row, format_string in zip(rows, format_strings) - ] - - -def memoize(f=None, *, hasher=lambda args: (hash(args),)): - if f is None: - return partial(memoize, hasher=hasher) - - @wraps(f) - def cached_f(*args, **kwargs): - self = args[0] - key = (f.__name__, *hasher(args)) - cache = getattr(self, f"_{type(self).__name__}__cache") - if key in cache: - return cache[key] - else: - v = f(*args, **kwargs) - cache[key] = v - return v - - return cached_f - - -def to_TDate(arr: np.ndarray): - """ convert an array of numpy datetime to TDate""" - return arr.view("int") + 134774 - - -def get_external_nav(engine, trade_id, value_date=None, trade_type="swaptions"): - if trade_type == "swaptions": - upfront_query = ( - "CASE when date < settle_date " - "THEN price * notional/100 * (2 * buysell::integer - 1) " - "ELSE 0." - "END" - ) - elif trade_type == "cds": - upfront_query = "CASE WHEN date < upfront_settle_date THEN upfront ELSE 0. END" - query = ( - "SELECT date, " - "base_nav, " - f"({upfront_query}) AS upfront FROM external_marks_deriv " - f"LEFT JOIN {trade_type} " - "ON cpty_id = identifier WHERE id=%s " - ) - - if value_date: - query += "AND date=%s" - r = engine.execute(query, (trade_id, value_date)) - try: - date, nav, upfront = next(r) - except StopIteration: - raise MissingDataError( - f"No quote available for {trade_type} {trade_id} on {value_date}" - ) - return nav + upfront - else: - query += "ORDER BY DATE" - return pd.read_sql_query( - query, engine, params=(trade_id,), parse_dates=["date"], index_col=["date"] - ) - - -@lru_cache(32) -def get_fx(value_date: datetime.date, currency: str): - if currency == "USD": - return 1.0 - if value_date == datetime.date.today(): - with init_bbg_session(BBG_IP) as session: - security = currency.upper() + "USD Curncy" - field = "PX_LAST" - ref_data = retrieve_data(session, [security], field) - return ref_data[security][field] - conn = dbconn("dawndb") - with conn.cursor() as c: - c.execute("SELECT * FROM fx where date=%s", (value_date,)) - rec = c.fetchone() - r = getattr(rec, currency.lower() + "usd", None) - if r is None: - raise MissingDataError( - f"No {currency.upper()}USD fx rate available for {value_date}" - ) - conn.close() - return r - - -@contextmanager -def run_local(local=True): - saved_local = analytics._local - analytics._local = local - yield - analytics._local = saved_local |
