"""Helpers for pulling reference and historical data through blpapi."""
import blpapi
import sys
import pandas as pd
from contextlib import contextmanager
import logging
import datetime

logger = logging.getLogger(__name__)

BBG_IP = ['192.168.9.61', '192.168.0.8', '192.168.0.10', '192.168.0.12']


@contextmanager
def init_bbg_session(ip_list, port=8194):
    """Yield a started session connected to the first reachable host.

    Hosts in ``ip_list`` are tried in order. The first one whose session
    starts is used: the ``//blp/refdata`` service is opened on it and the
    session is yielded. The session is stopped when the ``with`` block exits.

    Args:
        ip_list: iterable of host addresses to try, in order.
        port: server port set on the session options.

    Raises:
        NameError: the session started but ``//blp/refdata`` could not be
            opened (kept for backward compatibility with existing callers).
        RuntimeError: no host in ``ip_list`` accepted a session.
    """
    session_options = blpapi.SessionOptions()
    session_options.setServerPort(port)

    for ip in ip_list:
        # Start a Session
        session_options.setServerHost(ip)
        session = blpapi.Session(session_options)
        if not session.start():
            logger.info("Failed to open session with %s", ip)
            continue
        try:
            if not session.openService("//blp/refdata"):
                raise NameError("Failed to open //blp/refdata")
            logger.info("Logged in on %s", ip)
            yield session
            # A host worked: do not try the remaining ones.
            return
        except NameError as e:
            logger.error(str(e))
            raise
        finally:
            session.stop()

    # Without this, contextlib would raise an opaque
    # "generator didn't yield" RuntimeError.
    raise RuntimeError(
        "Failed to open a session with any of {0}".format(list(ip_list)))


def append_overrides(request, d):
    """Append every ``fieldId -> value`` pair of ``d`` to the request overrides.

    ``datetime.date`` values (including ``datetime.datetime``) are sent as
    ``YYYYMMDD`` strings; any other value is passed through unchanged.
    """
    overrides = request.getElement('overrides')
    for field_id, value in d.items():
        override = overrides.appendElement()
        override.setElement("fieldId", field_id)
        if isinstance(value, datetime.date):
            override.setElement("value", "{0:%Y%m%d}".format(value))
        else:
            override.setElement("value", value)


def event_loop(session, request):
    """Send ``request`` and yield each message of the response.

    Messages from PARTIAL_RESPONSE and RESPONSE events are yielded; other
    events are skipped. The generator finishes once the RESPONSE event has
    been consumed.
    """
    session.sendRequest(request)
    response_types = (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE)
    # Process received events
    while True:
        # We provide timeout to give the chance to Ctrl+C handling:
        ev = session.nextEvent(500)
        event_type = ev.eventType()
        if event_type in response_types:
            for msg in ev:
                yield msg
        # Response completely received, so we can exit.
        # NOTE: a plain return is required here; raising StopIteration inside
        # a generator is turned into RuntimeError since Python 3.7 (PEP 479).
        if event_type == blpapi.Event.RESPONSE:
            return


def _coerce_date_columns(df):
    """Convert object columns of ``df`` to datetimes where any value parses.

    Stands in for the removed ``DataFrame.convert_objects(
    convert_dates='coerce')``: values that cannot be parsed become ``NaT``,
    and a column in which nothing parses is left untouched.
    """
    for col in df.columns:
        if df[col].dtype != object:
            continue
        converted = pd.to_datetime(df[col], errors="coerce")
        if not converted.isnull().all():
            df[col] = converted
    return df


def field_array_todf(field):
    """Build a DataFrame with one row per value of an array element.

    Each value of ``field`` contributes a row mapping the names of its
    sub-elements to their values; date-like object columns are converted
    to timestamps.
    """
    rows = [{str(e.name()): e.getValue() for e in f.elements()}
            for f in field.values()]
    return _coerce_date_columns(pd.DataFrame(rows))


def process_historical_msg(msg):
    """Return ``{security name: DataFrame}`` for a historical data message."""
    security_data = msg.getElement("securityData")
    security_name = security_data.getElementValue("security")
    field_data = security_data.getElement("fieldData")
    return {security_name: field_array_todf(field_data)}


def process_reference_msg(msg):
    """Return ``{security name: {field name: value}}`` for a reference message.

    Array fields are converted to DataFrames, other fields keep the value
    returned by ``getValue()``. Invalid fields are logged and skipped.
    """
    data = {}
    for security_data in msg.getElement("securityData").values():
        security_name = security_data.getElementValue("security")
        row = {}
        for field in security_data.getElement("fieldData").elements():
            if not field.isValid():
                logger.info("Invalid field: %s", field)
            elif field.isArray():
                # to convert dates to timestamps
                row[str(field.name())] = field_array_todf(field)
            else:
                row[str(field.name())] = field.getValue()
        data[security_name] = row
    return data


def retrieve_data(session, securities, fields, overrides=None,
                  start_date=None, end_date=None):
    """Request ``fields`` for ``securities`` and collect the response.

    A ``HistoricalDataRequest`` is sent when ``start_date`` is given,
    otherwise a ``ReferenceDataRequest``.

    Args:
        session: session on which ``//blp/refdata`` is already open.
        securities: iterable of security identifiers.
        fields: iterable of field mnemonics.
        overrides: optional mapping of override field id to value
            (see ``append_overrides``).
        start_date: optional date, sent as ``startDate`` in ``YYYYMMDD`` form.
        end_date: optional date, sent as ``endDate`` in ``YYYYMMDD`` form.

    Returns:
        dict keyed by security name. Values are DataFrames for historical
        responses and ``{field name: value}`` dicts for reference responses.
    """
    service = session.getService("//blp/refdata")
    request_type = ("HistoricalDataRequest" if start_date
                    else "ReferenceDataRequest")
    request = service.createRequest(request_type)

    for security in securities:
        request.append("securities", security)
    for field in fields:
        request.append("fields", field)

    if overrides:
        append_overrides(request, overrides)
    if start_date:
        request.set("startDate", "{0:%Y%m%d}".format(start_date))
    if end_date:
        request.set("endDate", "{0:%Y%m%d}".format(end_date))

    data = {}
    for msg in event_loop(session, request):
        if msg.messageType() == blpapi.Name("ReferenceDataResponse"):
            data.update(process_reference_msg(msg))
        elif msg.messageType() == blpapi.Name("HistoricalDataResponse"):
            data.update(process_historical_msg(msg))
    return data


if __name__ == "__main__":
    # pd.datetime was removed from pandas; use the stdlib class directly.
    testdate = datetime.datetime(2013, 1, 1)
    hist_securities = ['CADUSD Curncy', "EURUSD Curncy"]
    hist_fields = ['PX_LAST']
    securities = ['38145BAA9 Mtge', '75157EAE2 Mtge', 'XS0295516776 Mtge']
    fields = ['CUR_CPN', 'START_ACC_DT']

    with init_bbg_session(BBG_IP) as session:
        hist_data = retrieve_data(session, hist_securities, hist_fields,
                                  start_date=testdate)
        overrides = {'SETTLE_DT': testdate}
        ref_data = retrieve_data(session, securities, fields,
                                 overrides=overrides)
        struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])