diff --git a/bnpm/sql_helpers.py b/bnpm/sql_helpers.py index 8002e37..98adda1 100644 --- a/bnpm/sql_helpers.py +++ b/bnpm/sql_helpers.py @@ -4,6 +4,8 @@ import sqlalchemy import pandas as pd +import sqlalchemy +import pandas as pd def make_url( @@ -12,7 +14,7 @@ def make_url( password: str = "password", host: str = "localhost", port: int = None, - database: str = "test", + database: str = "", quote_password: bool = False, ) -> str: """ @@ -33,7 +35,8 @@ def make_url( Port for the database.\n If 'default', then the default port for the SQL type is used.\n database (str): - Name of the database + Name of the database. Leave blank to connect to the server without a + database. quote_password (bool): Use quote_plus to format the passwords with special characters like: !@#$%^&*(){}/[]|:;<>,.?\~`_+-=\n @@ -184,238 +187,239 @@ def make_sql_connection( def get_available_databases( - connection: sqlalchemy.engine.base.Connection, + url: str, + kwargs_connection: Optional[Dict[str, Any]] = {}, ) -> dict: """ Gets a list of all databases available on the server. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection + kwargs_connection (dict): + Additional keyword arguments to be passed to + sqlalchemy.create_engine Returns: (pd.DataFrame): DataFrame of all databases """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - query = "SHOW DATABASES" - elif "postgresql" in str(connection.engine): - query = "SELECT datname FROM pg_database WHERE datistemplate = false" - elif "sqlite" in str(connection.engine): - raise ValueError("SQLite does not support this operation") - else: - raise ValueError("Connection type not recognized") + engine = sqlalchemy.create_engine(url, **kwargs_connection) + if "mysql" in str(engine): + query = "SHOW DATABASES" + elif "postgresql" in str(engine): + query = "SELECT datname FROM pg_database WHERE datistemplate = false" + elif "sqlite" in str(engine): + raise ValueError("SQLite does not support this operation") else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") + raise ValueError("Connection type not recognized") - return pd.read_sql(query, connection) + with engine.begin() as connection: + return pd.read_sql(query, connection) def get_available_tables( - connection: sqlalchemy.engine.base.Connection, + url: str, database: Optional[str] = None, -) -> dict: + kwargs_connection: Optional[Dict[str, Any]] = {}, +) -> pd.DataFrame: """ Gets a list of all tables available in the database. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection database (str): Name of the database. If None, then the default database is used. + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine Returns: (pd.DataFrame): DataFrame of all tables """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - if database is None: - raise ValueError("database must be specified for MySQL") - query = f"SHOW TABLES IN {database}" - elif "postgresql" in str(connection.engine): - if database is None: - query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema'" - else: - query = f"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema' AND tablename NOT LIKE 'pg_%' AND tablename NOT LIKE 'sql_%'" - elif "sqlite" in str(connection.engine): - raise ValueError("SQLite does not support this operation") - else: - raise ValueError("Connection type not recognized") - else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") - - return pd.read_sql(query, connection) - + engine = sqlalchemy.create_engine(url, **kwargs_connection) + return sqlalchemy.inspect(engine).get_table_names(schema=database) def get_table_columns( - connection: sqlalchemy.engine.base.Connection, + url: str, table: str, database: Optional[str] = None, -) -> dict: + kwargs_connection: Optional[Dict[str, Any]] = {}, +) -> pd.DataFrame: """ Gets a list of all columns in the table. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection table (str): Name of the table database (str): Name of the database. If None, then the default database is used. + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine Returns: (pd.DataFrame): DataFrame of all columns """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - if database is None: - raise ValueError("database must be specified for MySQL") - query = f"SHOW COLUMNS FROM {database}.{table}" - elif "postgresql" in str(connection.engine): - if database is None: - query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}'" - else: - query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}' AND table_catalog = '{database}'" - elif "sqlite" in str(connection.engine): - raise ValueError("SQLite does not support this operation") + engine = sqlalchemy.create_engine(url, **kwargs_connection) + if "mysql" in str(engine): + if database is None: + raise ValueError("database must be specified for MySQL") + query = f"SHOW COLUMNS FROM {database}.{table}" + elif "postgresql" in str(engine): + if database is None: + query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}'" else: - raise ValueError("Connection type not recognized") + query = f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table}' AND table_catalog = '{database}'" + elif "sqlite" in str(engine): + raise ValueError("SQLite does not support this operation") else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") - - return pd.read_sql(query, connection) + raise ValueError("Connection type not recognized") + + with engine.begin() as connection: + return pd.read_sql(query, connection) def get_table_data( - connection: sqlalchemy.engine.base.Connection, + url: str, table: str, database: Optional[str] = None, limit: Optional[int] = None, -) -> dict: + kwargs_connection: Optional[Dict[str, Any]] = {}, +) -> pd.DataFrame: """ Gets a list of all data in the table. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection table (str): Name of the table database (str): Name of the database. If None, then the default database is used. limit (int): Number of rows to return. If None, then all rows are returned. + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine Returns: (pd.DataFrame): DataFrame of all data """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - if database is None: - raise ValueError("database must be specified for MySQL") - query = f"SELECT * FROM {database}.{table}" - elif "postgresql" in str(connection.engine): - if database is None: - query = f"SELECT * FROM {table}" - else: - query = f"SELECT * FROM {database}.{table}" - elif "sqlite" in str(connection.engine): - if database is not None: - raise ValueError("database must be None for SQLite") + engine = sqlalchemy.create_engine(url, **kwargs_connection) + if "mysql" in str(engine): + if database is None: + raise ValueError("database must be specified for MySQL") + query = f"SELECT * FROM {database}.{table}" + elif "postgresql" in str(engine): + if database is None: query = f"SELECT * FROM {table}" else: - raise ValueError("Connection type not recognized") + query = f"SELECT * FROM {database}.{table}" + elif "sqlite" in str(engine): + if database is not None: + raise ValueError("database must be None for SQLite") + query = f"SELECT * FROM {table}" else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") + raise ValueError("Connection type not recognized") if limit is not None: query += f" LIMIT {limit}" - - return pd.read_sql(query, connection) + + with engine.begin() as connection: + return pd.read_sql(query, connection) def create_database( - connection: sqlalchemy.engine.base.Connection, + url: str, database: str, + kwargs_connection: Optional[Dict[str, Any]] = {}, ) -> None: """ - Creates a new database on the server. + Creates a new database on the server using a context manager. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection database (str): Name of the database to create + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - query = sqlalchemy.text(f"CREATE DATABASE {database}") - elif "postgresql" in str(connection.engine): - query = sqlalchemy.text(f"CREATE DATABASE {database}") - elif "sqlite" in str(connection.engine): - raise ValueError("SQLite does not support this operation") - else: - raise ValueError("Connection type not recognized") + engine = sqlalchemy.create_engine(url, **kwargs_connection) + if "mysql" in str(engine): + query = f"CREATE DATABASE {database}" + elif "postgresql" in str(engine): + query = f"CREATE DATABASE {database}" + elif "sqlite" in str(engine): + raise ValueError("SQLite does not support this operation") else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") - - connection.execute(query) + raise ValueError("Connection type not recognized") + + with engine.begin() as connection: + connection.execute(sqlalchemy.text(query)) def drop_database( - connection: sqlalchemy.engine.base.Connection, + url: str, database: str, + kwargs_connection: Optional[Dict[str, Any]] = {}, ) -> None: """ Drops a database from the server. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection database (str): Name of the database to drop + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine """ - if isinstance(connection, sqlalchemy.engine.base.Connection): - if "mysql" in str(connection.engine): - query = f"DROP DATABASE {database}" - elif "postgresql" in str(connection.engine): - query = f"DROP DATABASE {database}" - elif "sqlite" in str(connection.engine): - raise ValueError("SQLite does not support this operation") - else: - raise ValueError("Connection type not recognized") + engine = sqlalchemy.create_engine(url, **kwargs_connection) + if "mysql" in str(connection.engine): + query = f"DROP DATABASE {database}" + elif "postgresql" in str(connection.engine): + query = f"DROP DATABASE {database}" + elif "sqlite" in str(connection.engine): + raise ValueError("SQLite does not support this operation") else: - raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") - - connection.execute(sqlalchemy.text(query)) + raise ValueError("Connection type not recognized") + + with engine.begin() as connection: + connection.execute(sqlalchemy.text(query)) def drop_table( - connection: sqlalchemy.engine.base.Connection, + url: str, table: str, database: Optional[str] = None, + kwargs_connection: Optional[Dict[str, Any]] = {}, ) -> None: """ Drops a table from the database. RH 2024 Args: - connection (sqlalchemy.engine.base.Connection): - SQL connection object + url (str): + URL for the database connection table (str): Name of the table to drop database (str): Name of the database. If None, then the default database is used. - """ + kwargs_connection (dict): + Additional keyword arguments to be passed to sqlalchemy.create_engine + """ + engine = sqlalchemy.create_engine(url, **kwargs_connection) if isinstance(connection, sqlalchemy.engine.base.Connection): if "mysql" in str(connection.engine): if database is None: @@ -434,5 +438,6 @@ def drop_table( raise ValueError("Connection type not recognized") else: raise TypeError("connection must be a sqlalchemy.engine.base.Connection object") - - connection.execute(sqlalchemy.text(query)) \ No newline at end of file + + with engine.begin() as connection: + connection.execute(sqlalchemy.text(query))