aboutsummaryrefslogtreecommitdiffstats
path: root/python/bbg_helpers.py
blob: 886c22dc3c2e126f4a938ff079b4c6a506ace56f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import blpapi
import pandas as pd
from contextlib import contextmanager
from pytz import timezone
import logging
import datetime

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Candidate Bloomberg hosts (IP addresses or host names).  The __main__ block
# at the bottom of this file passes this list to init_bbg_session as ip_list.
BBG_IP = ["192.168.9.28", "edwin-pc", "192.168.0.10", "192.168.0.12"]


@contextmanager
def init_bbg_session(ip_list, port=8194):
    """Yield a started Bloomberg session on the first host that accepts one.

    Hosts in ``ip_list`` are tried in order.  A host whose session fails to
    start is logged and skipped.  On the first host that does start, the
    ``//blp/refdata`` service is opened and the session is handed to the
    ``with`` body.  The session is stopped when the body exits, whether
    normally or through an exception.

    Parameters
    ----------
    ip_list : iterable of str
        Host names or IP addresses to try, in order.
    port : int
        Server port used for every host.

    Yields
    ------
    blpapi.Session
        A started session on which ``//blp/refdata`` has been opened.

    Raises
    ------
    NameError
        If ``//blp/refdata`` cannot be opened on the host that started.
    RuntimeError
        If no host in ``ip_list`` accepted a session.
    """
    hosts = list(ip_list)
    sessionOptions = blpapi.SessionOptions()
    sessionOptions.setServerPort(port)

    for ip in hosts:
        # Start a Session
        sessionOptions.setServerHost(ip)
        session = blpapi.Session(sessionOptions)
        if not session.start():
            logger.info("Failed to open session with {0}".format(ip))
            continue
        try:
            if not session.openService("//blp/refdata"):
                raise NameError("Failed to open //blp/refdata")
            logger.info("Logged in on {0}".format(ip))
            yield session
            # The caller's ``with`` body finished normally: do not try the
            # remaining hosts.
            return
        except NameError as e:
            # Also sees a NameError raised inside the caller's ``with`` body,
            # because contextmanager re-raises it at the ``yield`` above.
            logger.error(str(e))
            raise
        finally:
            session.stop()

    # Without this, the generator would end without yielding and contextlib
    # would raise an uninformative RuntimeError("generator didn't yield").
    # The exception type is kept; only the message is made useful.
    raise RuntimeError(
        "Could not start a Bloomberg session on any of: {0}".format(
            ", ".join(str(host) for host in hosts) or "<no hosts given>"
        )
    )


def append_overrides(request, d):
    """Add one override entry to ``request`` for every item of ``d``.

    Parameters
    ----------
    request : request object exposing ``getElement("overrides")``
    d : dict
        Maps an override field id to its value.  ``datetime.date`` values
        (which includes ``datetime.datetime``) are sent as ``YYYYMMDD``
        strings; every other value is passed through unchanged.
    """
    override_array = request.getElement("overrides")
    for field_id, raw_value in d.items():
        entry = override_array.appendElement()
        entry.setElement("fieldId", field_id)
        is_date = isinstance(raw_value, datetime.date)
        entry.setElement(
            "value", "{0:%Y%m%d}".format(raw_value) if is_date else raw_value
        )


def event_loop(session, request):
    """Send ``request`` and lazily yield every message of its response.

    Messages are yielded from PARTIAL_RESPONSE and RESPONSE events; events of
    any other type are ignored.  The generator finishes once a RESPONSE event
    has been consumed.
    """
    session.sendRequest(request)
    while True:
        # Poll with a timeout rather than blocking indefinitely, so that
        # Ctrl+C gets a chance to be handled between polls.
        event = session.nextEvent(500)
        carries_data = event.eventType() in [
            blpapi.Event.PARTIAL_RESPONSE,
            blpapi.Event.RESPONSE,
        ]
        if carries_data:
            for message in event:
                yield message
        # A RESPONSE event is the final one for this request.
        if event.eventType() == blpapi.Event.RESPONSE:
            break


def get_pythonvalue(e):
    """Convert a Bloomberg element into a plain Python / pandas value.

    * null element -> ``None``
    * DATE -> result of ``pd.to_datetime`` on the element's value
    * DATETIME -> the value itself when it is a ``datetime.time``, otherwise
      ``pd.to_datetime(value, utc=True)``
    * ENUMERATION -> the element's string representation
    * anything else -> the element's raw value
    """
    if e.isNull():
        return None

    if e.datatype() == blpapi.DataType.DATE:
        return pd.to_datetime(e.getValue())

    if e.datatype() == blpapi.DataType.DATETIME:
        value = e.getValue()
        # Time-only values carry no date, so they cannot become timestamps.
        return value if isinstance(value, datetime.time) else pd.to_datetime(
            value, utc=True
        )

    if e.datatype() == blpapi.DataType.ENUMERATION:
        return e.getValueAsString()

    return e.getValue()


def field_array_todf(field):
    """Turn an array-valued element into a DataFrame, one row per entry.

    Each entry of ``field.values()`` becomes a row whose columns are the names
    of the entry's sub-elements, with values converted by ``get_pythonvalue``.
    When a ``date`` column is present it becomes the index.
    """
    records = []
    for entry in field.values():
        records.append(
            {str(item.name()): get_pythonvalue(item) for item in entry.elements()}
        )
    frame = pd.DataFrame.from_dict(records)
    return frame.set_index("date") if "date" in frame else frame


def process_historical_msg(msg):
    """Return ``{security name: DataFrame}`` for one historical-data message.

    The security name is read from the message's ``securityData`` element and
    its ``fieldData`` array is converted with ``field_array_todf``.
    """
    security_data = msg.getElement("securityData")
    name = security_data.getElementValue("security")
    return {name: field_array_todf(security_data.getElement("fieldData"))}


def process_reference_msg(msg):
    """Return ``{security name: {field name: value}}`` for one reference message.

    Array-valued fields are converted to DataFrames with ``field_array_todf``
    (which also turns dates into timestamps); scalar fields are converted with
    ``get_pythonvalue``.  Invalid fields are logged and left out.
    """
    result = {}
    for security in msg.getElement("securityData").values():
        name = security.getElementValue("security")
        values = {}
        for field in security.getElement("fieldData").elements():
            if not field.isValid():
                logger.info(f"Invalid field: {str(field)}")
                continue
            converted = (
                field_array_todf(field) if field.isArray() else get_pythonvalue(field)
            )
            values[str(field.name())] = converted
        result[name] = values
    return result


def process_intraday_tick_msg(msg):
    """Return the ticks of one intraday-tick message as a DataFrame.

    The message's ``tickData`` element must contain exactly two child
    elements (anything else makes the unpacking below raise); the second one
    is the array that gets converted with ``field_array_todf``.
    """
    container = msg.getElement("tickData")
    _first, tick_array = container.elements()
    return field_array_todf(tick_array)


def _new_york_to_gmt(moment):
    """Return ``moment`` as a GMT datetime; naive input is New York local time."""
    if moment.tzinfo is None:
        moment = timezone("America/New_York").localize(moment)
    return moment.astimezone(timezone("GMT"))


def retrieve_data(
    session,
    securities,
    fields=[],
    overrides={},
    start_date=None,
    end_date=None,
    frequency="DAILY",
    options={},
    event_types=["TRADE"],
):
    """
    Convenience function to retrieve data from the Bloomberg API.

    The kind of request is chosen from the type of ``start_date``:

    * ``datetime.datetime`` -> ``IntradayTickRequest`` (one security only)
    * ``datetime.date``     -> ``HistoricalDataRequest``
    * anything else         -> ``ReferenceDataRequest``

    Parameters
    ----------
    session : blpapi session
       a started session with //blp/refdata opened
    securities : list or string
       list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge']),
       or a single ticker string. Intraday requests accept a single string only.
    fields: list or string
       list of fields, or a single field name
    overrides : dict
       field id -> value, forwarded to append_overrides
    start_date : datetime.date or datetime.datetime
       naive datetimes are interpreted as America/New_York local time
    end_date : datetime.date or datetime.datetime
    frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY"
       only used for historical requests
    options : dict
       extra request elements, set verbatim with request.set(key, value)
    event_types: list of string
       only used for intraday requests
       (only "TRADE", "BID" and "ASK" events seem to be working).

    Returns
    -------
    dict or pandas.DataFrame
       For reference and historical requests, a dict keyed by security name
       (values are a dict of fields, respectively a DataFrame). For intraday
       requests, a single DataFrame of ticks whose "time" column, when
       present, is converted to America/New_York.

    Raises
    ------
    TypeError
       if a list of securities is given for an intraday request.
    """
    # NOTE: the mutable defaults in the signature are kept for interface
    # compatibility; they are only read, never mutated, in this function.
    refDataService = session.getService("//blp/refdata")

    # datetime.datetime is a subclass of datetime.date, so it is tested first.
    is_intraday = isinstance(start_date, datetime.datetime)
    if is_intraday:
        request = refDataService.createRequest("IntradayTickRequest")
        for et in event_types:
            request.getElement("eventTypes").appendValue(et)
        request.set("includeConditionCodes", True)
    elif isinstance(start_date, datetime.date):
        request = refDataService.createRequest("HistoricalDataRequest")
        request.set("periodicitySelection", frequency)
    else:
        request = refDataService.createRequest("ReferenceDataRequest")

    if options:
        for k, v in options.items():
            request.set(k, v)

    if is_intraday:
        if isinstance(securities, list):
            raise TypeError("For intraday requests, we can only handle one security")
        request.set("security", securities)
    elif isinstance(securities, str) or not hasattr(securities, "__iter__"):
        # In Python 3 a str has __iter__, so it must be singled out here or a
        # lone ticker would be appended one character at a time.
        request.append("securities", securities)
    else:
        for security in securities:
            request.append("securities", security)

    if isinstance(fields, (list, tuple)):
        for field in fields:
            request.append("fields", field)
    else:
        request.append("fields", fields)

    if overrides:
        append_overrides(request, overrides)

    if start_date:
        if is_intraday:
            request.set("startDateTime", _new_york_to_gmt(start_date))
            if end_date:
                request.set("endDateTime", _new_york_to_gmt(end_date))
        else:
            request.set("startDate", "{:%Y%m%d}".format(start_date))
            if end_date:
                request.set("endDate", "{:%Y%m%d}".format(end_date))

    data = {}
    tick_frames = []
    for msg in event_loop(session, request):
        if msg.hasElement("responseError"):
            logger.error(msg.getElement("responseError").getElementAsString("message"))
            continue
        if msg.messageType() == blpapi.Name("ReferenceDataResponse"):
            data.update(process_reference_msg(msg))
        elif msg.messageType() == blpapi.Name("HistoricalDataResponse"):
            data.update(process_historical_msg(msg))
        elif msg.messageType() == blpapi.Name("IntradayTickResponse"):
            # Keep every partial response. Merging the frames through
            # dict.update() would replace the columns of the previous message
            # with those of the next one, losing all but the last chunk.
            ticks = process_intraday_tick_msg(msg)
            if not ticks.empty:
                tick_frames.append(ticks)

    if not is_intraday:
        return data

    if tick_frames:
        df = pd.concat(tick_frames, ignore_index=True)
    else:
        df = pd.DataFrame()
    if "time" in df:
        df["time"] = df["time"].dt.tz_convert("America/New_York")
    return df


if __name__ == "__main__":
    # Manual smoke test: needs a reachable Bloomberg host from BBG_IP.
    # pd.datetime was removed from pandas; use the stdlib class it aliased.
    testdate = datetime.datetime(2013, 1, 1)
    hist_securities = ["CADUSD Curncy", "EURUSD Curncy"]
    hist_fields = ["PX_LAST"]
    securities = ["004421BW2 Mtge", "75157EAE2 Mtge", "XS0295516776 Mtge"]
    fields = ["CUR_CPN", "START_ACC_DT"]
    with init_bbg_session(BBG_IP) as session:
        # Historical request (start_date is a date): uses the historical
        # ticker list, which was previously defined but never used.
        hist_data = retrieve_data(
            session,
            hist_securities,
            hist_fields,
            start_date=testdate.date(),
            frequency="MONTHLY",
        )
        # Reference request with an override.
        overrides = {"SETTLE_DT": testdate}
        ref_data = retrieve_data(session, securities, fields, overrides=overrides)
        # Reference request for an array-valued field.
        struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])
        spx_ndx_monthly = retrieve_data(
            session,
            ["SPX Index", "NDX Index"],
            fields=["PX_LAST"],
            start_date=datetime.date(2012, 1, 1),
            options={"periodicityAdjustment": "ACTUAL"},
            frequency="MONTHLY",
        )
        # Intraday tick requests (start_date is a datetime).
        trace_data = retrieve_data(
            session,
            "BNCMT 2007-1 A5@TRAC Mtge",
            start_date=datetime.datetime(2018, 5, 18, 9),
            end_date=datetime.datetime(2018, 9, 18, 9),
        )
        today = datetime.datetime.today()
        yesterday = today - datetime.timedelta(days=1)
        tick_data = retrieve_data(
            session, "GOOG US Equity", start_date=yesterday, end_date=today
        )