diff options
Diffstat (limited to 'python/bbg_helpers.py')
| -rw-r--r-- | python/bbg_helpers.py | 129 |
1 files changed, 71 insertions, 58 deletions
diff --git a/python/bbg_helpers.py b/python/bbg_helpers.py index 9c34899c..5c2cd4b5 100644 --- a/python/bbg_helpers.py +++ b/python/bbg_helpers.py @@ -28,7 +28,58 @@ def init_bbg_session(ipaddr, port=8194): finally: session.stop() -def retreive_data(session, securities, fields, settle_date=None, +def append_overrides(request, override): + overrides = request.getElement('overrides') + for k, v in override.items(): + o = overrides.appendElement() + o.setElement("fieldId", k) + if isinstance(v, pd.datetime): + o.setElement("value", "{0:%Y%m%d}".format(v)) + else: + o.setElement("value", v) + +def event_loop(session, request): + session.sendRequest(request) + # Process received events + while(True): + # We provide timeout to give the chance to Ctrl+C handling: + ev = session.nextEvent(500) + if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]: + for msg in ev: + yield msg + # Response completely received, so we can exit + if ev.eventType() == blpapi.Event.RESPONSE: + raise StopIteration + +def field_array_todf(field): + df = pd.DataFrame.from_dict([{str(e.name()): e.getValue() for e in f.elements()} \ + for f in field.values()]) + return df.convert_objects(convert_dates='coerce') + +def process_historical_msg(msg): + securityData = msg.getElement("securityData") + securityName = securityData.getElementValue("security") + fieldData = securityData.getElement("fieldData") + return {securityName: field_array_todf(fieldData)} + +def process_reference_msg(msg): + data = {} + securityDataArray = msg.getElement("securityData") + for securityData in securityDataArray.values(): + securityName = securityData.getElementValue("security") + row = {} + fieldData = securityData.getElement("fieldData") + for field in fieldData.elements(): + if not field.isValid(): + logger.info("Invalid field: {0}".format(str(field))) + elif field.isArray(): + row[str(field.name())] = field_array_todf(field) #to convert dates to timestamps + else: + row[str(field.name())] = field.getValue() + data[securityName] = row + return data + +def retreive_data(session, securities, fields, overrides={}, start_date=None, end_date=None): refDataService = session.getService("//blp/refdata") if start_date: @@ -39,67 +90,29 @@ def retreive_data(session, securities, fields, settle_date=None, request.append("securities", security) for field in fields: request.append("fields", field) - if settle_date: - overrides = request.getElement('overrides') - o = overrides.appendElement() - o.setElement("fieldId", "SETTLE_DT") - o.setElement("value", "{0:%Y%m%d}".format(settle_date)) + if overrides: + append_overrides(request, overrides) if start_date: request.set("startDate", "{0:%Y%m%d}".format(start_date)) - request.set("endDate", "{0:%Y%m%d}".format(end_date)) + if end_date: + request.set("endDate", "{0:%Y%m%d}".format(end_date)) - session.sendRequest(request) - data = [] - # Process received events - while(True): - # We provide timeout to give the chance to Ctrl+C handling: - ev = session.nextEvent(500) - if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]: - for msg in ev: - data.append(msg) - # Response completely received, so we could exit - if ev.eventType() == blpapi.Event.RESPONSE: - break - return data - -def process_msgs(data): - # return a dict whose keys are the bloomberg securities - # and whose values are a dict of bloomberg fields -> values - # the values can either be a scalar or a dataframe - newdata = {} - for msg in data: + data = {} + for msg in event_loop(session, request): if msg.messageType() == blpapi.Name("ReferenceDataResponse"): - securityDataArray = msg.getElement("securityData") - for securityData in securityDataArray.values(): - securityName = securityData.getElementValue("security") - row = {} - fieldData = securityData.getElement("fieldData") - for field in fieldData.elements(): - if not field.isValid(): - logger.info("Invalid field: {0}".format(str(field))) - elif field.isArray(): - df = pd.DataFrame.from_dict( - [{str(e.name()): e.getValue() for e in f.elements()} \ - for f in field.values()]) - row[str(field.name())] = df.convert_objects(convert_dates='coerce') #to convert dates to timestamps - else: - row[str(field.name())] = field.getValue() - newdata[securityName] = row - if msg.messageType() == blpapi.Name("HistoricalDataResponse"): - securityData = msg.getElement("securityData") - securityName = securityData.getElementValue("security") - fieldData = securityData.getElement("fieldData") - df = pd.DataFrame.from_dict([{str(e.name()): e.getValue() for e in f.elements()} \ - for f in fieldData.values()]) - df.convert_objects(convert_dates='coerce') - newdata[securityName] = df - return newdata + data.update(process_reference_msg(msg)) + elif msg.messageType() == blpapi.Name("HistoricalDataResponse"): + data.update(process_historical_msg(msg)) + return data if __name__=="__main__": - startdate = pd.datetime(2013, 1, 1) - enddate = pd.datetime(2015, 8, 7) - securities = ['CADUSD Curncy', "EURUSD Curncy"] - fields = ['PX_LAST'] + testdate = pd.datetime(2015, 1, 1) + hist_securities = ['CADUSD Curncy', "EURUSD Curncy"] + hist_fields = ['PX_LAST'] + securities = ['38145BAA9 Mtge', '75157EAE2 Mtge'] + fields = ['CUR_CPN', 'START_ACC_DT'] with init_bbg_session('192.168.0.4', 8194) as session: - data = retreive_data(session, securities, fields, start_date=startdate, end_date=enddate) - test = process_msgs(data) + hist_data = retreive_data(session, hist_securities, hist_fields, start_date=testdate) + overrides={'SETTLE_DT': testdate} + ref_data = retreive_data(session, securities, fields, overrides=overrides) + struct_data = retreive_data(session, securities, ["HIST_CASH_FLOW"]) |
