1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
import logging
from itertools import chain
from serenitas.analytics.api import Portfolio, BlackSwaption, CreditIndex
from serenitas.utils.db2 import DataError
from psycopg import sql
logger = logging.getLogger(__name__)
def get_swaption_portfolio(date, conn, **kwargs):
params = [date, date, date]
and_clause = []
for key in ["fund", "portfolio"]:
if key in kwargs:
params.append(kwargs[key])
and_clause.append(f"AND {key}=%s")
with conn.cursor() as c:
c.execute(
"SELECT security_id AS redcode, maturity,"
" array_agg(swaptions.id) AS ids,"
" array_agg(folder::text) AS folders,"
" array_agg(buysell) AS buysells,"
" array_agg(option_type::text) AS option_types,"
" array_agg(notional - COALESCE(terminated_amount, 0.0)) AS notionals,"
" array_agg(fund::text) AS funds, "
" array_agg(expiration_date) AS expiries, "
" array_agg(strike) AS strikes "
"FROM swaptions LEFT JOIN ("
"SELECT dealid, SUM(termination_amount) AS terminated_amount "
"FROM terminations WHERE termination_date <= %s GROUP BY dealid) b "
"USING (dealid) "
"WHERE (terminated_amount IS NULL OR notional > terminated_amount) "
"AND expiration_date > %s AND trade_date <= %s "
"AND swap_type='CD_INDEX_OPTION' "
f"{' '.join(and_clause)} "
"GROUP BY security_id, maturity",
params,
)
trades = []
trade_ids = []
for row in c:
index = CreditIndex(
redcode=row.redcode, maturity=row.maturity, value_date=date
)
index.mark()
trades.append(
[
BlackSwaption(
index,
expiry,
strike,
ot.lower(),
"Long" if direction else "Short",
notional,
tid,
)
for expiry, strike, ot, direction, notional, tid in zip(
row.expiries,
row.strikes,
row.option_types,
row.buysells,
row.notionals,
row.ids,
)
]
)
trade_ids.append(zip(row.folders, row.ids, row.funds))
portf = Portfolio(
list(chain.from_iterable(trades)), list(chain.from_iterable(trade_ids))
)
portf.mark(interp_method="bivariate_linear", mark_index=False, **kwargs)
return portf
def insert_swaption_portfolio(portf, conn, overwrite=True):
columns = ["market_value", "delta", "gamma", "vega", "theta", "hy_equiv"]
place_holders = sql.SQL(", ").join([sql.Placeholder()] * 8)
if overwrite:
update_str = sql.SQL("DO UPDATE SET {}").format(
sql.SQL(", ").join(
sql.SQL("{} = excluded.{}").format(
sql.Identifier(col), sql.Identifier(col)
)
for col in columns
)
)
else:
update_str = sql.SQL("DO NOTHING")
sql_str = sql.SQL(
"INSERT INTO swaption_marks VALUES({}) ON CONFLICT (dealid, date) {} "
).format(place_holders, update_str)
with conn.cursor() as c:
for (strat, tid, fund), trade in portf.items():
to_insert = (
f"SWPTN{tid}",
trade.value_date,
trade.pv,
trade.delta,
trade.gamma,
trade.vega,
trade.theta,
trade.hy_equiv,
)
try:
c.execute(sql_str, to_insert)
except DataError as e:
logger.error(e)
finally:
logger.info("succesfully marked trade id: %s", id)
conn.commit()
|