Skip to content

Commit

Permalink
Add documentation to gaarf.bq_executor
Browse files Browse the repository at this point in the history
Change-Id: I63ef77e7547db5d54a86998483d13f2f087fec62
  • Loading branch information
AVMarkin committed Apr 25, 2024
1 parent 3899e60 commit a411c8f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 29 deletions.
86 changes: 64 additions & 22 deletions py/gaarf/bq_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for executing queries in BigQuery."""
from __future__ import annotations

from typing import Any, Dict, List, Optional, Union
from google.cloud import bigquery # type: ignore
from google.cloud.exceptions import NotFound # type: ignore
from jinja2 import Template
import logging
import pandas as pd

from .query_post_processor import PostProcessorMixin

import pandas as pd
from gaarf.query_post_processor import PostProcessorMixin
from google.cloud import bigquery # type: ignore
from google.cloud.exceptions import NotFound # type: ignore

logger = logging.getLogger(__name__)

Expand All @@ -30,44 +29,87 @@ class BigQueryExecutorException(Exception):


class BigQueryExecutor(PostProcessorMixin):
  """Handles query execution in BigQuery.

  Attributes:
    project_id: Google Cloud project id.
    location: BigQuery dataset location.
    client: BigQuery client.
  """

  def __init__(self, project_id: str, location: str | None = None) -> None:
    """Initializes BigQueryExecutor.

    Args:
      project_id: Google Cloud project id.
      location: BigQuery dataset location.
    """
    self.project_id = project_id
    self.location = location
    self.client = bigquery.Client(project_id)

  def execute(self,
              script_name: str,
              query_text: str,
              params: dict | None = None) -> pd.DataFrame | None:
    """Executes query in BigQuery.

    Args:
      script_name: Script identifier.
      query_text: Query to be executed.
      params: Optional parameters to be replaced in query text.

    Returns:
      DataFrame if query returns some data, None if it creates data in BQ.

    Raises:
      BigQueryExecutorException: If the underlying BigQuery job fails.
    """
    query_text = self.replace_params_template(query_text, params)
    job = self.client.query(query_text)
    try:
      result = job.result()
      logger.debug('%s launched successfully', script_name)
      if result.total_rows:
        return result.to_dataframe()
      return None
    except Exception as e:
      # Wrap any job failure in the package-level exception; chain the cause.
      raise BigQueryExecutorException(e) from e

  def create_datasets(self, macros: dict | None) -> None:
    """Creates datasets in BQ based on values in a dict.

    If dict contains keys with 'dataset' in them, then values for such keys
    are treated as dataset names.

    Args:
      macros: Mapping containing data for query execution.
    """
    if macros:
      if (datasets := extract_datasets(macros)):
        for dataset in datasets:
          dataset_id = f'{self.project_id}.{dataset}'
          try:
            # Dataset already exists; nothing to do.
            self.client.get_dataset(dataset_id)
          except NotFound:
            # Create the missing dataset in the configured location.
            bq_dataset = bigquery.Dataset(dataset_id)
            bq_dataset.location = self.location
            self.client.create_dataset(bq_dataset, timeout=30)
            logger.debug('Created new dataset %s', dataset_id)


def extract_datasets(macros: dict | None) -> list[str] | None:
  """Finds dataset-related keys based on values in a dict.

  If dict contains keys with 'dataset' in them, then values for such keys
  are treated as dataset names.

  Args:
    macros: Mapping containing data for query execution.

  Returns:
    Possible names of datasets, or None if no macros were provided.
  """
  if not macros:
    return None
  return [value for macro, value in macros.items() if 'dataset' in macro]
19 changes: 12 additions & 7 deletions py/tests/unit/test_bq_executor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import pytest
from __future__ import annotations

from gaarf.bq_executor import extract_datasets


def test_extract_datasets():
  """Only keys containing 'dataset' are treated as dataset names."""
  macros = {
      'start_date': ':YYYYMMDD',
      'bq_dataset': 'dataset_1',
      'dataset_new': 'dataset_2',
      'legacy_dataset_old': 'dataset_3',
      'wrong_dts': 'dataset_4',
  }

  expected = [
      'dataset_1',
      'dataset_2',
      'dataset_3',
  ]
  datasets = extract_datasets(macros)
  assert datasets == expected

0 comments on commit a411c8f

Please sign in to comment.