1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
###### Citco Submission DataClass
from dataclasses import dataclass, field, fields, Field
from serenitas.ops.trade_dataclasses import Deal
from typing import Literal
import datetime
import csv
import datetime
from serenitas.utils.exchange import ExchangeMessage
import logging
from psycopg.errors import UniqueViolation
logger = logging.getLogger(__name__)
def get_file_status(s):
    """Split a Citco result file name into a processed flag and a short name.

    The name is cut at its last underscore.  In the part before it, the
    second ``-``-separated token is compared with ``"PROCESSED"``; the part
    after it has a trailing ``.csv`` removed.

    Args:
        s: File name containing at least one ``_`` and, before the last
            ``_``, at least one ``-``.

    Returns:
        A ``(is_processed, fname_short)`` tuple.

    Raises:
        ValueError: If ``s`` contains no underscore.
        IndexError: If the part before the last underscore contains no dash.
    """
    prefix, tail = s.rsplit("_", 1)
    # Without a maxsplit, splitting from the left or the right is equivalent.
    status_token = prefix.split("-")[1]
    return status_token == "PROCESSED", tail.removesuffix(".csv")
def get_success_data(line):
    """Extract the identifiers from one row of a successfully processed file.

    A row with a non-empty third column is treated as a trade; any other row
    is treated as an instrument.

    Args:
        line: Sequence of column values for one row.

    Returns:
        ``(identifier_type, serenitas_id, identifier)`` where, for a trade,
        the values come from columns 5 and 2 and, for an instrument, from
        columns 6 and 1.
    """
    trade_ref = line[2]
    if not trade_ref:
        # No trade reference: this row describes an instrument.
        return "instrument", line[6], line[1]
    return "trade", line[5], trade_ref
def get_failed_data(line):
    """Classify one row of a failed upload file.

    Args:
        line: Sequence of column values for one row.

    Returns:
        ``(identifier_type, identifier)``:

        * ``("failed", <last column>)`` for a single-column row, or when both
          the second and third columns are empty;
        * ``("trade", <third column>)`` when the second column is non-empty;
        * ``("instrument", <third column>)`` when only the third column is
          non-empty.
    """
    if len(line) == 1:
        # A lone column is just an error message.
        return ("failed", line[-1])
    if line[1]:
        # Trade upload.
        return ("trade", line[2])
    if line[2]:
        # Instrument upload (second column is known to be empty here).
        return ("instrument", line[2])
    return ("failed", line[-1])
def instrument_table(instrument_id):
    """Return the name of the Citco table matching an instrument id prefix.

    Args:
        instrument_id: Identifier string whose prefix selects the table.

    Returns:
        ``"citco_irs"``, ``"citco_swaption"``, ``"citco_tranche"`` or
        ``"citco_trs"``; ``None`` when no known prefix matches.
    """
    # Checked in order; the first matching group of prefixes wins.
    prefix_groups = (
        (("IRS",), "citco_irs"),
        (("SWPO_", "BNDO_"), "citco_swaption"),
        (("CDS_",), "citco_tranche"),
        (("TRS",), "citco_trs"),
    )
    for prefixes, table_name in prefix_groups:
        if instrument_id.startswith(prefixes):
            return table_name
    return None
@dataclass
class CitcoSubmission(Deal, deal_type=None, table_name="citco_submission"):
    """One row of a Citco upload-result file, recorded in ``citco_submission``.

    Instances are built from the rows of a result file and staged through
    ``stage()``; ``commit()`` then writes the staged rows, flags the matching
    instrument rows as committed and e-mails a summary.

    ``stage()``, ``_conn``, ``_insert_queue`` and ``_sql_insert`` are not
    defined here; presumably they are provided by ``Deal``.
    """

    fname: str = field()
    # NOTE: get_failed_data() can also yield "failed" here; the annotation is
    # not enforced at runtime.
    identifier_type: Literal["trade", "instrument"]
    identifier: str
    serenitas_id: str
    # default_factory stamps each instance when it is created.  A plain
    # ``default=datetime.datetime.now()`` is evaluated once, at class
    # definition, so every instance would share the import-time timestamp.
    submit_date: datetime.datetime = field(default_factory=datetime.datetime.now)

    @classmethod
    def from_citco_line(cls, line, fname):
        """Build a submission from one CSV row of the result file ``fname``.

        Args:
            line: Sequence of column values for the row.
            fname: Full result file name; it decides whether the row is read
                as a success row or as a failure row.

        Returns:
            A new instance.  Rows from a file that is not marked as processed
            get ``serenitas_id="failed"``.
        """
        is_processed, fname_short = get_file_status(fname)
        if is_processed:
            identifier_type, serenitas_id, identifier = get_success_data(line)
        else:
            serenitas_id = "failed"
            identifier_type, identifier = get_failed_data(line)
        return cls(
            fname=fname_short,
            identifier_type=identifier_type,
            identifier=identifier,
            serenitas_id=serenitas_id,
        )

    @classmethod
    def process(cls, fh, fname):
        """Stage every data row of an open result file.

        Args:
            fh: Iterator over the text lines of the file; its first line is a
                header and is discarded.
            fname: Name of the file, forwarded to ``from_citco_line``.
        """
        # An empty file has no header; a bare next(fh) would raise
        # StopIteration here.
        if next(fh, None) is None:
            return
        for row in csv.reader(fh):
            if not row:
                # csv.reader yields [] for blank lines; they carry no data.
                continue
            cls.from_citco_line(row, fname).stage()

    @classmethod
    def update_citco_tables(cls):
        """Set ``committed=True`` on the instrument rows found in the queue.

        Only queue rows whose second element is ``"instrument"`` are used.
        The third element is both looked up through ``instrument_table`` and
        used as the ``dealid``.  Presumably queue rows follow the field order
        of this class (fname, identifier_type, identifier, ...) -- confirm
        against ``Deal.stage``.

        This method does not commit; the caller owns the transaction.
        """
        with cls._conn.cursor() as c:
            for row in cls._insert_queue:
                if row[1] != "instrument":
                    continue
                serenitas_id = row[2]
                table = instrument_table(serenitas_id)
                if table is None:
                    # Unknown prefix: interpolating None would produce the
                    # invalid statement "UPDATE None ...".
                    logger.warning(
                        "No Citco instrument table for %s; skipping",
                        serenitas_id,
                    )
                    continue
                # The table name comes from the fixed set returned by
                # instrument_table(); the id itself is passed as a parameter.
                c.execute(
                    f"UPDATE {table} SET committed=True WHERE dealid=%s",
                    (serenitas_id,),
                )

    @classmethod
    def commit(cls):
        """Insert the staged rows, update instrument tables and send an e-mail.

        Does nothing when the queue is empty.  On a ``UniqueViolation`` the
        error is logged and the transaction rolled back; no e-mail is sent.
        The queue is cleared in every case, including when another exception
        propagates.
        """
        if not cls._insert_queue:
            return
        with cls._conn.cursor() as c:
            try:
                c.executemany(cls._sql_insert, cls._insert_queue)
            except UniqueViolation as e:
                logger.warning(e)
                cls._conn.rollback()
            else:
                cls._conn.commit()
                cls.update_citco_tables()
                # The UPDATEs above run after the first commit, so they need
                # their own commit to be persisted.
                cls._conn.commit()
                # The subject reflects the status of the first staged row only.
                status = "SUCCESS" if cls._insert_queue[0][3] != "failed" else "FAILED"
                em = ExchangeMessage()
                em.send_email(
                    f"(CITCO) UPLOAD {status}",
                    "\n".join(map(str, cls._insert_queue)),
                    ("selene-ops@lmcg.com",),
                )
            finally:
                cls._insert_queue.clear()
|