Skip to content

Commit

Permalink
Merge pull request #1735 from CartoDB/feature/ch157435/cartoframes-to…
Browse files Browse the repository at this point in the history
…-carto-performance-evaluation

Feature/ch157435/cartoframes to carto performance evaluation
  • Loading branch information
Mmoncadaisla authored Jun 15, 2021
2 parents a55e248 + 32a2272 commit 7fd1094
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 30 deletions.
52 changes: 25 additions & 27 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,33 +98,38 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True,

if self._compare_columns(df_columns, table_columns):
# Equal columns: truncate table
self._truncate_table(table_name, schema, cartodbfy)
self._truncate_table(table_name, schema)
else:
# Diff columns: truncate table and drop + add columns
self._truncate_and_drop_add_columns(
table_name, schema, df_columns, table_columns, cartodbfy)
table_name, schema, df_columns, table_columns)

elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
'if_exists="replace" to overwrite it.'.format(
table_name=table_name, schema=schema))
else: # 'append'
pass
cartodbfy = False
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)
self._create_table_from_columns(table_name, schema, df_columns)

self._copy_from(gdf, table_name, df_columns, retry_times)

if cartodbfy is True:
cartodbfy_query = _cartodbfy_query(table_name, schema)
self.execute_long_running_query(cartodbfy_query)

return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
def create_table_from_query(self, query, table_name, if_exists):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)

if self.has_table(table_name, schema):
if if_exists == 'replace':
# TODO: review logic copy_from
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)
elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
Expand All @@ -133,7 +138,7 @@ def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
else: # 'append'
pass
else:
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)

return table_name

Expand Down Expand Up @@ -291,41 +296,37 @@ def _compare_columns(self, a, b):

return a_copy == b_copy

def _drop_create_table_from_query(self, table_name, schema, query, cartodbfy):
def _drop_create_table_from_query(self, table_name, schema, query):
log.debug('DROP + CREATE table "{}"'.format(table_name))
query = 'BEGIN; {drop}; {create}; {cartodbfy}; COMMIT;'.format(
query = 'BEGIN; {drop}; {create}; COMMIT;'.format(
drop=_drop_table_query(table_name),
create=_create_table_from_query_query(table_name, query),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
create=_create_table_from_query_query(table_name, query))
self.execute_long_running_query(query)

def _create_table_from_columns(self, table_name, schema, columns, cartodbfy):
def _create_table_from_columns(self, table_name, schema, columns):
log.debug('CREATE table "{}"'.format(table_name))
query = 'BEGIN; {create}; {cartodbfy}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {create}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns))
self.execute_query(query)

def _truncate_table(self, table_name, schema, cartodbfy):
def _truncate_table(self, table_name, schema):
log.debug('TRUNCATE table "{}"'.format(table_name))
query = 'BEGIN; {truncate}; {cartodbfy}; COMMIT;'.format(
truncate=_truncate_table_query(table_name),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {truncate}; COMMIT;'.format(
truncate=_truncate_table_query(table_name))
self.execute_query(query)

def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns, cartodbfy):
def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns):
log.debug('TRUNCATE AND DROP + ADD columns table "{}"'.format(table_name))
drop_columns = _drop_columns_query(table_name, table_columns)
add_columns = _add_columns_query(table_name, df_columns)

drop_add_columns = 'ALTER TABLE {table_name} {drop_columns},{add_columns};'.format(
table_name=table_name, drop_columns=drop_columns, add_columns=add_columns)

query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; {cartodbfy}; COMMIT;'.format(
query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; COMMIT;'.format(
regenerate=_regenerate_table_query(table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_columns=drop_add_columns,
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
drop_add_columns=drop_add_columns)

query_length_over_threshold = len(query) > BATCH_API_PAYLOAD_THRESHOLD

Expand All @@ -338,14 +339,11 @@ def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_c
BEGIN;
{truncate};
{drop_add_func_sql};
{cartodbfy};
COMMIT;'''.format(
regenerate=_regenerate_table_query(
table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_func_sql=drop_add_func_sql,
cartodbfy=_cartodbfy_query(
table_name, schema) if cartodbfy else '')
drop_add_func_sql=drop_add_func_sql)
try:
self.execute_long_running_query(query)
finally:
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/io/managers/test_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_copy_from(self, mocker):

# Then
mock_create_table.assert_called_once_with('''
BEGIN; CREATE TABLE table_name ("a" bigint); SELECT CDB_CartodbfyTable(\'schema\', \'table_name\'); COMMIT;
BEGIN; CREATE TABLE table_name ("a" bigint); COMMIT;
'''.strip())
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

Expand Down Expand Up @@ -108,7 +108,7 @@ def test_copy_from_exists_replace_truncate_and_drop_add_columns(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', columns, [], True)
mock.assert_called_once_with('table_name', 'schema', columns, [])

def test_copy_from_exists_replace_truncate(self, mocker):
# Given
Expand All @@ -124,7 +124,7 @@ def test_copy_from_exists_replace_truncate(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', True)
mock.assert_called_once_with('table_name', 'schema')

def test_internal_copy_from(self, mocker):
# Given
Expand Down

0 comments on commit 7fd1094

Please sign in to comment.