Skip to content

Commit

Permalink
switch sql helpers to use url inputs instead of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
RichieHakim committed Mar 31, 2024
1 parent 4f6ee7c commit f4640ed
Showing 1 changed file with 122 additions and 117 deletions.
239 changes: 122 additions & 117 deletions bnpm/sql_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import sqlalchemy
import pandas as pd
import sqlalchemy
import pandas as pd


def make_url(
Expand All @@ -12,7 +14,7 @@ def make_url(
password: str = "password",
host: str = "localhost",
port: int = None,
database: str = "test",
database: str = "",
quote_password: bool = False,
) -> str:
"""
Expand All @@ -33,7 +35,8 @@ def make_url(
Port for the database.\n
If 'default', then the default port for the SQL type is used.\n
database (str):
Name of the database
Name of the database. Leave blank to connect to the server without a
database.
quote_password (bool):
Use quote_plus to format the passwords with special characters like:
!@#$%^&*(){}/[]|:;<>,.?\~`_+-=\n
Expand Down Expand Up @@ -184,238 +187,239 @@ def make_sql_connection(


def get_available_databases(
connection: sqlalchemy.engine.base.Connection,
url: str,
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> dict:
"""
Gets a list of all databases available on the server.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
kwargs_connection (dict):
Additional keyword arguments to be passed to
sqlalchemy.create_engine
Returns:
(pd.DataFrame):
DataFrame of all databases
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
query = "SHOW DATABASES"
elif "postgresql" in str(connection.engine):
query = "SELECT datname FROM pg_database WHERE datistemplate = false"
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
else:
raise ValueError("Connection type not recognized")
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(engine):
query = "SHOW DATABASES"
elif "postgresql" in str(engine):
query = "SELECT datname FROM pg_database WHERE datistemplate = false"
elif "sqlite" in str(engine):
raise ValueError("SQLite does not support this operation")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")
raise ValueError("Connection type not recognized")

return pd.read_sql(query, connection)
with engine.begin() as connection:
return pd.read_sql(query, connection)


def get_available_tables(
connection: sqlalchemy.engine.base.Connection,
url: str,
database: Optional[str] = None,
) -> dict:
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> pd.DataFrame:
"""
Gets a list of all tables available in the database.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
database (str):
Name of the database. If None, then the default database is used.
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
Returns:
(pd.DataFrame):
DataFrame of all tables
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"SHOW TABLES IN {database}"
elif "postgresql" in str(connection.engine):
if database is None:
query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'"
else:
query = f"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' AND tablename NOT LIKE 'pg_%' AND tablename NOT LIKE 'sql_%'"
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
else:
raise ValueError("Connection type not recognized")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")

return pd.read_sql(query, connection)

engine = sqlalchemy.create_engine(url, **kwargs_connection)
return sqlalchemy.inspect(engine).get_table_names(schema=database)

def get_table_columns(
connection: sqlalchemy.engine.base.Connection,
url: str,
table: str,
database: Optional[str] = None,
) -> dict:
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> pd.DataFrame:
"""
Gets a list of all columns in the table.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
table (str):
Name of the table
database (str):
Name of the database. If None, then the default database is used.
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
Returns:
(pd.DataFrame):
DataFrame of all columns
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"SHOW COLUMNS FROM {database}.{table}"
elif "postgresql" in str(connection.engine):
if database is None:
query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}'"
else:
query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}' AND table_catalog = '{database}'"
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"SHOW COLUMNS FROM {database}.{table}"
elif "postgresql" in str(engine):
if database is None:
query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}'"
else:
raise ValueError("Connection type not recognized")
query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}' AND table_catalog = '{database}'"
elif "sqlite" in str(engine):
raise ValueError("SQLite does not support this operation")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")

return pd.read_sql(query, connection)
raise ValueError("Connection type not recognized")

with engine.begin() as connection:
return pd.read_sql(query, connection)


def get_table_data(
connection: sqlalchemy.engine.base.Connection,
url: str,
table: str,
database: Optional[str] = None,
limit: Optional[int] = None,
) -> dict:
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> pd.DataFrame:
"""
Gets a list of all data in the table.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
table (str):
Name of the table
database (str):
Name of the database. If None, then the default database is used.
limit (int):
Number of rows to return. If None, then all rows are returned.
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
Returns:
(pd.DataFrame):
DataFrame of all data
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"SELECT * FROM {database}.{table}"
elif "postgresql" in str(connection.engine):
if database is None:
query = f"SELECT * FROM {table}"
else:
query = f"SELECT * FROM {database}.{table}"
elif "sqlite" in str(connection.engine):
if database is not None:
raise ValueError("database must be None for SQLite")
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(engine):
if database is None:
raise ValueError("database must be specified for MySQL")
query = f"SELECT * FROM {database}.{table}"
elif "postgresql" in str(engine):
if database is None:
query = f"SELECT * FROM {table}"
else:
raise ValueError("Connection type not recognized")
query = f"SELECT * FROM {database}.{table}"
elif "sqlite" in str(engine):
if database is not None:
raise ValueError("database must be None for SQLite")
query = f"SELECT * FROM {table}"
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")
raise ValueError("Connection type not recognized")

if limit is not None:
query += f" LIMIT {limit}"

return pd.read_sql(query, connection)

with engine.begin() as connection:
return pd.read_sql(query, connection)


def create_database(
connection: sqlalchemy.engine.base.Connection,
url: str,
database: str,
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> None:
"""
Creates a new database on the server.
Creates a new database on the server using a context manager.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
database (str):
Name of the database to create
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
query = sqlalchemy.text(f"CREATE DATABASE {database}")
elif "postgresql" in str(connection.engine):
query = sqlalchemy.text(f"CREATE DATABASE {database}")
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
else:
raise ValueError("Connection type not recognized")
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(engine):
query = f"CREATE DATABASE {database}"
elif "postgresql" in str(engine):
query = f"CREATE DATABASE {database}"
elif "sqlite" in str(engine):
raise ValueError("SQLite does not support this operation")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")

connection.execute(query)
raise ValueError("Connection type not recognized")

with engine.begin() as connection:
connection.execute(sqlalchemy.text(query))


def drop_database(
connection: sqlalchemy.engine.base.Connection,
url: str,
database: str,
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> None:
"""
Drops a database from the server.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
database (str):
Name of the database to drop
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
"""
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
query = f"DROP DATABASE {database}"
elif "postgresql" in str(connection.engine):
query = f"DROP DATABASE {database}"
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
else:
raise ValueError("Connection type not recognized")
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if "mysql" in str(connection.engine):
query = f"DROP DATABASE {database}"
elif "postgresql" in str(connection.engine):
query = f"DROP DATABASE {database}"
elif "sqlite" in str(connection.engine):
raise ValueError("SQLite does not support this operation")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")

connection.execute(sqlalchemy.text(query))
raise ValueError("Connection type not recognized")

with engine.begin() as connection:
connection.execute(sqlalchemy.text(query))


def drop_table(
connection: sqlalchemy.engine.base.Connection,
url: str,
table: str,
database: Optional[str] = None,
kwargs_connection: Optional[Dict[str, Any]] = {},
) -> None:
"""
Drops a table from the database.
RH 2024
Args:
connection (sqlalchemy.engine.base.Connection):
SQL connection object
url (str):
URL for the database connection
table (str):
Name of the table to drop
database (str):
Name of the database. If None, then the default database is used.
"""
kwargs_connection (dict):
Additional keyword arguments to be passed to sqlalchemy.create_engine
"""
engine = sqlalchemy.create_engine(url, **kwargs_connection)
if isinstance(connection, sqlalchemy.engine.base.Connection):
if "mysql" in str(connection.engine):
if database is None:
Expand All @@ -434,5 +438,6 @@ def drop_table(
raise ValueError("Connection type not recognized")
else:
raise TypeError("connection must be a sqlalchemy.engine.base.Connection object")

connection.execute(sqlalchemy.text(query))

with engine.begin() as connection:
connection.execute(sqlalchemy.text(query))

0 comments on commit f4640ed

Please sign in to comment.