import logging from itertools import chain from serenitas.analytics.api import Portfolio, BlackSwaption, CreditIndex from serenitas.utils.db2 import DataError from psycopg import sql logger = logging.getLogger(__name__) def get_swaption_portfolio(date, conn, **kwargs): params = [date, date, date] and_clause = [] for key in ["fund", "portfolio"]: if key in kwargs: params.append(kwargs[key]) and_clause.append(f"AND {key}=%s") with conn.cursor() as c: c.execute( "SELECT security_id AS redcode, maturity," " array_agg(swaptions.id) AS ids," " array_agg(folder::text) AS folders," " array_agg(buysell) AS buysells," " array_agg(option_type::text) AS option_types," " array_agg(notional - COALESCE(terminated_amount, 0.0)) AS notionals," " array_agg(fund::text) AS funds, " " array_agg(expiration_date) AS expiries, " " array_agg(strike) AS strikes " "FROM swaptions LEFT JOIN (" "SELECT dealid, SUM(termination_amount) AS terminated_amount " "FROM terminations WHERE termination_date <= %s GROUP BY dealid) b " "USING (dealid) " "WHERE (terminated_amount IS NULL OR notional > terminated_amount) " "AND expiration_date > %s AND trade_date <= %s " "AND swap_type='CD_INDEX_OPTION' " f"{' '.join(and_clause)} " "GROUP BY security_id, maturity", params, ) trades = [] trade_ids = [] for row in c: index = CreditIndex( redcode=row.redcode, maturity=row.maturity, value_date=date ) index.mark() trades.append( [ BlackSwaption( index, expiry, strike, ot.lower(), "Long" if direction else "Short", notional, tid, ) for expiry, strike, ot, direction, notional, tid in zip( row.expiries, row.strikes, row.option_types, row.buysells, row.notionals, row.ids, ) ] ) trade_ids.append(zip(row.folders, row.ids, row.funds)) portf = Portfolio( list(chain.from_iterable(trades)), list(chain.from_iterable(trade_ids)) ) portf.mark(interp_method="bivariate_linear", mark_index=False, **kwargs) return portf def insert_swaption_portfolio(portf, conn, overwrite=True): columns = ["market_value", "delta", "gamma", "vega", "theta", "hy_equiv"] place_holders = sql.SQL(", ").join([sql.Placeholder()] * 8) if overwrite: update_str = sql.SQL("DO UPDATE SET {}").format( sql.SQL(", ").join( sql.SQL("{} = excluded.{}").format( sql.Identifier(col), sql.Identifier(col) ) for col in columns ) ) else: update_str = sql.SQL("DO NOTHING") sql_str = sql.SQL( "INSERT INTO swaption_marks VALUES({}) ON CONFLICT (dealid, date) {} " ).format(place_holders, update_str) with conn.cursor() as c: for (strat, tid, fund), trade in portf.items(): to_insert = ( f"SWPTN{tid}", trade.value_date, trade.pv, trade.delta, trade.gamma, trade.vega, trade.theta, trade.hy_equiv, ) try: c.execute(sql_str, to_insert) except DataError as e: logger.error(e) finally: logger.info("succesfully marked trade id: %s", id) conn.commit()