aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/Makefile16
-rw-r--r--python/bbg_helpers.py257
-rw-r--r--python/client.py38
-rw-r--r--python/setup.py21
4 files changed, 1 insertions, 331 deletions
diff --git a/python/Makefile b/python/Makefile
index f5f86094..b733d160 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -5,18 +5,4 @@ tests:
tags:
ctags -e --python-kinds=-iv --exclude='*.js' -R .
-analytics: analytics/cms_spread_utils.cpython-38-x86_64-linux-gnu.so analytics/black.cpython-38-x86_64-linux-gnu.so lossdistrib.so
-
-analytics/cms_spread_utils.cpython-38-x86_64-linux-gnu.so analytics/black.cpython-38-x86_64-linux-gnu.so&:
- python setup.py build_ext --inplace
-
-lossdistrib.so:
- ln -sf ../../R/lossdistrib/src/lossdistrib.so analytics/lossdistrib.so
-
-clean:
- rm -f analytics/*.{c,h}
-
-cleanall:
- rm -f analytics/*.{so,c,h}
-
-.PHONY: clean cleanall tests
+.PHONY: tags tests
diff --git a/python/bbg_helpers.py b/python/bbg_helpers.py
deleted file mode 100644
index 886c22dc..00000000
--- a/python/bbg_helpers.py
+++ /dev/null
@@ -1,257 +0,0 @@
-import blpapi
-import pandas as pd
-from contextlib import contextmanager
-from pytz import timezone
-import logging
-import datetime
-
-logger = logging.getLogger(__name__)
-
-BBG_IP = ["192.168.9.28", "edwin-pc", "192.168.0.10", "192.168.0.12"]
-
-
-@contextmanager
-def init_bbg_session(ip_list, port=8194):
- sessionOptions = blpapi.SessionOptions()
- sessionOptions.setServerPort(port)
-
- for ip in ip_list:
- # Start a Session
- sessionOptions.setServerHost(ip)
- session = blpapi.Session(sessionOptions)
- if not session.start():
- logger.info("Failed to open session with {0}".format(ip))
- continue
- try:
- if not session.openService("//blp/refdata"):
- raise NameError("Failed to open //blp/refdata")
- logger.info("Logged in on {0}".format(ip))
- yield session
- break
- except NameError as e:
- logger.error(str(e))
- raise
- finally:
- session.stop()
-
-
-def append_overrides(request, d):
- overrides = request.getElement("overrides")
- for k, v in d.items():
- o = overrides.appendElement()
- o.setElement("fieldId", k)
- if isinstance(v, datetime.date):
- o.setElement("value", "{0:%Y%m%d}".format(v))
- else:
- o.setElement("value", v)
-
-
-def event_loop(session, request):
- session.sendRequest(request)
- # Process received events
- while True:
- # We provide timeout to give the chance to Ctrl+C handling:
- ev = session.nextEvent(500)
- if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]:
- for msg in ev:
- yield msg
- # Response completely received, so we can exit
- if ev.eventType() == blpapi.Event.RESPONSE:
- return
-
-
-def get_pythonvalue(e):
- if e.isNull():
- return None
- if e.datatype() == blpapi.DataType.DATE:
- return pd.to_datetime(e.getValue())
- elif e.datatype() == blpapi.DataType.DATETIME:
- t = e.getValue()
- if isinstance(t, datetime.time):
- return t
- else:
- return pd.to_datetime(t, utc=True)
- elif e.datatype() == blpapi.DataType.ENUMERATION:
- return e.getValueAsString()
- else:
- return e.getValue()
-
-
-def field_array_todf(field):
- df = pd.DataFrame.from_dict(
- [
- {str(e.name()): get_pythonvalue(e) for e in f.elements()}
- for f in field.values()
- ]
- )
- if "date" in df:
- df = df.set_index("date")
- return df
-
-
-def process_historical_msg(msg):
- securityData = msg.getElement("securityData")
- securityName = securityData.getElementValue("security")
- fieldData = securityData.getElement("fieldData")
- return {securityName: field_array_todf(fieldData)}
-
-
-def process_reference_msg(msg):
- data = {}
- securityDataArray = msg.getElement("securityData")
- for securityData in securityDataArray.values():
- securityName = securityData.getElementValue("security")
- row = {}
- fieldData = securityData.getElement("fieldData")
- for field in fieldData.elements():
- if not field.isValid():
- logger.info(f"Invalid field: {str(field)}")
- elif field.isArray():
- row[str(field.name())] = field_array_todf(
- field
- ) # to convert dates to timestamps
- else:
- row[str(field.name())] = get_pythonvalue(field)
- data[securityName] = row
- return data
-
-
-def process_intraday_tick_msg(msg):
- _, tickdata = msg.getElement("tickData").elements()
- return field_array_todf(tickdata)
-
-
-def retrieve_data(
- session,
- securities,
- fields=[],
- overrides={},
- start_date=None,
- end_date=None,
- frequency="DAILY",
- options={},
- event_types=["TRADE"],
-):
- """
- Convenience function to retrieve data from the Bloomberg API.
-
- Parameters
- ----------
- session : blpapi session
- securities : list or string
- list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge']
- fields: list or string
- list of fields
- start_date : datetime.date
- end_date : datetime.date
- frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY"
- event_types: list of string
- (only "TRADE", "BID" and "ASK" events seem to be working).
- """
- refDataService = session.getService("//blp/refdata")
- if isinstance(start_date, datetime.datetime):
- request = refDataService.createRequest("IntradayTickRequest")
- for et in event_types:
- request.getElement("eventTypes").appendValue(et)
- request.set("includeConditionCodes", True)
- elif isinstance(start_date, datetime.date):
- request = refDataService.createRequest("HistoricalDataRequest")
- request.set("periodicitySelection", frequency)
- else:
- request = refDataService.createRequest("ReferenceDataRequest")
-
- if options:
- for k, v in options.items():
- request.set(k, v)
-
- if request.asElement().name() == blpapi.Name("IntradayTickRequest"):
- if isinstance(securities, list):
- raise TypeError("For intraday requests, we can only handle one security")
- else:
- request.set("security", securities)
- else:
- if hasattr(securities, "__iter__"):
- for security in securities:
- request.append("securities", security)
- else:
- request.append("securities", securities)
-
- if isinstance(fields, list):
- for field in fields:
- request.append("fields", field)
- else:
- request.append("fields", fields)
-
- if overrides:
- append_overrides(request, overrides)
-
- if start_date:
- if request.asElement().name() == blpapi.Name("IntradayTickRequest"):
- start_date = timezone("America/New_York").localize(start_date)
- start_date = start_date.astimezone(timezone("GMT"))
- request.set("startDateTime", start_date)
- if end_date:
- end_date = timezone("America/New_York").localize(end_date)
- end_date = end_date.astimezone(timezone("GMT"))
- request.set("endDateTime", end_date)
- else:
- request.set("startDate", "{:%Y%m%d}".format(start_date))
- if end_date:
- request.set("endDate", "{:%Y%m%d}".format(end_date))
-
- data = {}
- for msg in event_loop(session, request):
- if msg.hasElement("responseError"):
- logger.error(msg.getElement("responseError").getElementAsString("message"))
- continue
- if msg.messageType() == blpapi.Name("ReferenceDataResponse"):
- data.update(process_reference_msg(msg))
- elif msg.messageType() == blpapi.Name("HistoricalDataResponse"):
- data.update(process_historical_msg(msg))
- elif msg.messageType() == blpapi.Name("IntradayTickResponse"):
- data.update(process_intraday_tick_msg(msg))
- if request.asElement().name() == blpapi.Name("IntradayTickRequest"):
- df = pd.DataFrame(data)
- if "time" in df:
- df.time = df.time.dt.tz_convert("America/New_York")
- return df
- else:
- return data
-
-
-if __name__ == "__main__":
- testdate = pd.datetime(2013, 1, 1)
- hist_securities = ["CADUSD Curncy", "EURUSD Curncy"]
- hist_fields = ["PX_LAST"]
- securities = ["004421BW2 Mtge", "75157EAE2 Mtge", "XS0295516776 Mtge"]
- fields = ["CUR_CPN", "START_ACC_DT"]
- with init_bbg_session(BBG_IP) as session:
- hist_data = retrieve_data(
- session,
- securities,
- hist_fields,
- start_date=testdate.date(),
- frequency="MONTHLY",
- )
- overrides = {"SETTLE_DT": testdate}
- ref_data = retrieve_data(session, securities, fields, overrides=overrides)
- struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])
- spx_ndx_monthly = retrieve_data(
- session,
- ["SPX Index", "NDX Index"],
- fields=["PX_LAST"],
- start_date=datetime.date(2012, 1, 1),
- options={"periodicityAdjustment": "ACTUAL"},
- frequency="MONTHLY",
- )
- trace_data = retrieve_data(
- session,
- "BNCMT 2007-1 A5@TRAC Mtge",
- start_date=datetime.datetime(2018, 5, 18, 9),
- end_date=datetime.datetime(2018, 9, 18, 9),
- )
- today = datetime.datetime.today()
- yesterday = today - datetime.timedelta(days=1)
- tick_data = retrieve_data(
- session, "GOOG US Equity", start_date=yesterday, end_date=today
- )
diff --git a/python/client.py b/python/client.py
deleted file mode 100644
index d68334c1..00000000
--- a/python/client.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import logging
-import sys
-
-from utils.db import dbconn
-from common import get_redis_queue
-from subprocess import CalledProcessError
-from tasks import Rpc
-
-def run():
- ET = dbconn('etdb')
- q = get_redis_queue()
- while True:
- rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
-
- if rpc.fun == 'generate_scenarios':
- rpc.args += [ET]
- logger.info("running: {}, {}".format(rpc.fun, rpc.args))
- try:
- rpc()
- except CalledProcessError:
- logger.error("'{}' did not complete".format(rpc.fun))
- else:
- logger.info("'{}' completed".format(rpc.fun))
- if rpc.fun == "build_portfolios":
- q.rpush("tasks", str(Rpc("build_scenarios", rpc.args)))
- if rpc.fun == "build_scenarios":
- q.rpush("tasks", str(Rpc("generate_scenarios", rpc.args[:-1])))
- ET.close()
-
-if __name__=="__main__":
- logger = logging.getLogger('intex')
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logger.setLevel(logging.INFO)
- # log to stderr, which is intercepted by circus
- sh = logging.StreamHandler(sys.stdout)
- sh.setFormatter(formatter)
- logger.addHandler(sh)
- run()
diff --git a/python/setup.py b/python/setup.py
deleted file mode 100644
index 1bbb1c8a..00000000
--- a/python/setup.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from distutils.core import setup
-from distutils.extension import Extension
-from Cython.Build import cythonize
-import numpy
-
-ext = Extension(
- "*",
- ["analytics/*.pyx"],
- include_dirs=[numpy.get_include()],
-)
-
-ext = cythonize([ext],
- nthreads=2
-)
-
-
-setup(
- name="analytics",
- packages=["analytics"],
- ext_modules=ext,
-)