Diffstat (limited to 'python/analytics/utils.py')
-rw-r--r--  python/analytics/utils.py  270
1 file changed, 0 insertions(+), 270 deletions(-)
diff --git a/python/analytics/utils.py b/python/analytics/utils.py
deleted file mode 100644
index 8c01d2ae..00000000
--- a/python/analytics/utils.py
+++ /dev/null
@@ -1,270 +0,0 @@
-import analytics
-import datetime
-import numpy as np
-import pandas as pd
-from . import dbconn
-from .exceptions import MissingDataError
-from scipy.special import h_roots
-from dateutil.relativedelta import relativedelta, WE
-from contextlib import contextmanager
-from functools import partial, wraps, lru_cache
-from pyisda.date import pydate_to_TDate
-from pandas.api.types import CategoricalDtype
-from pandas.tseries.offsets import CustomBusinessDay
-from pandas.tseries.holiday import get_calendar, HolidayCalendarFactory, GoodFriday
-from bbg_helpers import BBG_IP, retrieve_data, init_bbg_session
-from quantlib.time.date import nth_weekday, Wednesday, Date
-
-# US bond-market business days: federal holidays plus Good Friday
-fed_cal = get_calendar("USFederalHolidayCalendar")
-bond_cal = HolidayCalendarFactory("BondCalendar", fed_cal, GoodFriday)
-bus_day = CustomBusinessDay(calendar=bond_cal())
-
-
-# ordered categorical dtype for the standard curve tenors
-tenor_t = CategoricalDtype(
- [
- "1m",
- "3m",
- "6m",
- "1yr",
- "2yr",
- "3yr",
- "4yr",
- "5yr",
- "7yr",
- "10yr",
- "15yr",
- "20yr",
- "25yr",
- "30yr",
- ],
- ordered=True,
-)
-
-
-def GHquad(n):
-    """Gauss-Hermite quadrature nodes and weights, rescaled so that
-    sum(w * f(Z)) approximates E[f(X)] for a standard normal X."""
-    Z, w = h_roots(n)
-    return Z * np.sqrt(2), w / np.sqrt(np.pi)
-
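-# Illustrative sketch, not part of the original module: with the rescaling in
-# GHquad, sum(w * f(Z)) approximates E[f(X)] for X ~ N(0, 1); for f(x) = x**2
-# the rule is exact and returns 1.0.
-def _example_ghquad():
-    Z, w = GHquad(20)
-    return np.dot(w, Z ** 2)  # ~= 1.0 up to floating point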
-
-def next_twentieth(d):
-    """Return the next 20th of a quarterly month (Mar/Jun/Sep/Dec) on or after d."""
-    r = d + relativedelta(day=20)
- if r < d:
- r += relativedelta(months=1)
- mod = r.month % 3
- if mod != 0:
- r += relativedelta(months=3 - mod)
- return r
-
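-# Illustrative sketch, not part of the original module: next_twentieth maps a
-# trade date to the following quarterly 20th (the pre-2015 CDS roll date).
-def _example_next_twentieth():
-    return next_twentieth(datetime.date(2018, 1, 25))  # -> 2018-03-20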
-
-def third_wednesday(d):
-    """Third Wednesday of d's month, for a datetime.date or a quantlib Date."""
-    if isinstance(d, datetime.date):
-        return d + relativedelta(day=1, weekday=WE(3))
-    elif isinstance(d, Date):
-        return nth_weekday(3, Wednesday, d.month, d.year)
-    else:
-        # guard against silently returning None for unsupported types
-        raise TypeError(f"unsupported date type: {type(d)}")
-
-
-def next_third_wed(d):
-    """Third Wednesday of d's month, or of the next month if it has already passed."""
-    y = third_wednesday(d)
- if y < d:
- return third_wednesday(d + relativedelta(months=1))
- else:
- return y
-
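-# Illustrative sketch, not part of the original module: dates on or before the
-# third Wednesday stay in the same month, later dates roll to the next month.
-def _example_next_third_wed():
-    return (
-        next_third_wed(datetime.date(2021, 6, 10)),  # -> 2021-06-16
-        next_third_wed(datetime.date(2021, 6, 20)),  # -> 2021-07-21
-    )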
-
-def prev_business_day(d: datetime.date):
-    """Previous weekday strictly before d (weekend logic only, no holidays)."""
- if (offset := d.weekday() - 4) > 0:
- return d - datetime.timedelta(days=offset)
- elif offset == -4:
- return d - datetime.timedelta(days=3)
- else:
- return d - datetime.timedelta(days=1)
-
-
-def adjust_prev_business_day(d: datetime.date):
-    """Roll d back to the previous business day if it falls on a weekend."""
- if (offset := d.weekday() - 4) > 0:
- return d - datetime.timedelta(days=offset)
- else:
- return d
-
-
-def adjust_next_business_day(d: datetime.date):
-    """Roll d forward to the next business day if it falls on a weekend."""
- if (offset := 7 - d.weekday()) >= 3:
- return d
- else:
- return d + datetime.timedelta(days=offset)
-
-
-def next_business_day(d: datetime.date):
-    """Next weekday strictly after d (weekend logic only, no holidays)."""
- if (offset := 7 - d.weekday()) > 3:
- return d + datetime.timedelta(days=1)
- else:
- return d + datetime.timedelta(days=offset)
-
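-# Illustrative sketch, not part of the original module: these helpers ignore
-# holidays (weekend logic only); use bus_day for holiday-aware shifts.
-def _example_business_days():
-    sat = datetime.date(2021, 7, 3)  # a Saturday
-    return (
-        prev_business_day(sat),         # -> Friday 2021-07-02
-        adjust_prev_business_day(sat),  # -> Friday 2021-07-02
-        adjust_next_business_day(sat),  # -> Monday 2021-07-05
-        next_business_day(sat),         # -> Monday 2021-07-05
-    )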
-
-def tenor_to_float(t: str):
-    """Convert a "6m" or "<N>yr" tenor string to a year fraction."""
- if t == "6m":
- return 0.5
- else:
- return float(t.rstrip("yr"))
-
-
-def roll_date(d, tenor, nd_array=False):
-    """Roll date d to the next CDS maturity for the given tenor (scalar or
-    iterable): quarterly rolls up to 2015-09-20, semi-annual thereafter."""
- cutoff = pd.Timestamp("2015-09-20")
-
- def kwargs(t):
- if abs(t) == 0.5:
- return {"months": int(12 * t)}
- else:
- return {"years": int(t)}
-
- if not isinstance(d, pd.Timestamp):
- cutoff = cutoff.date()
- if d <= cutoff:
- if isinstance(tenor, (int, float)):
- d_rolled = d + relativedelta(**kwargs(tenor), days=1)
- return next_twentieth(d_rolled)
- elif hasattr(tenor, "__iter__"):
- v = [next_twentieth(d + relativedelta(**kwargs(t), days=1)) for t in tenor]
- if nd_array:
- return np.array([pydate_to_TDate(d) for d in v])
- else:
- return v
- else:
- raise TypeError("tenor is not a number nor an iterable")
- else: # semi-annual rolling starting 2015-12-20
- if isinstance(tenor, (int, float)):
- d_rolled = d + relativedelta(**kwargs(tenor))
- elif hasattr(tenor, "__iter__"):
- d_rolled = d + relativedelta(years=1)
- else:
- raise TypeError("tenor is not a number nor an iterable")
-
- if (d >= d + relativedelta(month=9, day=20)) or (
- d < d + relativedelta(month=3, day=20)
- ):
- d_rolled += relativedelta(month=12, day=20)
- if d.month <= 3:
- d_rolled -= relativedelta(years=1)
- else:
- d_rolled += relativedelta(month=6, day=20)
- if isinstance(tenor, (int, float)):
- return d_rolled
- else:
- v = [d_rolled + relativedelta(**kwargs(t - 1)) for t in tenor]
- if nd_array:
- return np.array([pydate_to_TDate(d) for d in v])
- else:
- return v
-
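-# Illustrative sketch, not part of the original module: under the semi-annual
-# convention a 5yr trade done on 2021-01-15 rolls to the 2025-12-20 maturity.
-def _example_roll_date():
-    return roll_date(datetime.date(2021, 1, 15), 5)  # -> 2025-12-20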
-
-def build_table(rows, format_strings, row_format):
-    """Format each row's cells with its per-cell spec, then apply row_format."""
- def apply_format(row, format_string):
- for r, f in zip(row, format_string):
- if f is None:
- yield r
- else:
- if callable(f):
- yield f(r)
- elif isinstance(f, str):
- if isinstance(r, tuple):
- yield f.format(*r)
- else:
- yield f.format(r)
-
- return [
- row_format.format(*apply_format(row, format_string))
- for row, format_string in zip(rows, format_strings)
- ]
-
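-# Illustrative sketch, not part of the original module: one format spec per
-# cell -- None passes the value through, a string is applied with .format(),
-# and a callable is applied directly.
-def _example_build_table():
-    rows = [("5yr", 0.0123), ("10yr", 0.0145)]
-    specs = [(None, "{:.2%}")] * len(rows)
-    return build_table(rows, specs, "{:<5} {:>8}")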
-
-def memoize(f=None, *, hasher=lambda args: (hash(args),)):
-    """Cache a method's results in the instance's name-mangled __cache dict."""
- if f is None:
- return partial(memoize, hasher=hasher)
-
- @wraps(f)
- def cached_f(*args, **kwargs):
- self = args[0]
- key = (f.__name__, *hasher(args))
- cache = getattr(self, f"_{type(self).__name__}__cache")
- if key in cache:
- return cache[key]
- else:
- v = f(*args, **kwargs)
- cache[key] = v
- return v
-
- return cached_f
-
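-# Illustrative sketch, not part of the original module: the decorated method
-# caches into the instance's name-mangled self.__cache dict, keyed on the
-# method name and a hash of its positional arguments.
-class _ExampleCurve:
-    def __init__(self):
-        self.__cache = {}
-
-    @memoize
-    def discount_factor(self, t):
-        # computed once per distinct t, then served from the cache
-        return np.exp(-0.02 * t)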
-
-def to_TDate(arr: np.ndarray):
-    """Convert a numpy datetime64[D] array to TDate (day offset from 1601-01-01)."""
- return arr.view("int") + 134774
-
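-# Illustrative sketch, not part of the original module: the input must be a
-# datetime64[D] array; 134774 is the number of days from 1601-01-01 (the TDate
-# origin implied by the offset) to the numpy/Unix epoch of 1970-01-01.
-def _example_to_TDate():
-    dates = np.array(["2021-06-16", "2021-09-20"], dtype="datetime64[D]")
-    return to_TDate(dates)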
-
-def get_external_nav(engine, trade_id, value_date=None, trade_type="swaptions"):
-    """External (counterparty) NAV plus any unsettled upfront for a trade."""
-    if trade_type == "swaptions":
-        upfront_query = (
-            "CASE WHEN date < settle_date "
-            "THEN price * notional/100 * (2 * buysell::integer - 1) "
-            "ELSE 0. "
-            "END"
-        )
-    elif trade_type == "cds":
-        upfront_query = "CASE WHEN date < upfront_settle_date THEN upfront ELSE 0. END"
-    else:
-        raise ValueError(f"unsupported trade_type: {trade_type}")
- query = (
- "SELECT date, "
- "base_nav, "
- f"({upfront_query}) AS upfront FROM external_marks_deriv "
- f"LEFT JOIN {trade_type} "
- "ON cpty_id = identifier WHERE id=%s "
- )
-
- if value_date:
- query += "AND date=%s"
- r = engine.execute(query, (trade_id, value_date))
- try:
- date, nav, upfront = next(r)
- except StopIteration:
- raise MissingDataError(
- f"No quote available for {trade_type} {trade_id} on {value_date}"
- )
- return nav + upfront
- else:
- query += "ORDER BY DATE"
- return pd.read_sql_query(
- query, engine, params=(trade_id,), parse_dates=["date"], index_col=["date"]
- )
-
-
-@lru_cache(32)
-def get_fx(value_date: datetime.date, currency: str):
-    """Spot CCYUSD rate: live from Bloomberg for today, otherwise from the fx table."""
- if currency == "USD":
- return 1.0
- if value_date == datetime.date.today():
- with init_bbg_session(BBG_IP) as session:
- security = currency.upper() + "USD Curncy"
- field = "PX_LAST"
- ref_data = retrieve_data(session, [security], field)
- return ref_data[security][field]
-    conn = dbconn("dawndb")
-    try:
-        with conn.cursor() as c:
-            c.execute("SELECT * FROM fx WHERE date=%s", (value_date,))
-            rec = c.fetchone()
-        r = getattr(rec, currency.lower() + "usd", None)
-        if r is None:
-            raise MissingDataError(
-                f"No {currency.upper()}USD fx rate available for {value_date}"
-            )
-        return r
-    finally:
-        # close the connection even when no rate is found
-        conn.close()
-
-
-@contextmanager
-def run_local(local=True):
-    """Temporarily set analytics._local for the duration of the with-block."""
-    saved_local = analytics._local
-    analytics._local = local
-    try:
-        yield
-    finally:
-        # restore the previous setting even if the block raises
-        analytics._local = saved_local
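-
-
-# Illustrative sketch, not part of the original module: flip the module-level
-# flag inside the block; it is restored when the block exits.
-def _example_run_local():
-    with run_local(False):
-        return analytics._local  # -> False inside the block, restored on exit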