import blpapi import pandas as pd from contextlib import contextmanager from pytz import timezone import logging import datetime logger = logging.getLogger(__name__) BBG_IP = ["192.168.9.28", "edwin-pc", "192.168.0.10", "192.168.0.12"] @contextmanager def init_bbg_session(ip_list, port=8194): sessionOptions = blpapi.SessionOptions() sessionOptions.setServerPort(port) for ip in ip_list: # Start a Session sessionOptions.setServerHost(ip) session = blpapi.Session(sessionOptions) if not session.start(): logger.info("Failed to open session with {0}".format(ip)) continue try: if not session.openService("//blp/refdata"): raise NameError("Failed to open //blp/refdata") logger.info("Logged in on {0}".format(ip)) yield session break except NameError as e: logger.error(str(e)) raise finally: session.stop() def append_overrides(request, d): overrides = request.getElement("overrides") for k, v in d.items(): o = overrides.appendElement() o.setElement("fieldId", k) if isinstance(v, datetime.date): o.setElement("value", "{0:%Y%m%d}".format(v)) else: o.setElement("value", v) def event_loop(session, request): session.sendRequest(request) # Process received events while True: # We provide timeout to give the chance to Ctrl+C handling: ev = session.nextEvent(500) if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]: for msg in ev: yield msg # Response completely received, so we can exit if ev.eventType() == blpapi.Event.RESPONSE: return def get_pythonvalue(e): if e.isNull(): return None if e.datatype() == blpapi.DataType.DATE: return pd.to_datetime(e.getValue()) elif e.datatype() == blpapi.DataType.DATETIME: t = e.getValue() if isinstance(t, datetime.time): return t else: return pd.to_datetime(t, utc=True) elif e.datatype() == blpapi.DataType.ENUMERATION: return e.getValueAsString() else: return e.getValue() def field_array_todf(field): df = pd.DataFrame.from_dict( [ {str(e.name()): get_pythonvalue(e) for e in f.elements()} for f in field.values() ] ) if "date" in df: df = df.set_index("date") return df def process_historical_msg(msg): securityData = msg.getElement("securityData") securityName = securityData.getElementValue("security") fieldData = securityData.getElement("fieldData") return {securityName: field_array_todf(fieldData)} def process_reference_msg(msg): data = {} securityDataArray = msg.getElement("securityData") for securityData in securityDataArray.values(): securityName = securityData.getElementValue("security") row = {} fieldData = securityData.getElement("fieldData") for field in fieldData.elements(): if not field.isValid(): logger.info(f"Invalid field: {str(field)}") elif field.isArray(): row[str(field.name())] = field_array_todf( field ) # to convert dates to timestamps else: row[str(field.name())] = get_pythonvalue(field) data[securityName] = row return data def process_intraday_tick_msg(msg): _, tickdata = msg.getElement("tickData").elements() return field_array_todf(tickdata) def retrieve_data( session, securities, fields=[], overrides={}, start_date=None, end_date=None, frequency="DAILY", options={}, event_types=["TRADE"], ): """ Convenience function to retrieve data from the Bloomberg API. Parameters ---------- session : blpapi session securities : list or string list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge'] fields: list or string list of fields start_date : datetime.date end_date : datetime.date frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY" event_types: list of string (only "TRADE", "BID" and "ASK" events seem to be working). """ refDataService = session.getService("//blp/refdata") if isinstance(start_date, datetime.datetime): request = refDataService.createRequest("IntradayTickRequest") for et in event_types: request.getElement("eventTypes").appendValue(et) request.set("includeConditionCodes", True) elif isinstance(start_date, datetime.date): request = refDataService.createRequest("HistoricalDataRequest") request.set("periodicitySelection", frequency) else: request = refDataService.createRequest("ReferenceDataRequest") if options: for k, v in options.items(): request.set(k, v) if request.asElement().name() == blpapi.Name("IntradayTickRequest"): if isinstance(securities, list): raise TypeError("For intraday requests, we can only handle one security") else: request.set("security", securities) else: if hasattr(securities, "__iter__"): for security in securities: request.append("securities", security) else: request.append("securities", securities) if isinstance(fields, list): for field in fields: request.append("fields", field) else: request.append("fields", fields) if overrides: append_overrides(request, overrides) if start_date: if request.asElement().name() == blpapi.Name("IntradayTickRequest"): start_date = timezone("America/New_York").localize(start_date) start_date = start_date.astimezone(timezone("GMT")) request.set("startDateTime", start_date) if end_date: end_date = timezone("America/New_York").localize(end_date) end_date = end_date.astimezone(timezone("GMT")) request.set("endDateTime", end_date) else: request.set("startDate", "{:%Y%m%d}".format(start_date)) if end_date: request.set("endDate", "{:%Y%m%d}".format(end_date)) data = {} for msg in event_loop(session, request): if msg.hasElement("responseError"): logger.error(msg.getElement("responseError").getElementAsString("message")) continue if msg.messageType() == blpapi.Name("ReferenceDataResponse"): data.update(process_reference_msg(msg)) elif msg.messageType() == blpapi.Name("HistoricalDataResponse"): data.update(process_historical_msg(msg)) elif msg.messageType() == blpapi.Name("IntradayTickResponse"): data.update(process_intraday_tick_msg(msg)) if request.asElement().name() == blpapi.Name("IntradayTickRequest"): df = pd.DataFrame(data) if "time" in df: df.time = df.time.dt.tz_convert("America/New_York") return df else: return data if __name__ == "__main__": testdate = pd.datetime(2013, 1, 1) hist_securities = ["CADUSD Curncy", "EURUSD Curncy"] hist_fields = ["PX_LAST"] securities = ["004421BW2 Mtge", "75157EAE2 Mtge", "XS0295516776 Mtge"] fields = ["CUR_CPN", "START_ACC_DT"] with init_bbg_session(BBG_IP) as session: hist_data = retrieve_data( session, securities, hist_fields, start_date=testdate.date(), frequency="MONTHLY", ) overrides = {"SETTLE_DT": testdate} ref_data = retrieve_data(session, securities, fields, overrides=overrides) struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"]) spx_ndx_monthly = retrieve_data( session, ["SPX Index", "NDX Index"], fields=["PX_LAST"], start_date=datetime.date(2012, 1, 1), options={"periodicityAdjustment": "ACTUAL"}, frequency="MONTHLY", ) trace_data = retrieve_data( session, "BNCMT 2007-1 A5@TRAC Mtge", start_date=datetime.datetime(2018, 5, 18, 9), end_date=datetime.datetime(2018, 9, 18, 9), ) today = datetime.datetime.today() yesterday = today - datetime.timedelta(days=1) tick_data = retrieve_data( session, "GOOG US Equity", start_date=yesterday, end_date=today )