Skip to content

Commit

Permalink
Merge pull request #42 from antoinejeannot/fix-main
Browse files Browse the repository at this point in the history
Fix docstring & concurrency issue with duckdb
  • Loading branch information
MaxHalford authored Jul 29, 2024
2 parents aa4cb0f + 9cee9d2 commit 61f1068
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lea/clients/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def _view_key_to_table_reference(self, view_key: tuple[str], with_context: bool)
'dataset.schema__table'
>>> client._view_key_to_table_reference(("schema", "table"), with_context=True)
'dataset_max.schema__table'
'project.dataset_max.schema__table'
"""
table_reference = f"{self._dataset_name}.{lea._SEP.join(view_key)}"
Expand Down
20 changes: 12 additions & 8 deletions lea/clients/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ def is_motherduck(self):
def prepare(self, views):
schemas = set(view.schema for view in views)
for schema in schemas:
self.con.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
self.con.cursor().sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
console.log(f"Created schema {schema}")

def teardown(self):
os.remove(self.path)

def materialize_sql_view(self, view):
self.con.sql(f"CREATE OR REPLACE TABLE {view.table_reference} AS ({view.query})")
self.con.cursor().sql(f"CREATE OR REPLACE TABLE {view.table_reference} AS ({view.query})")

def materialize_sql_view_incremental(self, view, incremental_field_name):
self.con.sql(
self.con.cursor().sql(
f"""
INSERT INTO {view.table_reference}
SELECT *
Expand All @@ -63,14 +63,18 @@ def materialize_sql_view_incremental(self, view, incremental_field_name):

def materialize_python_view(self, view):
dataframe = self.read_python_view(view) # noqa: F841
self.con.sql(f"CREATE OR REPLACE TABLE {view.table_reference} AS SELECT * FROM dataframe")
self.con.cursor().sql(
f"CREATE OR REPLACE TABLE {view.table_reference} AS SELECT * FROM dataframe"
)

def materialize_json_view(self, view):
dataframe = pd.read_json(view.path) # noqa: F841
self.con.sql(f"CREATE OR REPLACE TABLE {view.table_reference} AS SELECT * FROM dataframe")
self.con.cursor().sql(
f"CREATE OR REPLACE TABLE {view.table_reference} AS SELECT * FROM dataframe"
)

def delete_table_reference(self, table_reference):
self.con.sql(f"DROP TABLE IF EXISTS {table_reference}")
self.con.cursor().sql(f"DROP TABLE IF EXISTS {table_reference}")

def read_sql(self, query: str) -> pd.DataFrame:
return self.con.cursor().sql(query).df()
Expand Down Expand Up @@ -154,8 +158,8 @@ def switch_for_wap_mode(self, view_keys: list[tuple[str]]):
try:
# Concatenate all the statements into one string and execute them
sql = "\n".join(f"{statement};" for statement in statements)
self.con.execute(f"BEGIN TRANSACTION; {sql} COMMIT;")
self.con.cursor().execute(f"BEGIN TRANSACTION; {sql} COMMIT;")
except duckdb.ProgrammingError as e:
# Make sure to rollback if there's an error
self.con.execute("ROLLBACK")
self.con.cursor().execute("ROLLBACK")
raise e

0 comments on commit 61f1068

Please sign in to comment.