aboutsummaryrefslogtreecommitdiffstats
path: root/python/utils/db.py
blob: fcc3ab0de4262e47b325f53c59550feb2469bca9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import datetime
import os
import psycopg2
from psycopg2.extras import DictCursor, NamedTupleCursor
from psycopg2 import IntegrityError, DataError
from psycopg2.extensions import DateFromPy, register_adapter, AsIs
from psycopg2.pool import ThreadedConnectionPool
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
import numpy as np
import atexit


class InfDateAdapter:
    def __init__(self, wrapped):
        self.wrapped = wrapped

    def getquoted(self):
        if self.wrapped == datetime.date.max:
            return b"'infinity'::date"
        elif self.wrapped == datetime.date.min:
            return b"'-infinity'::date"
        else:
            return psycopg2.extensions.DateFromPy(self.wrapped).getquoted()


def nan_to_null(f, _NULL=AsIs("NULL"), _Float=psycopg2.extensions.Float):
    if not np.isnan(f):
        return _Float(f)
    return _NULL


register_adapter(datetime.date, InfDateAdapter)
register_adapter(np.int64, lambda x: AsIs(x))
register_adapter(np.float, nan_to_null)


def dbconn(dbname, cursor_factory=NamedTupleCursor):
    if dbname == "etdb":
        dbname = "ET"
        user_name = "et_user"
    else:
        user_name = dbname[:-2] + "_user"
    return psycopg2.connect(
        database=dbname,
        user=user_name,
        host=os.environ.get("PGHOST", "debian"),
        cursor_factory=cursor_factory,
        options="-c extra_float_digits=3",
    )


def dbengine(dbname, cursor_factory=NamedTupleCursor):
    if dbname in ["rmbs_model", "corelogic", "crt"]:
        uri = URL(
            drivername="mysql+mysqlconnector",
            host="debian",
            database=dbname,
            query={"option_files": os.path.expanduser("~/.my.cnf")},
        )
        return create_engine(uri, paramstyle="format")
    else:
        if dbname == "etdb":
            dbname = "ET"
            user_name = "et_user"
        else:
            user_name = dbname[:-2] + "_user"
        uri = URL(
            drivername="postgresql",
            host=os.environ.get("PGHOST", "debian"),
            username=user_name,
            database=dbname,
            query={"options": "-c extra_float_digits=3"},
        )
        return create_engine(
            uri, paramstyle="format", connect_args={"cursor_factory": cursor_factory}
        )


def with_connection(dbname):
    def decorator(f):
        conn = dbconn(dbname)

        def with_connection_(*args, **kwargs):
            # or use a pool, or a factory function...
            try:
                rv = f(conn, *args, **kwargs)
            except Exception as e:
                print(e)
                conn.rollback()
            else:
                return rv

        return with_connection_

    return decorator


def query_db(conn, sqlstr, params=None, one=True):
    with conn.cursor() as c:
        if params:
            c.execute(sqlstr, params)
        else:
            c.execute(sqlstr)
        conn.commit()
        r = c.fetchone() if one else c.fetchall()
    return r


serenitas_pool = ThreadedConnectionPool(
    0,
    5,
    database="serenitasdb",
    user="serenitas_user",
    host=os.environ.get("PGHOST", "debian"),
    cursor_factory=DictCursor,
)


@atexit.register
def close_db():
    serenitas_pool.closeall()


serenitas_engine = dbengine("serenitasdb")
dawn_engine = dbengine("dawndb")