aboutsummaryrefslogtreecommitdiffstats
path: root/python/bbg_helpers.py
blob: 1d542a016d0633493a105389677a184bab9f70ae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import blpapi
import sys
import pandas as pd
from contextlib import contextmanager
import logging
import datetime

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Hosts handed to init_bbg_session() by the __main__ demo at the bottom of
# this file; they are tried in list order.
# NOTE(review): presumably the internal machines exposing a Bloomberg API
# endpoint -- confirm the list is still current.
BBG_IP = ['192.168.9.61', '192.168.9.65', '192.168.0.10', '192.168.0.12']

@contextmanager
def init_bbg_session(ip_list, port=8194):
    """
    Context manager yielding a started blpapi session.

    The hosts in ``ip_list`` are tried in order; the first one on which
    ``session.start()`` succeeds is used.  The ``//blp/refdata`` service is
    opened on it before the session is handed to the ``with`` body, and the
    session is stopped when the body exits (normally or with an exception).

    Parameters
    ----------
    ip_list : iterable of str
       candidate server hosts, tried in order
    port : int
       server port used for every host

    Raises
    ------
    NameError
       if a session started but ``//blp/refdata`` could not be opened
       (later hosts are not tried in that case)
    RuntimeError
       if no host in ``ip_list`` accepted a session
    """
    sessionOptions = blpapi.SessionOptions()
    sessionOptions.setServerPort(port)

    for ip in ip_list:
        # Start a Session
        sessionOptions.setServerHost(ip)
        session = blpapi.Session(sessionOptions)
        if not session.start():
            logger.info("Failed to open session with {0}".format(ip))
            continue
        try:
            if not session.openService("//blp/refdata"):
                raise NameError("Failed to open //blp/refdata")
            logger.info("Logged in on {0}".format(ip))
            yield session
            # The with-body finished: we are done, do not try other hosts.
            return
        except NameError as e:
            logger.error(str(e))
            raise
        finally:
            session.stop()

    # Every host refused the session.  Without this explicit error the
    # generator would end without yielding and contextlib would raise an
    # uninformative RuntimeError("generator didn't yield").
    raise RuntimeError(
        "Failed to open a Bloomberg session on any of: {0}".format(
            ", ".join(str(ip) for ip in ip_list)))

def append_overrides(request, d):
    """
    Append one override entry to ``request`` for every item of ``d``.

    Parameters
    ----------
    request : request object exposing an 'overrides' element
    d : dict
       maps override field ids to values; ``datetime.date`` values
       (including ``datetime.datetime``) are sent as ``YYYYMMDD`` strings,
       anything else is passed through unchanged
    """
    override_list = request.getElement('overrides')
    for field_id, raw_value in d.items():
        entry = override_list.appendElement()
        entry.setElement("fieldId", field_id)
        is_date = isinstance(raw_value, datetime.date)
        # Dates are serialised to a compact YYYYMMDD string.
        value = "{0:%Y%m%d}".format(raw_value) if is_date else raw_value
        entry.setElement("value", value)

def event_loop(session, request):
    """
    Send ``request`` on ``session`` and yield every response message.

    This is a generator: the request is only sent once iteration starts.
    Messages from PARTIAL_RESPONSE and RESPONSE events are yielded in
    arrival order; events of any other type are ignored.  Iteration ends
    after the RESPONSE event has been drained.
    """
    session.sendRequest(request)
    payload_types = (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE)
    done = False
    while not done:
        # Polling with a timeout (rather than blocking forever) leaves
        # room for Ctrl+C to be handled between events.
        event = session.nextEvent(500)
        event_type = event.eventType()
        if event_type in payload_types:
            for message in event:
                yield message
        # A RESPONSE event is the final one for this request.
        done = event_type == blpapi.Event.RESPONSE

def get_pythonvalue(e):
    """
    Return the value held by element ``e`` as a Python/pandas object.

    DATE and DATETIME elements go through ``pd.to_datetime``, ENUMERATION
    elements are returned as their string form, and every other datatype
    is returned as given by ``e.getValue()``.
    """
    kind = e.datatype()
    if kind in (blpapi.DataType.DATE, blpapi.DataType.DATETIME):
        return pd.to_datetime(e.getValue())
    if kind == blpapi.DataType.ENUMERATION:
        return e.getValueAsString()
    return e.getValue()

def field_array_todf(field):
    """
    Convert an array element into a DataFrame.

    Each entry of ``field.values()`` becomes one row whose columns are the
    names of that entry's sub-elements, with values converted by
    ``get_pythonvalue``.  When a "date" column is present it is used as
    the index.
    """
    records = []
    for entry in field.values():
        record = {str(sub.name()): get_pythonvalue(sub) for sub in entry.elements()}
        records.append(record)
    frame = pd.DataFrame.from_dict(records)
    return frame.set_index('date') if "date" in frame else frame

def process_historical_msg(msg):
    """
    Turn one historical-data message into ``{security name: DataFrame}``.

    The DataFrame is built from the message's "fieldData" array by
    ``field_array_todf``.
    """
    security_block = msg.getElement("securityData")
    ticker = security_block.getElementValue("security")
    history = field_array_todf(security_block.getElement("fieldData"))
    return {ticker: history}

def process_reference_msg(msg):
    """
    Turn one reference-data message into ``{security name: {field: value}}``.

    Array fields are converted to DataFrames with ``field_array_todf``;
    other fields are converted with ``get_pythonvalue``.  Fields reporting
    themselves as invalid are logged and left out.

    Returns ``None`` (after logging the error) when the message carries a
    'responseError' element.
    """
    if msg.hasElement('responseError'):
        logger.error(msg.getElement('responseError').toString())
        return None

    result = {}
    for security in msg.getElement("securityData").values():
        ticker = security.getElementValue("security")
        values = {}
        for field in security.getElement("fieldData").elements():
            if not field.isValid():
                logger.info("Invalid field: {0}".format(str(field)))
                continue
            # Arrays go through field_array_todf so dates become timestamps.
            convert = field_array_todf if field.isArray() else get_pythonvalue
            values[str(field.name())] = convert(field)
        result[ticker] = values
    return result

def process_intraday_tick_msg(msg):
    """
    Turn one intraday-tick message into a DataFrame of ticks.

    The message's "tickData" element must hold exactly two sub-elements;
    the first is discarded and the second is converted with
    ``field_array_todf``.
    """
    tick_container = msg.getElement("tickData")
    # Two-way unpacking: raises if the element count is not exactly two.
    _ignored, tick_array = tick_container.elements()
    return field_array_todf(tick_array)

def retrieve_data(session, securities, fields=None, overrides=None,
                  start_date=None, end_date=None, frequency="DAILY",
                  options=None):
    """
    Convenience function to retrieve data from the Bloomberg API.

    The kind of request is chosen from ``start_date``:

    * a ``datetime.datetime`` with a non-midnight time -> IntradayTickRequest
      (TRADE events, condition codes included)
    * any other ``datetime.date`` -> HistoricalDataRequest
    * anything else (e.g. ``None``) -> ReferenceDataRequest

    Parameters
    ----------
    session : blpapi session
    securities : list or string
       list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge']
       Intraday requests accept a single ticker string only.
    fields: list or string
       list of fields (defaults to no fields)
    overrides : dict
       field overrides, forwarded to ``append_overrides``
    start_date : datetime.date
    end_date : datetime.date
    frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY"
       only used for historical requests
    options : dict
       extra request settings, each applied with ``request.set(key, value)``

    Returns
    -------
    dict or pandas.DataFrame
       For reference and historical requests, a dict keyed by security name
       (values come from ``process_reference_msg`` /
       ``process_historical_msg``).  For intraday requests, a DataFrame of
       ticks whose 'time' column, if present, is localised to UTC and then
       converted to America/New_York.

    Raises
    ------
    TypeError
       if a list of securities is given for an intraday request
    """
    # None stands in for "empty" so no mutable object is shared as a default.
    if fields is None:
        fields = []

    refDataService = session.getService("//blp/refdata")
    is_intraday = (isinstance(start_date, datetime.datetime)
                   and start_date.time() != datetime.time(0))
    if is_intraday:
        request = refDataService.createRequest("IntradayTickRequest")
        request.getElement("eventTypes").appendValue("TRADE")
        request.set("includeConditionCodes", True)
    elif isinstance(start_date, datetime.date):
        request = refDataService.createRequest("HistoricalDataRequest")
        request.set("periodicitySelection", frequency)
    else:
        request = refDataService.createRequest("ReferenceDataRequest")

    if options:
        for k, v in options.items():
            request.set(k, v)

    if is_intraday:
        if isinstance(securities, list):
            raise TypeError("For intraday requests, we can only handle one security")
        request.set("security", securities)
    elif isinstance(securities, list):
        for security in securities:
            request.append("securities", security)
    else:
        request.append("securities", securities)

    if isinstance(fields, list):
        for field in fields:
            request.append("fields", field)
    else:
        request.append("fields", fields)

    if overrides:
        append_overrides(request, overrides)

    if start_date:
        if is_intraday:
            request.set("startDateTime", start_date)
            if end_date:
                request.set("endDateTime", end_date)
        else:
            request.set("startDate", f"{start_date:%Y%m%d}")
            if end_date:
                request.set("endDate", f"{end_date:%Y%m%d}")

    data = {}
    tick_frames = []
    for msg in event_loop(session, request):
        msg_type = msg.messageType()
        if msg_type == blpapi.Name("ReferenceDataResponse"):
            # process_reference_msg returns None on a response error;
            # feeding that to dict.update would raise TypeError.
            parsed = process_reference_msg(msg)
            if parsed:
                data.update(parsed)
        elif msg_type == blpapi.Name("HistoricalDataResponse"):
            data.update(process_historical_msg(msg))
        elif msg_type == blpapi.Name("IntradayTickResponse"):
            # Keep every message's ticks: merging the frames column by
            # column would let each later message overwrite earlier ones.
            tick_frames.append(process_intraday_tick_msg(msg))

    if not is_intraday:
        return data

    if tick_frames:
        df = pd.concat(tick_frames, ignore_index=True)
    else:
        df = pd.DataFrame()
    if 'time' in df:
        df['time'] = df['time'].dt.tz_localize('UTC').dt.tz_convert('America/New_York')
    return df

if __name__=="__main__":
    # Manual smoke test: needs a reachable host from BBG_IP.  It issues one
    # request of each kind supported by retrieve_data.

    # pd.datetime no longer exists in current pandas; use the stdlib class.
    testdate = datetime.datetime(2013, 1, 1)
    hist_securities = ['CADUSD Curncy', "EURUSD Curncy"]
    hist_fields = ['PX_LAST']
    securities = ['004421BW2 Mtge', '75157EAE2 Mtge', 'XS0295516776 Mtge']
    fields = ['CUR_CPN', 'START_ACC_DT']
    with init_bbg_session(BBG_IP) as session:
        # Historical request (midnight datetime -> HistoricalDataRequest).
        # NOTE(review): hist_securities was previously defined but unused and
        # this call received `securities`; switched on the assumption that
        # the hist_* pair was meant to go together -- confirm.
        hist_data = retrieve_data(session, hist_securities, hist_fields,
                                  start_date=testdate,
                                  frequency="MONTHLY")
        # Reference request with an override.
        overrides={'SETTLE_DT': testdate}
        ref_data = retrieve_data(session, securities, fields, overrides=overrides)
        # Reference request for a single field.
        struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])
        # Historical request with an extra request option.
        spx_ndx_monthly = retrieve_data(session, ["SPX Index","NDX Index"],
                                        fields=["PX_LAST"],
                                        start_date=datetime.date(2012, 1, 1),
                                        options={'periodicityAdjustment': 'ACTUAL'},
                                        frequency="MONTHLY")
        # Intraday tick request (non-midnight datetime, single security).
        trace_data = retrieve_data(session, "BNCMT 2007-1 A5@TRAC Mtge",
                                   start_date=datetime.datetime(2016, 5, 18, 9),
                                   end_date=datetime.datetime(2017, 5, 18, 9))