1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
import pandas as pd
from db import dbengine
import db
import datetime
df = pd.read_table("/home/share/guillaume/murano leads.csv.txt", encoding="utf-16-le", low_memory=False)
## first drop all NA values
df = df.dropna(1, 'all')
## then drop columns with only one values
todrop = []
for col in df:
if df[col].value_counts(dropna=False).size == 1:
todrop.append(col)
df = df.drop(todrop, axis=1)
## drop columns with ID in it
df = df.drop([col for col in df if 'ID' in col], axis=1)
## cleanup url
df.Website.replace("http://","")
df.loc[df.Website.str.startswith('www').fillna(False),'Website'] = 'http://' + \
df.Website[df.Website.str.startswith('www').fillna(False)]
df = df.drop(['Teams', 'Do Not Call', 'Assigned User Name', 'Hit Status'], axis=1)
df['Minimum Track Record'] = pd.to_numeric(df['Minimum Track Record'], errors='coerce')
for col in ['Date Created', 'Date Modified', 'Last Spoke Date']:
df[col] = pd.to_datetime(df[col])
for col in df:
if df[col].dtypes == 'object':
df.loc[df[col].isnull(),col] = None
for col in ['Allocating', 'Allocating to FoHFs']:
df.loc[df[col] == 'Active', col] = True
df.loc[df[col] == 'Inactive', col] = False
df.loc[df[col] == 'Unknown', col] = None
## strip out commas
df.loc[df['Non-primary emails'].notnull(),'Non-primary emails'] = \
df.loc[df['Non-primary emails'].notnull(),'Non-primary emails'].str.extract("([^,]*)", expand=False)
df.Continent = df.Continent.str.title()
for col in ['General Strategy Preferences',
'Specific Strategy Preferences']:
df.loc[df[col].notnull(),col] = (df.loc[df[col].notnull(),col].
str.replace("__SugarMassUpdateClearField__,",""))
for col in ['Specific Strategy Preferences']:
df.loc[df[col].notnull(),col] = (df.loc[df[col].notnull(),col].str.
replace("HF_Corporate_Bonds","HF: Corporate Bonds").str.
replace("HF_Multi_Asset","HF: Multi Asset").str.
replace("OA_Direct_Lending", "OA: Direct Lending").str.
replace("PE_Energy", "PE: Energy").str.
replace("lo_multi_asset", "LO: Multi Asset").str.
replace("PE_Oil_and_Gas", "PE: Oil and Gas"))
for col in ['Investor Type', 'Investment Geography', 'General Strategy Preferences',
'Specific Strategy Preferences', 'Volatility Preference',
'Location Preferences']:
df.loc[df[col].notnull(),col] = df.loc[df[col].notnull(),col].str.split(",")
df['FuM Currency'] = df['FuM Currency'].replace("Unknown", None)
## cleanup country names
col = df['Primary Address Country']
col = col.str.strip().str.title()
col = col.replace(["Republic Of Ireland", "United State Of America", "The Netherlands"],
["Ireland", "United States Of America", "Netherlands"])
def generate_table(df):
print("CREATE TABLE investors(ID serial PRIMARY KEY,")
r = []
for col, t in df.dtypes.iteritems():
if col in ['Allocating', 'Allocating to FoHFs']:
sql_type = 'boolean'
elif col == 'Continent':
sql_type = 'Region'
elif col == 'Investor Type':
sql_type = 'InvestorType[]'
elif col == 'FuM Currency':
sql_type = 'Currency'
elif col in ['Investment Geography', 'Location Preferences']:
sql_type = 'Geography[]'
elif col == 'Mifid':
sql_type = 'Mifid'
elif col == 'Specific Strategy Preferences':
sql_type = 'Strategy_detailed[]'
elif col == 'General Strategy Preferences':
sql_type = 'Strategy[]'
elif col == 'Volatility Preference':
sql_type = 'Volatility_tolerance[]'
elif col == 'Last Spoke Date':
sql_type = 'date'
elif t.name == "object":
sql_type = 'text'
elif t.name == "datetime64[ns]":
sql_type = 'timestamp'
elif t.name == 'float64':
sql_type = 'smallint'
else:
raise ValueError("Something fell through the crack")
r.append('"{}" {}'.format(col, sql_type))
print(",\n".join(r),")")
def list_placeholders(df):
r = ['%s'] # ID
for col in df:
if col == 'Investor Type':
r.append("%s::InvestorType[]")
elif col in ['Investment Geography', 'Location Preferences']:
r.append("%s::Geography[]")
elif col == 'Specific Strategy Preferences':
r.append('%s::Strategy_detailed[]')
elif col == 'General Strategy Preferences':
r.append('%s::Strategy[]')
elif col == 'Volatility Preference':
r.append('%s::Volatility_tolerance[]')
else:
r.append('%s')
return r
from psycopg2.extensions import register_adapter, AsIs
import psycopg2
def nat_to_null(d, _NULL=AsIs('NULL'),
_Timestamp=psycopg2.extensions.TimestampFromPy):
if not type(d) == pd.tslib.NaTType:
return _Timestamp(d)
return _NULL
register_adapter(datetime.datetime, nat_to_null)
investorsdb = dbengine('investorsdb')
sql_str = "INSERT INTO investors VALUES({})".format(",".join(list_placeholders(df)))
conn = investorsdb.raw_connection()
with conn.cursor() as c:
for r in df.itertuples():
try:
c.execute(sql_str, r)
except db.DataError as e:
print(e)
conn.commit()
|