Merge pull request #136 from CartoDB/iss111-unlimited-data-frame-size
Adds basic batching of dataframe uploads
andy-esch authored Jun 20, 2017
2 parents 4213cc8 + 2d2218c commit f6e7fd4
Showing 2 changed files with 200 additions and 31 deletions.
228 changes: 198 additions & 30 deletions cartoframes/context.py
@@ -17,6 +17,7 @@
import requests
import IPython
import pandas as pd
from tqdm import tqdm

from carto.auth import APIKeyAuthClient
from carto.sql import SQLClient
@@ -32,6 +33,9 @@
from urlparse import urlparse
from urllib import urlencode

# Constant chosen to avoid overview generation, which is triggered at
# half a million rows
MAX_IMPORT_ROWS = 499999
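# Illustrative sketch (not from this commit) of how the chunk-index grouping
# used in `_send_batches` below splits a DataFrame into batches of at most
# MAX_IMPORT_ROWS rows:
#   >>> import pandas as pd
#   >>> df = pd.DataFrame({'val': range(1200000)})
#   >>> [len(chunk) for _, chunk in df.groupby(
#   ...     [i // MAX_IMPORT_ROWS for i in range(df.shape[0])])]
#   [499999, 499999, 200002]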

class CartoContext(object):
"""Manages connections with CARTO for data and map operations. Modeled
@@ -151,21 +155,34 @@ def write(self, df, table_name, temp_dir='/tmp', overwrite=False,
# error if table exists and user does not want to overwrite
self._table_exists(table_name)

# send dataframe to carto, report back tablename
final_table_name = self._send_dataframe(df, table_name, temp_dir,
geom_col, lnglat)
if df.shape[0] > MAX_IMPORT_ROWS:
final_table_name = self._send_batches(df, table_name, temp_dir,
geom_col)
else:
final_table_name = self._send_dataframe(df, table_name, temp_dir,
geom_col)
self._set_schema(df, final_table_name)

# create geometry column from lat/longs if requested
if lnglat:
# TODO: make this a batch job if it is a large dataframe or move
# inside of _send_dataframe and/or batch
tqdm.write('Creating geometry out of columns '
'`{lng}`/`{lat}`'.format(lng=lnglat[0],
lat=lnglat[1]))
self.sql_client.send('''
UPDATE "{table_name}"
SET the_geom = CDB_LatLng({lat}, {lng})
SET the_geom = CDB_LatLng("{lat}"::numeric,
"{lng}"::numeric)
'''.format(table_name=final_table_name,
lng=lnglat[0],
lat=lnglat[1]))
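# For example, with lnglat=('longitude', 'latitude') and a table named
# 'my_table' (hypothetical names), the statement above renders as:
#   UPDATE "my_table"
#   SET the_geom = CDB_LatLng("latitude"::numeric,
#                             "longitude"::numeric)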

self._column_normalization(df, final_table_name, geom_col)
print('Table written to CARTO: '
'{base_url}dataset/{table_name}'.format(
base_url=self.base_url,
table_name=final_table_name))
tqdm.write('Table written to CARTO: '
'{base_url}dataset/{table_name}'.format(
base_url=self.base_url,
table_name=final_table_name))
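# Hypothetical usage sketch of the batched write path (credentials, table,
# and column names are illustrative):
#   cc = CartoContext(base_url='https://myuser.carto.com', api_key='...')
#   cc.write(big_df, 'taxi_trips', lnglat=('longitude', 'latitude'))
#   # DataFrames over MAX_IMPORT_ROWS rows are chunked via _send_batches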

def _table_exists(self, table_name):
"""Checks to see if table exists"""
@@ -184,8 +201,105 @@ def _table_exists(self, table_name):

return False

def _send_dataframe(self, df, table_name, temp_dir, geom_col, lnglat):
"""Send a DataFrame to CARTO to be imported as a SQL table"""
def _send_batches(self, df, table_name, temp_dir, geom_col):
"""Batch sending a dataframe
Args:
df (pandas.DataFrame): DataFrame that will be batched up for
sending to CARTO
table_name (str): Name of table to send DataFrame to
temp_dir (str): Local directory for temporary storage of DataFrame
written to file that will be sent to CARTO
geom_col (str): Name of encoded geometry column (if any) that will
be dropped or converted to `the_geom` column
Returns:
final_table_name (str): Final table name on CARTO that the
DataFrame is stored in
Exceptions:
* TODO: add more (Out of storage)
"""
subtables = []
# send dataframe chunks to carto
for chunk_num, chunk in tqdm(df.groupby([i // MAX_IMPORT_ROWS
for i in range(df.shape[0])]),
desc='Uploading in batches: '):
temp_table = '{orig}_cartoframes_temp_{chunk}'.format(
orig=table_name[:40],
chunk=chunk_num)
try:
# send dataframe chunk, get new name if collision
temp_table = self._send_dataframe(chunk, temp_table,
temp_dir, geom_col)
except CartoException as err:
self._drop_tables(subtables)
raise CartoException(err)

if temp_table:
subtables.append(temp_table)
self._debug_print(chunk_num=chunk_num,
chunk_shape=str(chunk.shape),
temp_table=temp_table)

# combine chunks into final table
try:
select_base = ('SELECT %(schema)s '
'FROM "{table}"') % dict(schema=df2pg_schema(df))
unioned_tables = '\nUNION ALL\n'.join([select_base.format(table=t)
for t in subtables])
self._debug_print(unioned=unioned_tables)
query = '''
DROP TABLE IF EXISTS "{table_name}";
CREATE TABLE "{table_name}" As {unioned_tables};
ALTER TABLE {table_name} DROP COLUMN IF EXISTS cartodb_id;
{drop_tables}
SELECT CDB_CartoDBFYTable('{org}', '{table_name}');
'''.format(table_name=table_name,
unioned_tables=unioned_tables,
org=self.username if self.is_org else 'public',
drop_tables=_drop_tables_query(subtables))
self._debug_print(query=query)
_ = self.sql_client.send(query)
except CartoException as err:
try:
self._drop_tables(subtables)
except CartoException:
warn('Failed to drop the following subtables from CARTO '
'account: {}'.format(', '.join(subtables)))
finally:
raise Exception('Failed to upload dataframe: {}'.format(err))

return table_name
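# Illustration of the temporary-table naming scheme above (hypothetical
# table name; note the 40-character truncation of the original name):
#   >>> '{orig}_cartoframes_temp_{chunk}'.format(orig='taxi_trips'[:40],
#   ...                                          chunk=2)
#   'taxi_trips_cartoframes_temp_2'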

def _drop_tables(self, tables):
"""Drop all tables in tables list
Args:
tables (list of str): list of table names
Returns:
None
"""
query = _drop_tables_query(tables)
_ = self.sql_client.send(query)
return None

def _send_dataframe(self, df, table_name, temp_dir, geom_col):
"""Send a DataFrame to CARTO to be imported as a SQL table
Args:
df (pandas.DataFrame): DataFrame that will be sent to CARTO
table_name (str): Name on CARTO for the table that will have the
data from ``df``
temp_dir (str): Name of directory used for temporarily storing the
DataFrame file to be sent to CARTO
geom_col (str): Name of geometry column
Returns:
final_table_name (str): Name of final table. This method will
overwrite the table `table_name` if it already exists.
"""
def remove_tempfile(filepath):
"""removes temporary file"""
os.remove(filepath)
@@ -196,16 +310,9 @@ def remove_tempfile(filepath):
df.drop(geom_col, axis=1, errors='ignore').to_csv(tempfile)

with open(tempfile, 'rb') as f:
type_guess = str(not any([geom_col, lnglat])).lower()
if type_guess == 'false':
warn('All non-geometry columns in the CARTO version of this '
'DataFrame will be cast to strings. Manually update '
'columns in CARTO if you need to map off of numeric '
'columns. See issue #131 for a proposed solution: '
'https://github.com/CartoDB/cartoframes/issues/131')
res = self._auth_send('api/v1/imports', 'POST',
files={'file': f},
params={'type_guessing': type_guess},
params={'type_guessing': 'false'},
stream=True)
self._debug_print(res=res)

@@ -215,6 +322,7 @@ def remove_tempfile(filepath):
import_id = res['item_queue_id']

remove_tempfile(tempfile)
final_table_name = table_name
while True:
import_job = self._check_import(import_id)
self._debug_print(import_job=import_job)
@@ -226,6 +334,29 @@ def remove_tempfile(filepath):

return final_table_name

def _set_schema(self, dataframe, table_name):
"""Update a table associated with a dataframe to have the equivalent
schema"""
utility_cols = ('the_geom', 'the_geom_webmercator', 'cartodb_id')
alter_temp = ('ALTER COLUMN "{col}" TYPE {ctype} USING '
'NULLIF("{col}", \'\')::{ctype}')
alter_cols = ', '.join(alter_temp.format(col=c, ctype=dtypes2pg(t))
for c, t in zip(dataframe.columns,
dataframe.dtypes)
if c not in utility_cols)
alter_query = 'ALTER TABLE "{table}" {alter_cols};'.format(
table=table_name,
alter_cols=alter_cols)
self._debug_print(alter_query=alter_query)
try:
_ = self.sql_client.send(alter_query)
except CartoException as err:
warn('DataFrame written to CARTO but table schema failed to '
'update to match DataFrame. All columns have data type '
'`text`. CARTO error: `{err}`. Query: {query}'.format(
err=err,
query=alter_query))
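# For a DataFrame with columns `price` (float64) and `name` (object) and a
# hypothetical table 'my_table', the generated statement above reads:
#   ALTER TABLE "my_table" ALTER COLUMN "price" TYPE numeric USING
#   NULLIF("price", '')::numeric, ALTER COLUMN "name" TYPE text USING
#   NULLIF("name", '')::text;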

def _check_import(self, import_id):
"""Check the status of an Import API job"""

@@ -236,11 +367,21 @@ def _check_import(self, import_id):
def _handle_import(self, import_job, table_name):
"""Handle state of import job"""
if import_job['state'] == 'failure':
raise CartoException('Error code: `{}`. See CARTO Import '
'API error documentation for more '
'information: https://carto.com/docs/'
'carto-engine/import-api/import-errors'
''.format(import_job['error_code']))
if import_job['error_code'] == 8001:
raise CartoException('Over CARTO account storage limit for '
'user `{}`. Try subsetting your '
'DataFrame or dropping columns to reduce '
'the data size.'.format(self.username))
elif import_job['error_code'] == 6668:
raise CartoException('Too many rows in DataFrame. Try '
'subsetting DataFrame before writing to '
'CARTO.')
else:
raise CartoException('Error code: `{}`. See CARTO Import '
'API error documentation for more '
'information: https://carto.com/docs/'
'carto-engine/import-api/import-errors'
''.format(import_job['error_code']))
elif import_job['state'] == 'complete':
self._debug_print(final_table=import_job['table_name'])
if import_job['table_name'] != table_name:
@@ -276,9 +417,9 @@ def _column_normalization(self, dataframe, table_name, geom_col):
geom_col}
if diff_cols:
cols = ', '.join('`{}`'.format(c) for c in diff_cols)
warn('The following columns were renamed because of PostgreSQL '
'column normalization requirements: {cols}'.format(cols=cols),
stacklevel=2)
tqdm.write('The following columns were renamed because of '
'PostgreSQL column normalization requirements: '
'{cols}'.format(cols=cols))

def sync(self, dataframe, table_name):
"""Depending on the size of the DataFrame or CARTO table, perform
Expand Down Expand Up @@ -330,6 +471,7 @@ def query(self, query, table_name=None, decode_geom=False):

self._debug_print(select_res=select_res)

# TODO: replace this with a function
pg2dtypes = {
'date': 'datetime64[ns]',
'number': 'float64',
@@ -867,8 +1009,7 @@ def _encode_geom(geom):
from shapely import wkb
if geom:
return ba.hexlify(wkb.dumps(geom)).decode()
else:
return None
return None

@encode_decode_decorator
def _decode_geom(ewkb):
@@ -877,5 +1018,32 @@ def _decode_geom(ewkb):
from shapely import wkb
if ewkb:
return wkb.loads(ba.unhexlify(ewkb))
else:
return None
return None

def dtypes2pg(dtype):
"""returns equivalent PostgreSQL type for input `dtype`"""
mapping = {'float64': 'numeric',
'int64': 'numeric',
'float32': 'numeric',
'int32': 'numeric',
'object': 'text',
'bool': 'boolean',
'datetime64[ns]': 'text'}
return mapping.get(str(dtype), 'text')
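# Example mappings (dtypes not in the table fall back to text):
#   >>> dtypes2pg('int64')
#   'numeric'
#   >>> dtypes2pg('category')
#   'text'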

def df2pg_schema(dataframe):
"""Print column names with PostgreSQL schema for the SELECT statement of
a SQL query"""
schema = ', '.join(['NULLIF("{col}", \'\')::{t} AS {col}'.format(col=c,
t=dtypes2pg(t))
for c, t in zip(dataframe.columns, dataframe.dtypes)
if c not in ('the_geom', 'the_geom_webmercator',
'cartodb_id')])
if 'the_geom' in dataframe.columns:
return '"the_geom", ' + schema
return schema
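# For a DataFrame with columns `name` (object) and `price` (float64), the
# schema string above renders as (illustrative):
#   NULLIF("name", '')::text AS name, NULLIF("price", '')::numeric AS price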

def _drop_tables_query(tables):
"""Generate drop tables query for all tables in list `tables`"""
return '\n'.join('DROP TABLE IF EXISTS {};'.format(t)
for t in tables)
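# Example (hypothetical table names):
#   >>> _drop_tables_query(['t_cartoframes_temp_0', 't_cartoframes_temp_1'])
#   'DROP TABLE IF EXISTS t_cartoframes_temp_0;\nDROP TABLE IF EXISTS t_cartoframes_temp_1;'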
3 changes: 2 additions & 1 deletion setup.py
@@ -39,7 +39,8 @@
packages=['cartoframes'],
install_requires=['pandas>=0.20.1',
'webcolors>=1.7.0',
'carto>=1.0.1',],
'carto>=1.0.1',
'tqdm>=4.14.0',],
package_dir={'cartoframes': 'cartoframes'},
package_data={
'': ['LICENSE',
