aboutsummaryrefslogtreecommitdiffstats
path: root/python/trade_dataclasses.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/trade_dataclasses.py')
-rw-r--r--python/trade_dataclasses.py403
1 files changed, 402 insertions, 1 deletions
diff --git a/python/trade_dataclasses.py b/python/trade_dataclasses.py
index 4dcbec71..2e602187 100644
--- a/python/trade_dataclasses.py
+++ b/python/trade_dataclasses.py
@@ -427,6 +427,41 @@ class MTMDeal:
return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields})
+from csv_headers.citco import GIL
+
+
+class CitcoDeal:
+ _citco_queue: ClassVar[list] = []
+ _citco_headers = None
+ _sftp = SftpClient.from_creds("citco")
+ product_type: str
+
+ def __init_subclass__(cls, deal_type, **kwargs):
+ super().__init_subclass__(deal_type, **kwargs)
+ cls._citco_headers = GIL
+
+ @classmethod
+ def citco_upload(cls):
+ if not cls._citco_queue: # early exit
+ return
+ buf = StringIO()
+ csvwriter = csv.writer(buf)
+ csvwriter.writerow(cls._citco_headers)
+ csvwriter.writerows(
+ [row.get(h, None) for h in cls._citco_headers] for row in cls._citco_queue
+ )
+ buf = buf.getvalue().encode()
+ fname = f"i.innocap_serenitas.{datetime.datetime.now():%Y%m%d%H%M%S}.csv"
+ sftp.client.chdir("incoming")
+ cls._sftp.put(buf, fname)
+ dest = DAILY_DIR / str(datetime.date.today()) / fname
+ dest.write_bytes(buf)
+ cls._citco_queue.clear()
+
+ def citco_stage(self):
+ self._citco_queue.append(self.to_citco())
+
+
@dataclass
class CDSDeal(
BbgDeal,
@@ -1047,7 +1082,6 @@ class TRSDeal(
@dataclass
class IRSDeal(
- # MTMDeal,
Deal,
deal_type=DealType.IRS,
table_name="irs",
@@ -1131,3 +1165,370 @@ class IRSDeal(
obj["State"] = "Valid"
obj.update(d)
return obj
+
+
+from enum import IntEnum
+
+
+class TrancheType(IntEnum):
+ DayCount = 3
+
+
+@dataclass
+class TrancheProduct(
+ CitcoDeal,
+ Deal,
+ deal_type=DealType.TrancheProduct,
+ table_name="citco_tranche",
+ insert_ignore=(
+ "id",
+ "dealid",
+ "birth_date",
+ "death_date",
+ "security_desc",
+ "coupon",
+ "currency",
+ ),
+):
+ underlying_security_id: str = field(metadata={"citco": "UnderlyingSecurityId"})
+ attach: float = field(metadata={"citco": "Attachment_Points"})
+ detach: float = field(metadata={"citco": "Detachment_Points"})
+ birth_date: datetime.date = field(
+ init=False, metadata={"insert": False, "citco": "Birth_date"}
+ )
+ death_date: datetime.date = field(
+ init=False, metadata={"insert": False, "citco": "Death_date"}
+ )
+ coupon: float = field(init=False, metadata={"insert": False, "citco": "CouponRate"})
+ security_desc: str = field(
+ init=False, metadata={"insert": False, "citco": "Sec_Desc"}
+ )
+ currency: str = field(init=False, default=None, metadata={"citco": "LocalCcy"})
+ instrument_type: str = field(default="CDS", metadata={"citco": "InstrumentType"})
+ underlying_id_source: str = field(
+ default="RED", metadata={"citco": "UnderlyingIDSource"}
+ )
+ committed: bool = field(default=False)
+ id: int = field(default=None, metadata={"insert": False})
+ dealid: str = field(
+ default=None, metadata={"insert": False, "citco": "UniqueIdentifier"}
+ )
+
+ def get_dealid(self):
+ sql_str = "SELECT id, dealid, committed from citco_tranche where underlying_security_id = %s and attach = %s and detach = %s"
+ with self._conn.cursor() as c:
+ c.execute(sql_str, (self.underlying_security_id, self.attach, self.detach))
+ if results := c.fetchone():
+ (self.id, self.dealid, self.committed) = results
+
+ def __post_init__(self):
+ sql_str = "SELECT issue_date, maturity, coupon, index, series FROM index_version LEFT JOIN index_Maturity USING (series, INDEX) WHERE tenor='5yr' AND redindexcode=%s;"
+ with self._conn.cursor() as c:
+ c.execute(sql_str, (self.underlying_security_id.removesuffix("_5"),))
+ (
+ self.birth_date,
+ self.death_date,
+ self.coupon,
+ index,
+ series,
+ ) = c.fetchone()
+ self.get_dealid()
+ self.security_desc = (
+ f"{desc_str(index, series, '5')} {self.attach}-{self.detach}"
+ )
+ self.currency = "EUR" if index in ("XO", "EU") else "USD"
+
+ def to_citco(self):
+ if not self.id:
+ self.stage()
+ self.commit()
+ self.get_dealid()
+ obj = self.serialize("citco")
+ obj["Command"] = "N"
+ obj["Active"] = "Y"
+ obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d")
+ obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d")
+ obj["CouponRate"] = obj["CouponRate"] / 100
+ obj["SettleDays"] = 3
+ return obj
+
+
+@dataclass
+class SwaptionProduct(
+ CitcoDeal,
+ Deal,
+ deal_type=DealType.SwaptionProduct,
+ table_name="citco_swaption",
+ insert_ignore=(
+ "id",
+ "dealid",
+ "security_desc",
+ "currency",
+ ),
+):
+ underlying_security_id: str = field(metadata={"citco": "UnderlyingSecurityId"})
+ security_desc: str = field(
+ init=False, metadata={"insert": False, "citco": "Sec_Desc"}
+ )
+ currency: str = field(
+ init=False, default=None, metadata={"citco": "LocalCcy", "insert": False}
+ )
+ instrument_type: str = field(metadata={"citco": "InstrumentType"})
+ callput: bool
+ strike: float = field(metadata={"citco": "StrikePrice"})
+ expiration: datetime.date = field(metadata={"citco": "ExpirationDate"})
+ underlying_id_source: str = field(
+ default="RED", metadata={"citco": "UnderlyingIDSource"}
+ )
+ birth_date: datetime.date = field(default=None, metadata={"citco": "Birth_date"})
+ death_date: datetime.date = field(default=None, metadata={"citco": "Death_date"})
+
+ committed: bool = field(default=False)
+ id: int = field(default=None, metadata={"insert": False})
+ dealid: str = field(
+ default=None, metadata={"insert": False, "citco": "UniqueIdentifier"}
+ )
+
+ def get_dealid(self):
+ sql_str = "SELECT id, dealid, committed from citco_swaption where instrument_type=%s and underlying_security_id =%s and strike =%s and expiration=%s and callput=%s and birth_date=%s and death_date=%s"
+ with self._conn.cursor() as c:
+ c.execute(
+ sql_str,
+ (
+ self.instrument_type,
+ self.underlying_security_id,
+ self.strike,
+ self.expiration,
+ self.callput,
+ self.birth_date,
+ self.death_date,
+ ),
+ )
+ if results := c.fetchone():
+ (self.id, self.dealid, self.committed) = results
+
+ def __post_init__(self):
+ self.get_dealid()
+ if self.underlying_id_source == "RED":
+ sql_str = "SELECT issue_date, maturity, coupon, index, series FROM index_version LEFT JOIN index_Maturity USING (series, INDEX) WHERE tenor='5yr' AND redindexcode=%s;"
+ with self._conn.cursor() as c:
+ c.execute(sql_str, (self.underlying_security_id.removesuffix("_5"),))
+ (
+ self.birth_date,
+ self.death_date,
+ self.coupon,
+ index,
+ series,
+ ) = c.fetchone()
+ self.security_desc = (
+ f"{desc_str(index, series, '5')} {self.expiration}-{self.strike}"
+ )
+ self.currency = "EUR" if index in ("XO", "EU") else "USD"
+ else:
+ self.security_desc = ""
+
+ def to_citco(self):
+ if not self.id:
+ self.stage()
+ self.commit()
+ self.get_dealid()
+ obj = self.serialize("citco")
+ if self.underlying_id_source == "USERID":
+ irs = IRSProduct(
+ birth_date=self.birth_date,
+ death_date=self.death_date,
+ fixed_rate=self.strike,
+ float_index=self.underlying_security_id,
+ )
+ irs.citco_stage()
+ obj["UnderlyingSecurityId"] = irs.dealid
+ obj["Command"] = "N"
+ obj["Active"] = "Y"
+ obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d")
+ obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d")
+ obj["ExpirationDate"] = obj["ExpirationDate"].strftime("%Y%m%d")
+ obj["Put/CallFlag"] = "C" if obj["callput"] else "P"
+ obj["OptionType"] = "Vanilla European"
+ return obj
+
+
+_citco_frequency = {"Yearly": 1, "Daily": 9, "Quarterly": 3}
+_citco_bdc = {"Modified Following": 4}
+_citco_daycount = {"ACT/360": 2}
+_citco_ratesource = {"SOFRRATE": 17819}
+
+
+@dataclass
+class IRSProduct(
+ CitcoDeal,
+ Deal,
+ deal_type=DealType.IRSProduct,
+ table_name="citco_irs",
+ insert_ignore=("id", "dealid", "security_desc"),
+):
+ birth_date: datetime.date = field(metadata={"citco": "Birth_date"})
+ death_date: datetime.date = field(metadata={"citco": "Death_date"})
+ fixed_rate: float
+ instrument_type: str = field(default="IRS", metadata={"citco": "InstrumentType"})
+ active: str = field(default=True, metadata={"citco": "Active"})
+ fixed_daycount: str = field(default="ACT/360")
+ fixed_payment_freq: str = field(default="Yearly")
+ fixed_bdc: str = field(default="Modified Following")
+ float_index: str = field(default="SOFRRATE")
+ float_daycount: str = field(default="ACT/360")
+ float_payment_freq: str = field(default="Yearly")
+ float_bdc: str = field(default="Modified Following")
+ currency: str = field(default="USD", metadata={"citco": "LocalCcy"})
+ float_fixing_freq: str = field(default="Daily")
+ pay_interest_calc_method: str = field(default="Compound")
+ committed: bool = field(default=False)
+ security_desc: str = field(
+ init=False, metadata={"insert": False, "citco": "Sec_Desc"}, default=None
+ )
+ id: int = field(default=None, metadata={"insert": False})
+ dealid: str = field(
+ default=None, metadata={"insert": False, "citco": "UniqueIdentifier"}
+ )
+
+ def get_dealid(self):
+ sql_str = "SELECT id, dealid, committed from citco_irs where birth_date=%s and death_date=%s and float_index=%s and fixed_rate=%s"
+ with self._conn.cursor() as c:
+ c.execute(
+ sql_str,
+ (
+ self.birth_date,
+ self.death_date,
+ self.float_index,
+ self.fixed_rate,
+ ),
+ )
+ if results := c.fetchone():
+ (self.id, self.dealid, self.committed) = results
+ self.security_desc = f"SWAP IRS {self.float_index}-{self.fixed_rate}"
+
+ def __post_init__(self):
+ self.get_dealid()
+
+ def to_citco(self):
+ if not self.id:
+ self.stage()
+ self.commit()
+ self.get_dealid()
+
+ obj = self.serialize("citco")
+ d = {
+ f"S_P_CurrencyCode": self.currency,
+ f"S_P_PaymentFreqID": _citco_frequency[self.fixed_payment_freq],
+ f"S_P_RateIndexID": 0,
+ f"S_P_AccrualMethodID": _citco_daycount[self.fixed_daycount],
+ f"S_P_InterestRate": self.fixed_rate,
+ f"S_P_DayConventionID": _citco_bdc[self.fixed_bdc],
+ f"S_P_ResetFreqID": 0,
+ f"S_R_CurrencyCode": self.currency,
+ f"S_R_PaymentFreqID": _citco_frequency[self.float_payment_freq],
+ f"S_R_RateIndexID": 28,
+ f"S_R_AccrualMethodID": _citco_daycount[self.float_daycount],
+ f"S_R_InterestRate": 0,
+ f"S_R_DayConventionID": _citco_bdc[self.float_bdc],
+ f"S_R_ResetFreqID": _citco_frequency[self.float_fixing_freq],
+ f"S_R_RateSource": _citco_ratesource[self.float_index],
+ }
+ obj.update(d)
+ obj["Command"] = "N"
+ obj["Active"] = "Y" if obj["Active"] else "N"
+ obj["PrincipalExchTypeID"] = 1
+ obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d")
+ obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d")
+ return obj
+
+
+@dataclass
+class TRSProduct(
+ CitcoDeal,
+ Deal,
+ deal_type=DealType.TRSProduct,
+ table_name="citco_trs",
+ insert_ignore=("id", "dealid"),
+):
+ birth_date: datetime.date = field(metadata={"citco": "Birth_date"})
+ death_date: datetime.date = field(metadata={"citco": "Death_date"})
+ underlying_security: str = field(metadata={"citco": "UnderlyingSecurityId"})
+ active: str = field(default=True, metadata={"citco": "Active"})
+ funding_daycount: str = field(default="ACT/360")
+ funding_freq: str = field(default="Quarterly")
+ funding_payment_roll_convention: str = field(default="Modified Following")
+ asset_daycount: str = field(default="ACT/360")
+ asset_freq: str = field(default="Quarterly")
+ asset_payment_roll_convention: str = field(default="Modified Following")
+ currency: str = field(default="USD", metadata={"citco": "LocalCcy"})
+ interest_calc_method: str = field(default="Compound")
+ compound_avg_frequency: str = field(default="Daily")
+ fixing_frequency: str = field(default="Daily")
+ committed: bool = field(default=False)
+ instrument_type: str = field(default="TRS", metadata={"citco": "InstrumentType"})
+ funding_index: str = field(default="SOFRRATE", metadata={})
+ security_desc: str = field(
+ init=False, metadata={"insert": False, "citco": "Sec_Desc"}, default=None
+ )
+ id: int = field(default=None, metadata={"insert": False})
+ dealid: str = field(
+ default=None, metadata={"insert": False, "citco": "UniqueIdentifier"}
+ )
+
+ def get_dealid(self):
+ sql_str = "SELECT id, dealid, committed from citco_trs where birth_date=%s and death_date=%s and funding_index=%s and underlying_security=%s"
+ with self._conn.cursor() as c:
+ c.execute(
+ sql_str,
+ (
+ self.birth_date,
+ self.death_date,
+ self.funding_index,
+ self.underlying_security,
+ ),
+ )
+ if results := c.fetchone():
+ (self.id, self.dealid, self.committed) = results
+ _citco_trs = {"4J623JAA8": "IBOXHY_TRS"}
+ self.security_desc = _citco_trs[self.underlying_security]
+
+ def __post_init__(self):
+ self.get_dealid()
+
+ def to_citco(self):
+ if not self.id:
+
+ self.stage()
+ self.commit()
+ self.get_dealid()
+
+ obj = self.serialize("citco")
+ d = {
+ f"S_P_CurrencyCode": self.currency,
+ f"S_P_PaymentFreqID": _citco_frequency[self.funding_freq],
+ f"S_P_RateIndexID": 28,
+ f"S_P_AccrualMethodID": _citco_daycount[self.funding_daycount],
+ f"S_P_InterestRate": 0,
+ f"S_P_PaymentCalandarID": 3,
+ f"S_P_DayConventionID": _citco_bdc[self.funding_payment_roll_convention],
+ f"S_P_ResetFreqID": _citco_frequency[self.funding_freq],
+ f"S_P_RateSourceID": _citco_ratesource[self.funding_index],
+ f"S_R_CurrencyCode": self.currency,
+ f"S_R_PaymentFreqID": _citco_frequency[self.asset_freq],
+ f"S_R_RateIndexID": 0,
+ f"S_R_AccrualMethodID": _citco_daycount[self.asset_daycount],
+ f"S_R_InterestRate": 0,
+ f"S_R_PaymentCalandarID": 3,
+ f"S_R_DayConventionID": _citco_bdc[self.asset_payment_roll_convention],
+ f"S_R_ResetFreqID": _citco_frequency[self.asset_freq],
+ f"S_R_RateSourceID": 0,
+ }
+ obj.update(d)
+ obj["Command"] = "N"
+ obj["Active"] = "Y" if obj["Active"] else "N"
+ obj["GeneralDirection"] = "F"
+ obj["PrincipalExchTypeID"] = 3
+ obj["Birth_date"] = obj["Birth_date"].strftime("%Y%m%d")
+ obj["Death_date"] = obj["Death_date"].strftime("%Y%m%d")
+ obj["UnderlyingIDSource"] = "RED"
+ return obj