diff options
| -rw-r--r-- | python/Makefile | 16 | ||||
| -rw-r--r-- | python/bbg_helpers.py | 257 | ||||
| -rw-r--r-- | python/client.py | 38 | ||||
| -rw-r--r-- | python/setup.py | 21 |
4 files changed, 1 insertions, 331 deletions
diff --git a/python/Makefile b/python/Makefile index f5f86094..b733d160 100644 --- a/python/Makefile +++ b/python/Makefile @@ -5,18 +5,4 @@ tests: tags: ctags -e --python-kinds=-iv --exclude='*.js' -R . -analytics: analytics/cms_spread_utils.cpython-38-x86_64-linux-gnu.so analytics/black.cpython-38-x86_64-linux-gnu.so lossdistrib.so - -analytics/cms_spread_utils.cpython-38-x86_64-linux-gnu.so analytics/black.cpython-38-x86_64-linux-gnu.so&: - python setup.py build_ext --inplace - -lossdistrib.so: - ln -sf ../../R/lossdistrib/src/lossdistrib.so analytics/lossdistrib.so - -clean: - rm -f analytics/*.{c,h} - -cleanall: - rm -f analytics/*.{so,c,h} - -.PHONY: clean cleanall tests +.PHONY: tags tests diff --git a/python/bbg_helpers.py b/python/bbg_helpers.py deleted file mode 100644 index 886c22dc..00000000 --- a/python/bbg_helpers.py +++ /dev/null @@ -1,257 +0,0 @@ -import blpapi -import pandas as pd -from contextlib import contextmanager -from pytz import timezone -import logging -import datetime - -logger = logging.getLogger(__name__) - -BBG_IP = ["192.168.9.28", "edwin-pc", "192.168.0.10", "192.168.0.12"] - - -@contextmanager -def init_bbg_session(ip_list, port=8194): - sessionOptions = blpapi.SessionOptions() - sessionOptions.setServerPort(port) - - for ip in ip_list: - # Start a Session - sessionOptions.setServerHost(ip) - session = blpapi.Session(sessionOptions) - if not session.start(): - logger.info("Failed to open session with {0}".format(ip)) - continue - try: - if not session.openService("//blp/refdata"): - raise NameError("Failed to open //blp/refdata") - logger.info("Logged in on {0}".format(ip)) - yield session - break - except NameError as e: - logger.error(str(e)) - raise - finally: - session.stop() - - -def append_overrides(request, d): - overrides = request.getElement("overrides") - for k, v in d.items(): - o = overrides.appendElement() - o.setElement("fieldId", k) - if isinstance(v, datetime.date): - o.setElement("value", "{0:%Y%m%d}".format(v)) - else: - o.setElement("value", v) - - -def event_loop(session, request): - session.sendRequest(request) - # Process received events - while True: - # We provide timeout to give the chance to Ctrl+C handling: - ev = session.nextEvent(500) - if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]: - for msg in ev: - yield msg - # Response completely received, so we can exit - if ev.eventType() == blpapi.Event.RESPONSE: - return - - -def get_pythonvalue(e): - if e.isNull(): - return None - if e.datatype() == blpapi.DataType.DATE: - return pd.to_datetime(e.getValue()) - elif e.datatype() == blpapi.DataType.DATETIME: - t = e.getValue() - if isinstance(t, datetime.time): - return t - else: - return pd.to_datetime(t, utc=True) - elif e.datatype() == blpapi.DataType.ENUMERATION: - return e.getValueAsString() - else: - return e.getValue() - - -def field_array_todf(field): - df = pd.DataFrame.from_dict( - [ - {str(e.name()): get_pythonvalue(e) for e in f.elements()} - for f in field.values() - ] - ) - if "date" in df: - df = df.set_index("date") - return df - - -def process_historical_msg(msg): - securityData = msg.getElement("securityData") - securityName = securityData.getElementValue("security") - fieldData = securityData.getElement("fieldData") - return {securityName: field_array_todf(fieldData)} - - -def process_reference_msg(msg): - data = {} - securityDataArray = msg.getElement("securityData") - for securityData in securityDataArray.values(): - securityName = securityData.getElementValue("security") - row = {} - fieldData = securityData.getElement("fieldData") - for field in fieldData.elements(): - if not field.isValid(): - logger.info(f"Invalid field: {str(field)}") - elif field.isArray(): - row[str(field.name())] = field_array_todf( - field - ) # to convert dates to timestamps - else: - row[str(field.name())] = get_pythonvalue(field) - data[securityName] = row - return data - - -def process_intraday_tick_msg(msg): - _, tickdata = msg.getElement("tickData").elements() - return field_array_todf(tickdata) - - -def retrieve_data( - session, - securities, - fields=[], - overrides={}, - start_date=None, - end_date=None, - frequency="DAILY", - options={}, - event_types=["TRADE"], -): - """ - Convenience function to retrieve data from the Bloomberg API. - - Parameters - ---------- - session : blpapi session - securities : list or string - list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge'] - fields: list or string - list of fields - start_date : datetime.date - end_date : datetime.date - frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY" - event_types: list of string - (only "TRADE", "BID" and "ASK" events seem to be working). - """ - refDataService = session.getService("//blp/refdata") - if isinstance(start_date, datetime.datetime): - request = refDataService.createRequest("IntradayTickRequest") - for et in event_types: - request.getElement("eventTypes").appendValue(et) - request.set("includeConditionCodes", True) - elif isinstance(start_date, datetime.date): - request = refDataService.createRequest("HistoricalDataRequest") - request.set("periodicitySelection", frequency) - else: - request = refDataService.createRequest("ReferenceDataRequest") - - if options: - for k, v in options.items(): - request.set(k, v) - - if request.asElement().name() == blpapi.Name("IntradayTickRequest"): - if isinstance(securities, list): - raise TypeError("For intraday requests, we can only handle one security") - else: - request.set("security", securities) - else: - if hasattr(securities, "__iter__"): - for security in securities: - request.append("securities", security) - else: - request.append("securities", securities) - - if isinstance(fields, list): - for field in fields: - request.append("fields", field) - else: - request.append("fields", fields) - - if overrides: - append_overrides(request, overrides) - - if start_date: - if request.asElement().name() == blpapi.Name("IntradayTickRequest"): - start_date = timezone("America/New_York").localize(start_date) - start_date = start_date.astimezone(timezone("GMT")) - request.set("startDateTime", start_date) - if end_date: - end_date = timezone("America/New_York").localize(end_date) - end_date = end_date.astimezone(timezone("GMT")) - request.set("endDateTime", end_date) - else: - request.set("startDate", "{:%Y%m%d}".format(start_date)) - if end_date: - request.set("endDate", "{:%Y%m%d}".format(end_date)) - - data = {} - for msg in event_loop(session, request): - if msg.hasElement("responseError"): - logger.error(msg.getElement("responseError").getElementAsString("message")) - continue - if msg.messageType() == blpapi.Name("ReferenceDataResponse"): - data.update(process_reference_msg(msg)) - elif msg.messageType() == blpapi.Name("HistoricalDataResponse"): - data.update(process_historical_msg(msg)) - elif msg.messageType() == blpapi.Name("IntradayTickResponse"): - data.update(process_intraday_tick_msg(msg)) - if request.asElement().name() == blpapi.Name("IntradayTickRequest"): - df = pd.DataFrame(data) - if "time" in df: - df.time = df.time.dt.tz_convert("America/New_York") - return df - else: - return data - - -if __name__ == "__main__": - testdate = pd.datetime(2013, 1, 1) - hist_securities = ["CADUSD Curncy", "EURUSD Curncy"] - hist_fields = ["PX_LAST"] - securities = ["004421BW2 Mtge", "75157EAE2 Mtge", "XS0295516776 Mtge"] - fields = ["CUR_CPN", "START_ACC_DT"] - with init_bbg_session(BBG_IP) as session: - hist_data = retrieve_data( - session, - securities, - hist_fields, - start_date=testdate.date(), - frequency="MONTHLY", - ) - overrides = {"SETTLE_DT": testdate} - ref_data = retrieve_data(session, securities, fields, overrides=overrides) - struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"]) - spx_ndx_monthly = retrieve_data( - session, - ["SPX Index", "NDX Index"], - fields=["PX_LAST"], - start_date=datetime.date(2012, 1, 1), - options={"periodicityAdjustment": "ACTUAL"}, - frequency="MONTHLY", - ) - trace_data = retrieve_data( - session, - "BNCMT 2007-1 A5@TRAC Mtge", - start_date=datetime.datetime(2018, 5, 18, 9), - end_date=datetime.datetime(2018, 9, 18, 9), - ) - today = datetime.datetime.today() - yesterday = today - datetime.timedelta(days=1) - tick_data = retrieve_data( - session, "GOOG US Equity", start_date=yesterday, end_date=today - ) diff --git a/python/client.py b/python/client.py deleted file mode 100644 index d68334c1..00000000 --- a/python/client.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -import sys - -from utils.db import dbconn -from common import get_redis_queue -from subprocess import CalledProcessError -from tasks import Rpc - -def run(): - ET = dbconn('etdb') - q = get_redis_queue() - while True: - rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8')) - - if rpc.fun == 'generate_scenarios': - rpc.args += [ET] - logger.info("running: {}, {}".format(rpc.fun, rpc.args)) - try: - rpc() - except CalledProcessError: - logger.error("'{}' did not complete".format(rpc.fun)) - else: - logger.info("'{}' completed".format(rpc.fun)) - if rpc.fun == "build_portfolios": - q.rpush("tasks", str(Rpc("build_scenarios", rpc.args))) - if rpc.fun == "build_scenarios": - q.rpush("tasks", str(Rpc("generate_scenarios", rpc.args[:-1]))) - ET.close() - -if __name__=="__main__": - logger = logging.getLogger('intex') - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - logger.setLevel(logging.INFO) - # log to stderr, which is intercepted by circus - sh = logging.StreamHandler(sys.stdout) - sh.setFormatter(formatter) - logger.addHandler(sh) - run() diff --git a/python/setup.py b/python/setup.py deleted file mode 100644 index 1bbb1c8a..00000000 --- a/python/setup.py +++ /dev/null @@ -1,21 +0,0 @@ -from distutils.core import setup -from distutils.extension import Extension -from Cython.Build import cythonize -import numpy - -ext = Extension( - "*", - ["analytics/*.pyx"], - include_dirs=[numpy.get_include()], -) - -ext = cythonize([ext], - nthreads=2 -) - - -setup( - name="analytics", - packages=["analytics"], - ext_modules=ext, -) |
