 -rw-r--r--   example.py            |  91
 -rw-r--r--   pyisda/utils.py       | 153
 -rw-r--r--   setup.py              |  41
 -rw-r--r--   tests/test_pickle.py  |  43
 4 files changed, 228 insertions(+), 100 deletions(-)
diff --git a/example.py b/example.py
--- a/example.py
+++ b/example.py
@@ -11,46 +11,85 @@ if __name__ == "__main__":
     today_date = datetime.date(2016, 7, 12)
     yc = build_yc(today_date, True)
     step_in_date = today_date + datetime.timedelta(days=1)
-    value_date = datetime.date(2016, 7, 15) # settle_date
+    value_date = datetime.date(2016, 7, 15)  # settle_date
     start_date = datetime.date(2016, 6, 20)
     end_date = datetime.date(2021, 6, 20)
     upfront_quote = -0.03063
-    spread = spread_from_upfront(today_date, value_date, start_date, step_in_date,
-                                 start_date, end_date, 0.05, yc,
-                                 upfront_quote, 0.3)
-    #benchmark_start_date should be start_date. Not sure while there are 2 parameters
-    upf = upfront_charge(today_date, value_date, start_date, step_in_date,
-                         start_date, end_date, 0.05, yc, spread, 0.3)
+    spread = spread_from_upfront(
+        today_date,
+        value_date,
+        start_date,
+        step_in_date,
+        start_date,
+        end_date,
+        0.05,
+        yc,
+        upfront_quote,
+        0.3,
+    )
+    # benchmark_start_date should be start_date. Not sure while there are 2 parameters
+    upf = upfront_charge(
+        today_date,
+        value_date,
+        start_date,
+        step_in_date,
+        start_date,
+        end_date,
+        0.05,
+        yc,
+        spread,
+        0.3,
+    )
     print(spread)
-    print(upf-upfront_quote)
+    print(upf - upfront_quote)
     coupon_leg = FeeLeg(start_date, end_date, True, 1000000, 0.01)
     contingent_leg = ContingentLeg(start_date, end_date, 1000000)
     ig_spread = 0.0070
-    spread = array.array('d', [ig_spread])
-    recovery = array.array('d', [0.4])
-    upfront = array.array('d', [-0.0164243])
+    spread = array.array("d", [ig_spread])
+    recovery = array.array("d", [0.4])
+    upfront = array.array("d", [-0.0164243])
     print("build spread curve", flush=True)
-    sc = SpreadCurve(today_date, yc, start_date, step_in_date, value_date,
-                     [end_date], spread, upfront, recovery, True)
-    sc_data = sc.inspect()['data']
+    sc = SpreadCurve(
+        today_date,
+        yc,
+        start_date,
+        step_in_date,
+        value_date,
+        [end_date],
+        spread,
+        upfront,
+        recovery,
+        True,
+    )
+    sc_data = sc.inspect()["data"]
     rate = sc_data[0][1]
     # by default the rate is stored as annually compounded
-    continuous_rate = math.log(1+rate)
+    continuous_rate = math.log(1 + rate)
     sc2 = SpreadCurve.from_flat_hazard(today_date, continuous_rate)
-    coupon_leg_pv = coupon_leg.pv(today_date, step_in_date,
-                                  value_date, yc, sc, True)
-    coupon_leg_pv2 = coupon_leg.pv(today_date, step_in_date,
-                                   value_date, yc, sc2, True)
+    coupon_leg_pv = coupon_leg.pv(today_date, step_in_date, value_date, yc, sc, True)
+    coupon_leg_pv2 = coupon_leg.pv(today_date, step_in_date, value_date, yc, sc2, True)
     print("contingent leg pv", flush=True)
-    contingent_leg_pv = contingent_leg.pv(today_date, step_in_date,
-                                          value_date, yc, sc, 0.4)
-    contingent_leg_pv2 = contingent_leg.pv(today_date, step_in_date,
-                                           value_date, yc, sc2, 0.4)
+    contingent_leg_pv = contingent_leg.pv(
+        today_date, step_in_date, value_date, yc, sc, 0.4
+    )
+    contingent_leg_pv2 = contingent_leg.pv(
+        today_date, step_in_date, value_date, yc, sc2, 0.4
+    )
     print("upfront_charge", flush=True)
-    pv = 1000000 * upfront_charge(today_date, value_date, start_date, step_in_date,
-                                  start_date, end_date, 0.01, yc, ig_spread, 0.4)
+    pv = 1000000 * upfront_charge(
+        today_date,
+        value_date,
+        start_date,
+        step_in_date,
+        start_date,
+        end_date,
+        0.01,
+        yc,
+        ig_spread,
+        0.4,
+    )
 
-    print(pv, contingent_leg_pv-coupon_leg_pv)
+    print(pv, contingent_leg_pv - coupon_leg_pv)
diff --git a/pyisda/utils.py b/pyisda/utils.py
index a3c5456..4e3b024 100644
--- a/pyisda/utils.py
+++ b/pyisda/utils.py
@@ -1,12 +1,24 @@
 from quantlib.settings import Settings
-from quantlib.time.api import ( Date, pydate_from_qldate, WeekendsOnly,
-                                Period, Months, ModifiedFollowing, Actual360,
-                                Semiannual, Thirty360, Actual365Fixed )
+from quantlib.time.api import (
+    Date,
+    pydate_from_qldate,
+    WeekendsOnly,
+    Period,
+    Months,
+    ModifiedFollowing,
+    Actual360,
+    Semiannual,
+    Thirty360,
+    Actual365Fixed,
+)
 from quantlib.indexes.ibor_index import IborIndex
 from quantlib.currency.api import USDCurrency, EURCurrency
 from quantlib.indexes.ibor_index import IborIndex
 from quantlib.termstructures.yields.api import (
-    PiecewiseYieldCurve, DepositRateHelper, SwapRateHelper)
+    PiecewiseYieldCurve,
+    DepositRateHelper,
+    SwapRateHelper,
+)
 import numpy as np
 import datetime
 import requests
@@ -16,80 +28,129 @@ import xml.etree.ElementTree as ET
 from pyisda.curve import YieldCurve
 from pyisda.date import BadDay
 
-def getMarkitIRData(date = datetime.date.today() - datetime.timedelta(days = 1),
-                    currency="USD"):
+
+def getMarkitIRData(
+    date=datetime.date.today() - datetime.timedelta(days=1), currency="USD"
+):
     filename = "InterestRates_{0}_{1:%Y%m%d}".format(currency, date)
-    r = requests.post('http://www.markit.com/news/{0}.zip'.format(filename))
-    if "zip" in r.headers['content-type']:
+    r = requests.post("http://www.markit.com/news/{0}.zip".format(filename))
+    if "zip" in r.headers["content-type"]:
         with zipfile.ZipFile(BytesIO(r.content)) as z:
-            fh = z.open(filename + '.xml')
+            fh = z.open(filename + ".xml")
             tree = ET.parse(fh)
-            deposits = zip([e.text for e in tree.findall('./deposits/*/tenor')],
-                           [float(e.text) for e in tree.findall('./deposits/*/parrate')])
-            swaps = zip([e.text for e in tree.findall('./swaps/*/tenor')],
-                        [float(e.text) for e in tree.findall('./swaps/*/parrate')])
-            effectiveasof = tree.find('./effectiveasof').text
-            MarkitData = {'deposits': list(deposits),
-                          'swaps': list(swaps),
-                          'effectiveasof': datetime.datetime.strptime(effectiveasof[:10],
-                                                                      "%Y-%m-%d").date()}
+            deposits = zip(
+                [e.text for e in tree.findall("./deposits/*/tenor")],
+                [float(e.text) for e in tree.findall("./deposits/*/parrate")],
+            )
+            swaps = zip(
+                [e.text for e in tree.findall("./swaps/*/tenor")],
+                [float(e.text) for e in tree.findall("./swaps/*/parrate")],
+            )
+            effectiveasof = tree.find("./effectiveasof").text
+            MarkitData = {
+                "deposits": list(deposits),
+                "swaps": list(swaps),
+                "effectiveasof": datetime.datetime.strptime(
+                    effectiveasof[:10], "%Y-%m-%d"
+                ).date(),
+            }
             return MarkitData
     else:
-        return getMarkitIRData(date-datetime.timedelta(days=1))
+        return getMarkitIRData(date - datetime.timedelta(days=1))
+
 
 def rate_helpers(currency="USD", MarkitData=None):
     settings = Settings()
     if not MarkitData:
-        MarkitData = getMarkitIRData(pydate_from_qldate(settings.evaluation_date-1), currency)
-    if MarkitData['effectiveasof'] != pydate_from_qldate(settings.evaluation_date):
-        raise RuntimeError("Yield curve effective date: {0} doesn't " \
-                           "match the evaluation date: {1}".format(
-                               MarkitData['effectiveasof'],
-                               pydate_from_qldate(settings.evaluation_date)))
+        MarkitData = getMarkitIRData(
+            pydate_from_qldate(settings.evaluation_date - 1), currency
+        )
+    if MarkitData["effectiveasof"] != pydate_from_qldate(settings.evaluation_date):
+        raise RuntimeError(
+            "Yield curve effective date: {0} doesn't "
+            "match the evaluation date: {1}".format(
+                MarkitData["effectiveasof"],
+                pydate_from_qldate(settings.evaluation_date),
+            )
+        )
     calendar = WeekendsOnly()
     if currency == "USD":
-        isda_ibor = IborIndex("IsdaIbor", Period(3, Months), 2, USDCurrency(), calendar,
-                              ModifiedFollowing, False, Actual360())
+        isda_ibor = IborIndex(
+            "IsdaIbor",
+            Period(3, Months),
+            2,
+            USDCurrency(),
+            calendar,
+            ModifiedFollowing,
+            False,
+            Actual360(),
+        )
         fix_freq = Semiannual
     elif currency == "EUR":
-        isda_ibor = IborIndex("IsdaIbor", Period(6, Months), 2, EURCurrency(), calendar,
-                              ModifiedFollowing, False, Actual360())
+        isda_ibor = IborIndex(
+            "IsdaIbor",
+            Period(6, Months),
+            2,
+            EURCurrency(),
+            calendar,
+            ModifiedFollowing,
+            False,
+            Actual360(),
+        )
         fix_freq = Annual
-    deps = [DepositRateHelper(q, Period(t), 2, calendar, ModifiedFollowing,
-                              False, Actual360())
-            for t, q in MarkitData['deposits']]
+    deps = [
+        DepositRateHelper(
+            q, Period(t), 2, calendar, ModifiedFollowing, False, Actual360()
+        )
+        for t, q in MarkitData["deposits"]
+    ]
     # this matches with bloomberg, but according to Markit, maturity should be unadjusted
-    swaps = [SwapRateHelper.from_tenor(q, Period(t), calendar, fix_freq, ModifiedFollowing,
-                                       Thirty360(), isda_ibor) for t, q in MarkitData['swaps']]
+    swaps = [
+        SwapRateHelper.from_tenor(
+            q, Period(t), calendar, fix_freq, ModifiedFollowing, Thirty360(), isda_ibor
+        )
+        for t, q in MarkitData["swaps"]
+    ]
     return deps + swaps
 
-def YC(currency="USD", helpers = None, MarkitData=None):
+
+def YC(currency="USD", helpers=None, MarkitData=None):
     if helpers is None:
         helpers = rate_helpers(currency, MarkitData)
-    curve = PiecewiseYieldCurve(0, 1, 0, WeekendsOnly(),
-                                helpers, Actual365Fixed())
+    curve = PiecewiseYieldCurve(0, 1, 0, WeekendsOnly(), helpers, Actual365Fixed())
     return curve
 
+
 def build_yc(trade_date, ql_curve=False):
-    markit_data = getMarkitIRData(trade_date-datetime.timedelta(days=1))
+    markit_data = getMarkitIRData(trade_date - datetime.timedelta(days=1))
     if ql_curve:
         settings = Settings()
         settings.evaluation_date = Date.from_datetime(trade_date)
-        yield_helpers = rate_helpers(MarkitData = markit_data)
-        ql_yc = YC(helpers = yield_helpers)
+        yield_helpers = rate_helpers(MarkitData=markit_data)
+        ql_yc = YC(helpers=yield_helpers)
         dfs = np.array([ql_yc.discount(yh.latest_date) for yh in yield_helpers])
         dates = [pydate_from_qldate(yh.latest_date) for yh in yield_helpers]
-        yc = YieldCurve.from_discount_factors(trade_date, dates, dfs, 'ACT/365F')
+        yc = YieldCurve.from_discount_factors(trade_date, dates, dfs, "ACT/365F")
     else:
-        periods, rates = zip(*markit_data['deposits'])
+        periods, rates = zip(*markit_data["deposits"])
         periods = list(periods)
         rates = list(rates)
-        periods_swaps, rates_swaps = zip(*markit_data['swaps'])
-        types = 'M' * len(periods) + 'S' * len(periods_swaps)
+        periods_swaps, rates_swaps = zip(*markit_data["swaps"])
+        types = "M" * len(periods) + "S" * len(periods_swaps)
         rates = np.array(rates)
         periods += periods_swaps
         rates += rates_swaps
-        yc = YieldCurve(trade_date, types, periods, rates, 'ACT/360', '6M',
-                        '3M', '30/360', 'ACT/360', BadDay.MODIFIED)
+        yc = YieldCurve(
+            trade_date,
+            types,
+            periods,
+            rates,
+            "ACT/360",
+            "6M",
+            "3M",
+            "30/360",
+            "ACT/360",
+            BadDay.MODIFIED,
+        )
     return yc
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -3,25 +3,34 @@ from distutils.extension import Extension
 from Cython.Build import cythonize
 import numpy
 
-all_extensions = Extension("*", ["pyisda/*.pyx"],
-                           include_dirs=['c_layer', numpy.get_include()],
-                           libraries=["cds", "farmhash"],
-                           language='c++',
-                           extra_compile_args=['-fopenmp'],
-                           extra_link_args=['-fopenmp', '-Wl,--strip-all'])
+all_extensions = Extension(
+    "*",
+    ["pyisda/*.pyx"],
+    include_dirs=["c_layer", numpy.get_include()],
+    libraries=["cds", "farmhash"],
+    language="c++",
+    extra_compile_args=["-fopenmp"],
+    extra_link_args=["-fopenmp", "-Wl,--strip-all"],
+)
 
-c_extension = Extension("pyisda.optim",
-                        include_dirs=['c_layer', numpy.get_include()],
-                        sources=['pyisda/optim.pyx', 'c_layer/cdsbootstrap.c'],
-                        libraries=['cds'],
-                        language='c++')
+c_extension = Extension(
+    "pyisda.optim",
+    include_dirs=["c_layer", numpy.get_include()],
+    sources=["pyisda/optim.pyx", "c_layer/cdsbootstrap.c"],
+    libraries=["cds"],
+    language="c++",
+)
 
-all_extensions = cythonize([c_extension, all_extensions], nthreads=4,
-                           compiler_directives={'embedsignature': True})
+all_extensions = cythonize(
+    [c_extension, all_extensions],
+    nthreads=4,
+    compiler_directives={"embedsignature": True},
+)
 
 setup(
     name="pyisda",
-    version='0.1',
-    author='Guillaume Horel',
+    version="0.1",
+    author="Guillaume Horel",
     ext_modules=all_extensions,
-    packages=['pyisda'])
+    packages=["pyisda"],
+)
diff --git a/tests/test_pickle.py b/tests/test_pickle.py
index 6417c86..9eabcec 100644
--- a/tests/test_pickle.py
+++ b/tests/test_pickle.py
@@ -6,9 +6,10 @@ from pickle import dumps, loads
 import datetime
 from pyisda.utils import build_yc
 
+
 class TestPickle(unittest.TestCase):
     trade_date = datetime.date(2016, 10, 6)
-    yc = build_yc(trade_date, ql_curve = True)
+    yc = build_yc(trade_date, ql_curve=True)
 
     def assertListAlmostEqual(self, l1, l2):
         if len(l1) != len(l2):
@@ -19,8 +20,9 @@ class TestPickle(unittest.TestCase):
     def test_yc(self):
         orig_dfs = [self.yc.discount_factor(d) for d in self.yc.dates]
         pickled_yc = loads(dumps(self.yc, 2))
-        self.assertListAlmostEqual([pickled_yc.discount_factor(d) for d in pickled_yc.dates],
-                                   orig_dfs)
+        self.assertListAlmostEqual(
+            [pickled_yc.discount_factor(d) for d in pickled_yc.dates], orig_dfs
+        )
 
     def test_legs(self):
         start_date = datetime.date(2016, 3, 20)
@@ -29,17 +31,34 @@ class TestPickle(unittest.TestCase):
         value_date = datetime.date(2016, 10, 11)
         cl = ContingentLeg(start_date, end_date, 1)
         fl = FeeLeg(start_date, end_date, True, 1, 1)
-        sc = SpreadCurve(self.trade_date, self.yc, start_date,
-                         step_in_date, value_date, [end_date], np.array([75*1e-4]), 0.4)
+        sc = SpreadCurve(
+            self.trade_date,
+            self.yc,
+            start_date,
+            step_in_date,
+            value_date,
+            [end_date],
+            np.array([75 * 1e-4]),
+            0.4,
+        )
         sc_pickled = loads(dumps(sc, 2))
-        self.assertListAlmostEqual([sc.survival_probability(d) for d in self.yc.dates],
-                                   [sc_pickled.survival_probability(d) for d in self.yc.dates])
+        self.assertListAlmostEqual(
+            [sc.survival_probability(d) for d in self.yc.dates],
+            [sc_pickled.survival_probability(d) for d in self.yc.dates],
+        )
         cl_pickled = loads(dumps(cl, 2))
         fl_pickled = loads(dumps(fl, 2))
-        self.assertEqual(cl_pickled.pv(self.trade_date, step_in_date, value_date, self.yc, sc, 0.4),
-                         cl.pv(self.trade_date, step_in_date, value_date, self.yc, sc, 0.4))
-        self.assertEqual(fl.pv(self.trade_date, step_in_date, value_date, self.yc, sc, False),
-                         fl_pickled.pv(self.trade_date, step_in_date, value_date, self.yc, sc, False))
+        self.assertEqual(
+            cl_pickled.pv(self.trade_date, step_in_date, value_date, self.yc, sc, 0.4),
+            cl.pv(self.trade_date, step_in_date, value_date, self.yc, sc, 0.4),
+        )
+        self.assertEqual(
+            fl.pv(self.trade_date, step_in_date, value_date, self.yc, sc, False),
+            fl_pickled.pv(
+                self.trade_date, step_in_date, value_date, self.yc, sc, False
+            ),
+        )
+
 
-if __name__=="__main__":
+if __name__ == "__main__":
     unittest.main()
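Usage note (not part of the commit): a minimal sketch of how the reformatted build_yc helper is exercised, using only the import path, call signature, and attributes that appear in the diff above (tests/test_pickle.py); getMarkitIRData fetches rates from markit.com, so network access is assumed, and the date is illustrative.

    import datetime

    from pyisda.utils import build_yc  # import path as used in tests/test_pickle.py

    trade_date = datetime.date(2016, 10, 6)
    # ql_curve=True bootstraps with the QuantLib helpers, then wraps the result
    # as a pyisda YieldCurve built from discount factors.
    yc = build_yc(trade_date, ql_curve=True)
    # Discount factors at the curve's pillar dates, as checked in the pickle test.
    print([yc.discount_factor(d) for d in yc.dates])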
