Skip to content

Commit

Permalink
wip on ibis, #99
Browse files: browse the repository at this point in the history
  • Loading branch information
aersam committed Jul 7, 2023
1 parent 5839a16 commit f2c72fd
Show file tree
Hide file tree
Showing 8 changed files with 645 additions and 29 deletions.
16 changes: 8 additions & 8 deletions bmsdna/lakeapi/context/df_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
import polars as pl
from bmsdna.lakeapi.core.config import SearchConfig
import pypika.terms

from bmsdna.lakeapi.query import QueryBuilder
from ibis.backends.base import BaseBackend

if TYPE_CHECKING:
import pandas as pd


def get_sql(sql_or_pypika: str | pypika.queries.QueryBuilder, limit_zero=False) -> str:
def get_sql(backend: BaseBackend, sql_or_pypika: str | QueryBuilder, limit_zero=False) -> str:
if limit_zero:
sql_or_pypika = (
sql_or_pypika.limit(0)
Expand All @@ -24,9 +25,7 @@ def get_sql(sql_or_pypika: str | pypika.queries.QueryBuilder, limit_zero=False)
)
if isinstance(sql_or_pypika, str):
return sql_or_pypika
if len(sql_or_pypika._selects) == 0:
return sql_or_pypika.select("*").get_sql()
return sql_or_pypika.get_sql()
return backend.compile(sql_or_pypika)


class ResultData(ABC):
Expand All @@ -51,7 +50,7 @@ def to_arrow_recordbatch(self, chunk_size: int = 10000) -> pa.RecordBatchReader:
...

@abstractmethod
def query_builder(self) -> pypika.queries.QueryBuilder:
def query_builder(self) -> QueryBuilder:
...

def write_json(self, file_name: str):
Expand Down Expand Up @@ -106,10 +105,11 @@ def write_csv(self, file_name: str, *, separator: str):


class ExecutionContext(ABC):
def __init__(self, chunk_size: int) -> None:
def __init__(self, chunk_size: int, backend: BaseBackend) -> None:
super().__init__()
self.modified_dates: dict[str, datetime] = {}
self.chunk_size = chunk_size
self.backend = backend

@abstractmethod
def __enter__(self) -> "ExecutionContext":
Expand Down Expand Up @@ -222,7 +222,7 @@ def register_datasource(
self.register_arrow(name, ds)

@abstractmethod
def execute_sql(self, sql: Union[pypika.queries.QueryBuilder, str]) -> ResultData:
def execute_sql(self, sql: Union[QueryBuilder, str]) -> ResultData:
...

@abstractmethod
Expand Down
14 changes: 5 additions & 9 deletions bmsdna/lakeapi/context/df_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
from bmsdna.lakeapi.context.df_base import ExecutionContext, ResultData, get_sql
import duckdb
import pyarrow.dataset
import pypika.queries
import pypika.terms
import pypika.functions
import pypika.enums
import pypika
from ibis.backends.duckdb import Backend
import os
from datetime import datetime, timezone
from bmsdna.lakeapi.core.config import SearchConfig
Expand All @@ -28,7 +24,7 @@ def _get_temp_table_name():
class DuckDBResultData(ResultData):
def __init__(
self,
original_sql: Union[pypika.queries.QueryBuilder, str],
original_sql: Union[QueryBuilder, str],
con: duckdb.DuckDBPyConnection,
chunk_size: int,
) -> None:
Expand All @@ -41,7 +37,7 @@ def __init__(
def columns(self):
return self.arrow_schema().names

def query_builder(self) -> pypika.queries.QueryBuilder:
def query_builder(self) -> QueryBuilder:
return pypika.Query.from_(self.original_sql)

def arrow_schema(self) -> pa.Schema:
Expand Down Expand Up @@ -132,7 +128,7 @@ def get_sql(self, **kwargs):

class DuckDbExecutionContextBase(ExecutionContext):
def __init__(self, con: duckdb.DuckDBPyConnection, chunk_size: int):
super().__init__(chunk_size=chunk_size)
super().__init__(chunk_size=chunk_size, backend=Backend())
self.con = con
self.res_con = None
self.persistance_file_name = None
Expand All @@ -147,7 +143,7 @@ def close(self):
def execute_sql(
self,
sql: Union[
pypika.queries.QueryBuilder,
QueryBuilder,
str,
],
) -> DuckDBResultData:
Expand Down
10 changes: 6 additions & 4 deletions bmsdna/lakeapi/context/df_odbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datetime import datetime, timezone
from bmsdna.lakeapi.core.config import SearchConfig
from uuid import uuid4
from ibis.backends.mssql import Backend


ENABLE_COPY_TO = os.environ.get("ENABLE_COPY_TO", "0") == "1"
Expand Down Expand Up @@ -46,20 +47,21 @@ def __exit__(self, *args, **kwargs):
class ODBCResultData(ResultData):
def __init__(
self,
original_sql: Union[pypika.queries.QueryBuilder, str],
original_sql: Union[QueryBuilder, str],
connection_string: str,
chunk_size: int,
) -> None:
super().__init__(chunk_size=chunk_size)
self.original_sql = original_sql
# todo: expand environment variables in connection string
self.connection_string = connection_string
self._arrow_schema = None
self._df = None

def columns(self):
return self.arrow_schema().names

def query_builder(self) -> pypika.queries.QueryBuilder:
def query_builder(self) -> QueryBuilder:
return pypika.Query.from_(self.original_sql)

def arrow_schema(self) -> pa.Schema:
Expand Down Expand Up @@ -98,7 +100,7 @@ def to_arrow_recordbatch(self, chunk_size: int = 10000):

class ODBCExecutionContext(ExecutionContext):
def __init__(self, chunk_size: int):
super().__init__(chunk_size=chunk_size)
super().__init__(chunk_size=chunk_size, backend=)
self.res_con = None
self.datasources = dict()
self.persistance_file_name = None
Expand All @@ -112,7 +114,7 @@ def close(self):
def execute_sql(
self,
sql: Union[
pypika.queries.QueryBuilder,
QueryBuilder,
str,
],
) -> ODBCResultData:
Expand Down
4 changes: 2 additions & 2 deletions bmsdna/lakeapi/context/df_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
def columns(self):
return self.df.columns

def query_builder(self) -> pypika.queries.QueryBuilder:
def query_builder(self) -> QueryBuilder:
import polars as pl

if not self.registred_df:
Expand Down Expand Up @@ -165,7 +165,7 @@ def register_datasource(
def execute_sql(
self,
sql: Union[
pypika.queries.QueryBuilder,
QueryBuilder,
str,
],
) -> PolarsResultData:
Expand Down
4 changes: 4 additions & 0 deletions bmsdna/lakeapi/query/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import ibis
import ibis.expr
# Import the submodule explicitly: `import ibis.expr` only guarantees that the
# `ibis.expr` package is loaded, not that its `types` submodule is.
import ibis.expr.types

# Alias for the query object type used by this package: an ibis table expression.
QueryBuilder = ibis.expr.types.Table
9 changes: 9 additions & 0 deletions config_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,15 @@ tables:
file_type: delta
config_from_delta: true

- name: odbc_test
tag: tester
api_method: post
engine: odbc
datasource:
uri: DAta Source=crhdacedwh003.crhsd.local,1433;owjeoijfewj ; DRIVER={}
file_type: odbc
table_name: dbo.storrh

# --------------------------------
# User / Password
# --------------------------------
Expand Down
Loading

0 comments on commit f2c72fd

Please sign in to comment.