aboutsummaryrefslogtreecommitdiffstats
path: root/python/ops
diff options
context:
space:
mode:
Diffstat (limited to 'python/ops')
-rw-r--r--python/ops/file_gen.py1
-rw-r--r--python/ops/funds.py23
-rw-r--r--python/ops/headers.py8
-rw-r--r--python/ops/process_queue.py36
-rw-r--r--python/ops/trade_dataclasses.py58
5 files changed, 50 insertions, 76 deletions
diff --git a/python/ops/file_gen.py b/python/ops/file_gen.py
index 1986ba0d..30fc509b 100644
--- a/python/ops/file_gen.py
+++ b/python/ops/file_gen.py
@@ -4,7 +4,6 @@ from serenitas.utils.misc import rename_keys
from pyisda.date import previous_twentieth
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from serenitas.analytics.dates import bus_day
-from .headers import get_headers
def get_effective_date(d, swaption_type):
diff --git a/python/ops/funds.py b/python/ops/funds.py
index 101deea2..bf8ab290 100644
--- a/python/ops/funds.py
+++ b/python/ops/funds.py
@@ -8,8 +8,9 @@ from io import StringIO
from pickle import dumps
from typing import Tuple, Union
from serenitas.utils.env import DAILY_DIR
-from .file_gen import get_headers, build_line
+from .file_gen import build_line
from .trade_dataclasses import DealKind
+from .headers import get_headers, MTM_HEADERS
class Fund:
@@ -66,6 +67,7 @@ class Fund:
"iam": "IamDeal",
"trs": "TRSDeal",
"irs": "IRSDeal",
+ "mtm": cls.product_type.capitalize(),
}
trade_tag: str
if isinstance(trade_type, tuple):
@@ -143,3 +145,22 @@ class Selene(Fund, fund_name="ISOSEL"):
cls.staging_queue.append(obj.to_citco(trade["action"]))
else:
redis_pipeline.rpush("product_queue", dumps((trade_type, trade)))
+
+
+class MTM(Fund, fund_name="MTM"):
+ filepath_pattern = "MTM.{timestamp:%Y%m%d.%H%M%S}.{trade_tag}.csv"
+ product_type: str
+
+ @classmethod
+ def set_headers(cls, trade_type):
+ cls.headers = MTM_HEADERS[trade_type]
+
+ @classmethod
+ def stage(cls, trade, *, trade_type, **kwargs):
+ obj = DealKind[trade_type].from_dict(**trade)
+ cls.staging_queue.append(obj.to_markit())
+
+ @staticmethod
+ def upload(buf, dest):
+ sftp = SftpClient.from_creds("mtm")
+ sftp.put(buf, dest)
diff --git a/python/ops/headers.py b/python/ops/headers.py
index d3582575..5b1d54b6 100644
--- a/python/ops/headers.py
+++ b/python/ops/headers.py
@@ -719,7 +719,7 @@ HEADERS = {
}
MTM_HEADERS = {
- DealType.CDS: [
+ "cds": [
"Swap ID",
"Allocation ID",
"Description",
@@ -789,7 +789,7 @@ MTM_HEADERS = {
"Alternate Trade ID",
"Definitions Type",
],
- DealType.Swaption: [
+ "swaption": [
"Swap ID",
"Broker Id",
"Trade ID",
@@ -822,7 +822,7 @@ MTM_HEADERS = {
"Swaption Quotation Rate Type",
"Effective Date",
],
- DealType.Termination: [
+ "termination": [
"Swap ID",
"Allocation ID",
"Description",
@@ -851,7 +851,7 @@ MTM_HEADERS = {
"Remaining Party",
"DTCC Remaining CounterParty ID",
],
- DealType.TRS: [
+ "trs": [
"Swap ID",
"Allocation ID",
"Description ",
diff --git a/python/ops/process_queue.py b/python/ops/process_queue.py
index 0dbb9e8b..17c81892 100644
--- a/python/ops/process_queue.py
+++ b/python/ops/process_queue.py
@@ -54,6 +54,7 @@ def process_indicative(
process_fun = globals().get(
f"{trade_type}_trade_process", lambda conn, session, trade: trade
)
+ mtm = Fund["MTM"]()
for trade in get_trades(p, trade_type):
process_fun(conn, session, trade)
fund = trade["fund"]
@@ -66,9 +67,9 @@ def process_indicative(
"CD_INDEX_TRANCHE",
"BESPOKE",
):
- DealKind[trade_type].from_dict(**trade).mtm_stage()
- if Deal := DealKind[trade_type]:
- Deal.mtm_upload()
+ mtm.stage(trade, trade_type=trade_type)
+ buf, dest = mtm.build_buffer(trade_type)
+ mtm.upload(buf, dest.name)
p.delete(trade_type)
@@ -81,7 +82,7 @@ def process_upload(
for fund_name, l in groupby(p, key, "fund").items():
fund = Fund[fund_name]()
for trade in l:
- fund.stage(trade, trade_type=trade_type)
+ fund.stage(trade, trade_type=trade_type, redis_pipeline=p)
buf, dest = fund.build_buffer(trade_type)
if upload:
fund.upload(buf, dest.name)
@@ -96,22 +97,19 @@ def terminate_list(
base_dir: pathlib.Path = DAILY_DIR,
):
trade_type, fund, _ = key.split("_")
- terms = []
+ mtm = Fund["MTM"]()
+ f = Fund[fund]()
for term in p.lrange(key, 0, -1):
- termination = loads(term)
- DealKind["termination"].from_dict(**termination).mtm_stage()
- try:
- terms.append(termination.to_globeop())
- except TypeError as e:
- logging.error(e)
- return
- DealKind["termination"].mtm_upload()
- if upload and terms:
- f = Fund[fund]()
- f.staging_queue = terms
- dest = f.get_filepath(base_dir, (trade_type, "A"))
- buf = f.build_buffer("termination")
- f.upload(buf, dest)
+ obj = DealKind["termination"].from_dict(**loads(term))
+ mtm.staging_queue.append(obj.to_markit())
+ f.staging_queue.append(obj.to_globeop())
+ # pretty crappy way, but there should be only one
+ mtm.product_type = obj.product_type
+ buf, dest = mtm.build_buffer("mtm")
+ mtm.upload(buf, dest.name)
+ if upload and f.staging_queue:
+ buf, dest = f.build_buffer((trade_type, "A"))
+ f.upload(buf, dest.name)
p.delete(key)
diff --git a/python/ops/trade_dataclasses.py b/python/ops/trade_dataclasses.py
index 07ac69be..a391b41b 100644
--- a/python/ops/trade_dataclasses.py
+++ b/python/ops/trade_dataclasses.py
@@ -1,8 +1,7 @@
from dataclasses import dataclass, field, fields, Field
from enum import Enum
from io import StringIO
-from .headers import DealType, MTM_HEADERS, HEADERS
-from csv_headers.citco import GIL, GTL
+from .headers import DealType
from typing import ClassVar, Tuple, Union
from decimal import Decimal
from typing import Literal
@@ -338,6 +337,10 @@ class Deal:
f.metadata.get(tag, f.name): getattr(self, f.name) for f in fields(self)
}
+ @classmethod
+ def from_dict(cls, **kwargs):
+ return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields})
+
class BbgDeal:
_bbg_insert_queue: ClassVar[list] = []
@@ -396,49 +399,6 @@ class BbgDeal:
return cp_code
-class MTMDeal:
- _mtm_queue: ClassVar[list] = []
- _mtm_headers = None
- _mtm_sftp = SftpClient.from_creds("mtm")
- product_type: str
-
- def __init_subclass__(cls, deal_type, **kwargs):
- super().__init_subclass__(deal_type, **kwargs)
- cls._mtm_headers = MTM_HEADERS[deal_type]
- if deal_type == DealType.Swaption:
- cls.product_type = "CDISW"
- elif deal_type == DealType.CDS:
- cls.product_type = "TRN"
- elif deal_type == DealType.Termination:
- cls.product_type = "TERM"
- elif deal_type == DealType.TRS:
- cls.product_type = "CDI"
-
- @classmethod
- def mtm_upload(cls):
- if not cls._mtm_queue: # early exit
- return
- buf = StringIO()
- csvwriter = csv.writer(buf)
- csvwriter.writerow(cls._mtm_headers)
- csvwriter.writerows(
- [row.get(h, None) for h in cls._mtm_headers] for row in cls._mtm_queue
- )
- buf = buf.getvalue().encode()
- fname = f"MTM.{datetime.datetime.now():%Y%m%d.%H%M%S}.{cls.product_type.capitalize()}.csv"
- cls._mtm_sftp.put(buf, fname)
- dest = DAILY_DIR / str(datetime.date.today()) / fname
- dest.write_bytes(buf)
- cls._mtm_queue.clear()
-
- def mtm_stage(self):
- self._mtm_queue.append(self.to_markit())
-
- @classmethod
- def from_dict(cls, **kwargs):
- return cls(**{k: v for k, v in kwargs.items() if k in cls._sql_fields})
-
-
class Citco:
pass
@@ -452,7 +412,7 @@ class CitcoProduct(Citco):
def get_productid(self):
filter_clause = " AND ".join([f"{k}=%s" for k in self.product_key])
sql_str = (
- f"SELECT id, deal id, status FROM {self._table_name} WHERE {filter_clause}"
+ f"SELECT id, dealid, status FROM {self._table_name} WHERE {filter_clause}"
)
with self._conn.cursor() as c:
c.execute(
@@ -509,7 +469,6 @@ class CitcoTrade(Citco):
class CDSDeal(
CitcoTrade,
BbgDeal,
- MTMDeal,
Deal,
deal_type=DealType.CDS,
table_name="cds",
@@ -797,7 +756,6 @@ class BondDeal(
@dataclass
class SwaptionDeal(
CitcoTrade,
- MTMDeal,
Deal,
deal_type=DealType.Swaption,
table_name="swaptions",
@@ -860,7 +818,7 @@ class SwaptionDeal(
round(obj["price"] * obj["1st Leg Notional"] * 0.01, 2) * self.factor
)
obj["Trade ID"] = obj["Swap ID"]
- obj["Product Type"] = self.product_type
+ obj["Product Type"] = "CDISW"
obj["Transaction Type"] = "NEW"
if obj["buysell"]:
obj["Transaction Code"] = "Pay"
@@ -909,7 +867,6 @@ class SwaptionDeal(
@dataclass
class TerminationDeal(
- MTMDeal,
Deal,
deal_type=DealType.Termination,
table_name="terminations",
@@ -1246,7 +1203,6 @@ class FxSwapDeal(
@dataclass
class TRSDeal(
CitcoTrade,
- MTMDeal,
Deal,
deal_type=DealType.TRS,
table_name="trs",