diff options
Diffstat (limited to 'python/trade_dataclasses.py')
| -rw-r--r-- | python/trade_dataclasses.py | 403 |
1 files changed, 402 insertions, 1 deletions
diff --git a/python/trade_dataclasses.py b/python/trade_dataclasses.py index 4dcbec71..2e602187 100644 --- a/python/trade_dataclasses.py +++ b/python/trade_dataclasses.py @@ -427,6 +427,41 @@ class MTMDeal: return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields}) +from csv_headers.citco import GIL + + +class CitcoDeal: + _citco_queue: ClassVar[list] = [] + _citco_headers = None + _sftp = SftpClient.from_creds("citco") + product_type: str + + def __init_subclass__(cls, deal_type, **kwargs): + super().__init_subclass__(deal_type, **kwargs) + cls._citco_headers = GIL + + @classmethod + def citco_upload(cls): + if not cls._citco_queue: # early exit + return + buf = StringIO() + csvwriter = csv.writer(buf) + csvwriter.writerow(cls._citco_headers) + csvwriter.writerows( + [row.get(h, None) for h in cls._citco_headers] for row in cls._citco_queue + ) + buf = buf.getvalue().encode() + fname = f"i.innocap_serenitas.{datetime.datetime.now():%Y%m%d%H%M%S}.csv" + sftp.client.chdir("incoming") + cls._sftp.put(buf, fname) + dest = DAILY_DIR / str(datetime.date.today()) / fname + dest.write_bytes(buf) + cls._citco_queue.clear() + + def citco_stage(self): + self._citco_queue.append(self.to_citco()) + + @dataclass class CDSDeal( BbgDeal, @@ -1047,7 +1082,6 @@ class TRSDeal( @dataclass class IRSDeal( - # MTMDeal, Deal, deal_type=DealType.IRS, table_name="irs", @@ -1131,3 +1165,370 @@ class IRSDeal( obj["State"] = "Valid" obj.update(d) return obj + + +from enum import IntEnum + + +class TrancheType(IntEnum): + DayCount = 3 + + +@dataclass +class TrancheProduct( + CitcoDeal, + Deal, + deal_type=DealType.TrancheProduct, + table_name="citco_tranche", + insert_ignore=( + "id", + "dealid", + "birth_date", + "death_date", + "security_desc", + "coupon", + "currency", + ), +): + underlying_security_id: str = field(metadata={"citco": "UnderlyingSecurityId"}) + attach: float = field(metadata={"citco": "Attachment_Points"}) + detach: float = field(metadata={"citco": "Detachment_Points"}) + birth_date: datetime.date = field( + init=False, metadata={"insert": False, "citco": "Birth_date"} + ) + death_date: datetime.date = field( + init=False, metadata={"insert": False, "citco": "Death_date"} + ) + coupon: float = field(init=False, metadata={"insert": False, "citco": "CouponRate"}) + security_desc: str = field( + init=False, metadata={"insert": False, "citco": "Sec_Desc"} + ) + currency: str = field(init=False, default=None, metadata={"citco": "LocalCcy"}) + instrument_type: str = field(default="CDS", metadata={"citco": "InstrumentType"}) + underlying_id_source: str = field( + default="RED", metadata={"citco": "UnderlyingIDSource"} + ) + committed: bool = field(default=False) + id: int = field(default=None, metadata={"insert": False}) + dealid: str = field( + default=None, metadata={"insert": False, "citco": "UniqueIdentifier"} + ) + + def get_dealid(self): + sql_str = "SELECT id, dealid, committed from citco_tranche where underlying_security_id = %s and attach = %s and detach = %s" + with self._conn.cursor() as c: + c.execute(sql_str, (self.underlying_security_id, self.attach, self.detach)) + if results := c.fetchone(): + (self.id, self.dealid, self.committed) = results + + def __post_init__(self): + sql_str = "SELECT issue_date, maturity, coupon, index, series FROM index_version LEFT JOIN index_Maturity USING (series, INDEX) WHERE tenor='5yr' AND redindexcode=%s;" + with self._conn.cursor() as c: + c.execute(sql_str, (self.underlying_security_id.removesuffix("_5"),)) + ( + self.birth_date, + self.death_date, + self.coupon, + index, + series, + ) = c.fetchone() + self.get_dealid() + self.security_desc = ( + f"{desc_str(index, series, '5')} {self.attach}-{self.detach}" + ) + self.currency = "EUR" if index in ("XO", "EU") else "USD" + + def to_citco(self): + if not self.id: + self.stage() + self.commit() + self.get_dealid() + obj = self.serialize("citco") + obj["Command"] = "N" + obj["Active"] = "Y" + obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d") + obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d") + obj["CouponRate"] = obj["CouponRate"] / 100 + obj["SettleDays"] = 3 + return obj + + +@dataclass +class SwaptionProduct( + CitcoDeal, + Deal, + deal_type=DealType.SwaptionProduct, + table_name="citco_swaption", + insert_ignore=( + "id", + "dealid", + "security_desc", + "currency", + ), +): + underlying_security_id: str = field(metadata={"citco": "UnderlyingSecurityId"}) + security_desc: str = field( + init=False, metadata={"insert": False, "citco": "Sec_Desc"} + ) + currency: str = field( + init=False, default=None, metadata={"citco": "LocalCcy", "insert": False} + ) + instrument_type: str = field(metadata={"citco": "InstrumentType"}) + callput: bool + strike: float = field(metadata={"citco": "StrikePrice"}) + expiration: datetime.date = field(metadata={"citco": "ExpirationDate"}) + underlying_id_source: str = field( + default="RED", metadata={"citco": "UnderlyingIDSource"} + ) + birth_date: datetime.date = field(default=None, metadata={"citco": "Birth_date"}) + death_date: datetime.date = field(default=None, metadata={"citco": "Death_date"}) + + committed: bool = field(default=False) + id: int = field(default=None, metadata={"insert": False}) + dealid: str = field( + default=None, metadata={"insert": False, "citco": "UniqueIdentifier"} + ) + + def get_dealid(self): + sql_str = "SELECT id, dealid, committed from citco_swaption where instrument_type=%s and underlying_security_id =%s and strike =%s and expiration=%s and callput=%s and birth_date=%s and death_date=%s" + with self._conn.cursor() as c: + c.execute( + sql_str, + ( + self.instrument_type, + self.underlying_security_id, + self.strike, + self.expiration, + self.callput, + self.birth_date, + self.death_date, + ), + ) + if results := c.fetchone(): + (self.id, self.dealid, self.committed) = results + + def __post_init__(self): + self.get_dealid() + if self.underlying_id_source == "RED": + sql_str = "SELECT issue_date, maturity, coupon, index, series FROM index_version LEFT JOIN index_Maturity USING (series, INDEX) WHERE tenor='5yr' AND redindexcode=%s;" + with self._conn.cursor() as c: + c.execute(sql_str, (self.underlying_security_id.removesuffix("_5"),)) + ( + self.birth_date, + self.death_date, + self.coupon, + index, + series, + ) = c.fetchone() + self.security_desc = ( + f"{desc_str(index, series, '5')} {self.expiration}-{self.strike}" + ) + self.currency = "EUR" if index in ("XO", "EU") else "USD" + else: + self.security_desc = "" + + def to_citco(self): + if not self.id: + self.stage() + self.commit() + self.get_dealid() + obj = self.serialize("citco") + if self.underlying_id_source == "USERID": + irs = IRSProduct( + birth_date=self.birth_date, + death_date=self.death_date, + fixed_rate=self.strike, + float_index=self.underlying_security_id, + ) + irs.citco_stage() + obj["UnderlyingSecurityId"] = irs.dealid + obj["Command"] = "N" + obj["Active"] = "Y" + obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d") + obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d") + obj["ExpirationDate"] = obj["ExpirationDate"].strftime("%Y%m%d") + obj["Put/CallFlag"] = "C" if obj["callput"] else "P" + obj["OptionType"] = "Vanilla European" + return obj + + +_citco_frequency = {"Yearly": 1, "Daily": 9, "Quarterly": 3} +_citco_bdc = {"Modified Following": 4} +_citco_daycount = {"ACT/360": 2} +_citco_ratesource = {"SOFRRATE": 17819} + + +@dataclass +class IRSProduct( + CitcoDeal, + Deal, + deal_type=DealType.IRSProduct, + table_name="citco_irs", + insert_ignore=("id", "dealid", "security_desc"), +): + birth_date: datetime.date = field(metadata={"citco": "Birth_date"}) + death_date: datetime.date = field(metadata={"citco": "Death_date"}) + fixed_rate: float + instrument_type: str = field(default="IRS", metadata={"citco": "InstrumentType"}) + active: str = field(default=True, metadata={"citco": "Active"}) + fixed_daycount: str = field(default="ACT/360") + fixed_payment_freq: str = field(default="Yearly") + fixed_bdc: str = field(default="Modified Following") + float_index: str = field(default="SOFRRATE") + float_daycount: str = field(default="ACT/360") + float_payment_freq: str = field(default="Yearly") + float_bdc: str = field(default="Modified Following") + currency: str = field(default="USD", metadata={"citco": "LocalCcy"}) + float_fixing_freq: str = field(default="Daily") + pay_interest_calc_method: str = field(default="Compound") + committed: bool = field(default=False) + security_desc: str = field( + init=False, metadata={"insert": False, "citco": "Sec_Desc"}, default=None + ) + id: int = field(default=None, metadata={"insert": False}) + dealid: str = field( + default=None, metadata={"insert": False, "citco": "UniqueIdentifier"} + ) + + def get_dealid(self): + sql_str = "SELECT id, dealid, committed from citco_irs where birth_date=%s and death_date=%s and float_index=%s and fixed_rate=%s" + with self._conn.cursor() as c: + c.execute( + sql_str, + ( + self.birth_date, + self.death_date, + self.float_index, + self.fixed_rate, + ), + ) + if results := c.fetchone(): + (self.id, self.dealid, self.committed) = results + self.security_desc = f"SWAP IRS {self.float_index}-{self.fixed_rate}" + + def __post_init__(self): + self.get_dealid() + + def to_citco(self): + if not self.id: + self.stage() + self.commit() + self.get_dealid() + + obj = self.serialize("citco") + d = { + f"S_P_CurrencyCode": self.currency, + f"S_P_PaymentFreqID": _citco_frequency[self.fixed_payment_freq], + f"S_P_RateIndexID": 0, + f"S_P_AccrualMethodID": _citco_daycount[self.fixed_daycount], + f"S_P_InterestRate": self.fixed_rate, + f"S_P_DayConventionID": _citco_bdc[self.fixed_bdc], + f"S_P_ResetFreqID": 0, + f"S_R_CurrencyCode": self.currency, + f"S_R_PaymentFreqID": _citco_frequency[self.float_payment_freq], + f"S_R_RateIndexID": 28, + f"S_R_AccrualMethodID": _citco_daycount[self.float_daycount], + f"S_R_InterestRate": 0, + f"S_R_DayConventionID": _citco_bdc[self.float_bdc], + f"S_R_ResetFreqID": _citco_frequency[self.float_fixing_freq], + f"S_R_RateSource": _citco_ratesource[self.float_index], + } + obj.update(d) + obj["Command"] = "N" + obj["Active"] = "Y" if obj["Active"] else "N" + obj["PrincipalExchTypeID"] = 1 + obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d") + obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d") + return obj + + +@dataclass +class TRSProduct( + CitcoDeal, + Deal, + deal_type=DealType.TRSProduct, + table_name="citco_trs", + insert_ignore=("id", "dealid"), +): + birth_date: datetime.date = field(metadata={"citco": "Birth_date"}) + death_date: datetime.date = field(metadata={"citco": "Death_date"}) + underlying_security: str = field(metadata={"citco": "UnderlyingSecurityId"}) + active: str = field(default=True, metadata={"citco": "Active"}) + funding_daycount: str = field(default="ACT/360") + funding_freq: str = field(default="Quarterly") + funding_payment_roll_convention: str = field(default="Modified Following") + asset_daycount: str = field(default="ACT/360") + asset_freq: str = field(default="Quarterly") + asset_payment_roll_convention: str = field(default="Modified Following") + currency: str = field(default="USD", metadata={"citco": "LocalCcy"}) + interest_calc_method: str = field(default="Compound") + compound_avg_frequency: str = field(default="Daily") + fixing_frequency: str = field(default="Daily") + committed: bool = field(default=False) + instrument_type: str = field(default="TRS", metadata={"citco": "InstrumentType"}) + funding_index: str = field(default="SOFRRATE", metadata={}) + security_desc: str = field( + init=False, metadata={"insert": False, "citco": "Sec_Desc"}, default=None + ) + id: int = field(default=None, metadata={"insert": False}) + dealid: str = field( + default=None, metadata={"insert": False, "citco": "UniqueIdentifier"} + ) + + def get_dealid(self): + sql_str = "SELECT id, dealid, committed from citco_trs where birth_date=%s and death_date=%s and funding_index=%s and underlying_security=%s" + with self._conn.cursor() as c: + c.execute( + sql_str, + ( + self.birth_date, + self.death_date, + self.funding_index, + self.underlying_security, + ), + ) + if results := c.fetchone(): + (self.id, self.dealid, self.committed) = results + _citco_trs = {"4J623JAA8": "IBOXHY_TRS"} + self.security_desc = _citco_trs[self.underlying_security] + + def __post_init__(self): + self.get_dealid() + + def to_citco(self): + if not self.id: + + self.stage() + self.commit() + self.get_dealid() + + obj = self.serialize("citco") + d = { + f"S_P_CurrencyCode": self.currency, + f"S_P_PaymentFreqID": _citco_frequency[self.funding_freq], + f"S_P_RateIndexID": 28, + f"S_P_AccrualMethodID": _citco_daycount[self.funding_daycount], + f"S_P_InterestRate": 0, + f"S_P_PaymentCalandarID": 3, + f"S_P_DayConventionID": _citco_bdc[self.funding_payment_roll_convention], + f"S_P_ResetFreqID": _citco_frequency[self.funding_freq], + f"S_P_RateSourceID": _citco_ratesource[self.funding_index], + f"S_R_CurrencyCode": self.currency, + f"S_R_PaymentFreqID": _citco_frequency[self.asset_freq], + f"S_R_RateIndexID": 0, + f"S_R_AccrualMethodID": _citco_daycount[self.asset_daycount], + f"S_R_InterestRate": 0, + f"S_R_PaymentCalandarID": 3, + f"S_R_DayConventionID": _citco_bdc[self.asset_payment_roll_convention], + f"S_R_ResetFreqID": _citco_frequency[self.asset_freq], + f"S_R_RateSourceID": 0, + } + obj.update(d) + obj["Command"] = "N" + obj["Active"] = "Y" if obj["Active"] else "N" + obj["GeneralDirection"] = "F" + obj["PrincipalExchTypeID"] = 3 + obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d") + obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d") + obj["UnderlyingIDSource"] = "RED" + return obj |
