diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..51b50a0
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+max-line-length = 100
\ No newline at end of file
diff --git a/.github/ISSUE_TEMPLATE/notifica--o-de-problema.md b/.github/ISSUE_TEMPLATE/notifica--o-de-problema.md
new file mode 100644
index 0000000..acc81b6
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/notifica--o-de-problema.md
@@ -0,0 +1,31 @@
+---
+name: Notificação de problema
+about: Notifique um problema que está ocorrendo para que possamos corrigi-lo.
+title: "[BUG]"
+labels: bug
+assignees:
+---
+
+**Descreva o problema**
+Uma descrição clara e concisa do problema.
+
+**Passos realizados**
+O passo-a-passo realizado, em detalhes, para chegar a esse problema. Deve ser detalhado a ponto de qualquer pessoa conseguir reproduzir:
+
+1. Vá até '...'
+2. Clique em '....'
+3. Role a tela até '....'
+4. Seu erro estará em '...'
+
+**Comportamento esperado**
+Uma descrição clara e concisa do que você esperava que acontecesse ao invés do problema.
+
+**Capturas de tela**
+Se cabível, inserir capturas de tela relacionadas ao problema.
+
+**Contexto:**
+
+- Sistema operacional: [ex. Ubuntu 20.04]
+- Browser (se aplicável) [ex. chrome, safari]
+- ...
+- Qualquer outra informação de contexto relevante (ex. versão do Python...)
diff --git a/.github/ISSUE_TEMPLATE/requerimento-de-funcionalidade.md b/.github/ISSUE_TEMPLATE/requerimento-de-funcionalidade.md
new file mode 100644
index 0000000..347d97f
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/requerimento-de-funcionalidade.md
@@ -0,0 +1,19 @@
+---
+name: Requerimento de funcionalidade
+about: Sugira novas funcionalidades que podem ser úteis a todos os desenvolvedores.
+title: "[FEATURE]"
+labels: enhancement
+assignees:
+---
+
+**Essa funcionalidade está relacionada a um problema?**
+Em caso afirmativo, descreva o problema.
+
+**Descreva a solução ideal para você**
+Uma descrição de como seria o funcionamento após a implementação dessa funcionalidade sugerida.
+
+**Descreva alternativas que você considerou**
+Antes de chegar à conclusão de que essa funcionalidade deveria ser agregada, você possivelmente cogitou outras alternativas. Descreva-as.
+
+**Contexto adicional**
+Qualquer outra informação relevante.
diff --git a/.github/workflows/build_docker.yaml b/.github/workflows/build_docker.yaml
new file mode 100644
index 0000000..e0c85f9
--- /dev/null
+++ b/.github/workflows/build_docker.yaml
@@ -0,0 +1,48 @@
+name: Build Docker image
+
+on:
+  push:
+    branches:
+      - main
+    paths:
+      - ".github/workflows/cd.yaml"
+      - "pipelines/**/*"
+      - "pyproject.toml"
+      - "Dockerfile"
+  pull_request:
+    branches:
+      - main
+    paths:
+      - ".github/workflows/cd_staging.yaml"
+      - "pipelines/**/*"
+      - "pyproject.toml"
+      - "Dockerfile"
+
+jobs:
+  build-container:
+    name: Build Docker image
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Login to GitHub Container Registry
+        uses: docker/login-action@v1
+        with:
+          registry: ghcr.io
+          username: ${{ github.repository_owner }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Build and push image
+        uses: docker/build-push-action@v2
+        with:
+          context: .
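+          # Build from the repository root so the whole project is available to the Dockerfile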
+          file: ./Dockerfile
+          push: true
+          tags: |
+            ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }}:${{ github.sha }}
+          labels: |
+            org.opencontainers.image.source=${{ github.event.repository.html_url }}
+            org.opencontainers.image.revision=${{ github.sha }}
+          build-args: |
+            BUILDKIT_INLINE_CACHE=1
\ No newline at end of file
diff --git a/.github/workflows/cd.yaml b/.github/workflows/cd.yaml
new file mode 100644
index 0000000..e13dc8f
--- /dev/null
+++ b/.github/workflows/cd.yaml
@@ -0,0 +1,74 @@
+name: Register flows (production)
+
+on:
+  push:
+    branches:
+      - main
+    paths:
+      - ".github/workflows/cd.yaml"
+      - "pipelines/**/*"
+      - "pyproject.toml"
+      - "Dockerfile"
+
+env:
+  PREFECT__BACKEND: cloud
+  PREFECT__CLOUD__API: ${{ secrets.PREFECT__CLOUD__API }}
+  PREFECT__CLOUD__PORT: ${{ secrets.PREFECT__CLOUD__PORT }}
+  PREFECT__SERVER__PROJECT: ${{ secrets.PREFECT__SERVER__PROJECT__PROD }}
+  PREFECT_AUTH_TOML: ${{ secrets.PREFECT_AUTH_TOML }}
+
+jobs:
+  build-container:
+    name: Register flows (production)
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Setup Python version
+        uses: actions/setup-python@v2
+        with:
+          python-version: "3.10"
+
+      - name: Setup Google Cloud credentials
+        uses: google-github-actions/setup-gcloud@v0.2.1
+        with:
+          service_account_key: ${{ secrets.GCP_SA_KEY }}
+          project_id: ${{ secrets.GCP_PROJECT_ID }}
+          export_default_credentials: true
+
+      - name: Install Python dependencies for deploying
+        run: |-
+          pip install -U pip poetry
+          poetry config virtualenvs.create false
+          poetry install --with dev --with ci
+
+      - name: Update image tag in constants
+        run: |-
+          python .github/workflows/scripts/replace_docker_tag.py ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }} ${{ github.sha }}
+
+      - name: Get changed files for code tree analysis
+        id: files
+        uses: Ana06/get-changed-files@v2.1.0
+
+      - name: Perform code tree analysis
+        id: code-tree-analysis
+        continue-on-error: true
+        run: |
+          python .github/workflows/scripts/code_tree_analysis.py "${{ steps.files.outputs.all }}" --write-to-file
+
+      - name: Write auth.toml
+        run: |-
+          mkdir -p $HOME/.prefect
+          echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml
+
+      - name: Wait for Docker image to be available
+        uses: lewagon/wait-on-check-action@v1.3.1
+        with:
+          ref: ${{ github.ref }}
+          check-name: 'Build Docker image'
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          verbose: true
+
+      - name: Register Prefect flows
+        run: |-
+          python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --schedule --filter-affected-flows
diff --git a/.github/workflows/cd_staging.yaml b/.github/workflows/cd_staging.yaml
new file mode 100644
index 0000000..abb9f95
--- /dev/null
+++ b/.github/workflows/cd_staging.yaml
@@ -0,0 +1,76 @@
+name: Register flows (staging)
+
+on:
+  pull_request:
+    branches:
+      - main
+    paths:
+      - ".github/workflows/cd_staging.yaml"
+      - "pipelines/**/*"
+      - "pyproject.toml"
+      - "Dockerfile"
+
+env:
+  PREFECT__BACKEND: cloud
+  PREFECT__CLOUD__API: ${{ secrets.PREFECT__CLOUD__API }}
+  PREFECT__CLOUD__PORT: ${{ secrets.PREFECT__CLOUD__PORT }}
+  PREFECT__SERVER__PROJECT: ${{ secrets.PREFECT__SERVER__PROJECT__STAGING }}
+  PREFECT_AUTH_TOML: ${{ secrets.PREFECT_AUTH_TOML }}
+
+jobs:
+  build-container:
+    if: startsWith(github.head_ref, 'staging/') || contains( github.event.pull_request.labels.*.name, 'staging')
+    name: Register flows (staging)
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Setup Python version
+        uses: actions/setup-python@v2
+        with:
+          python-version: "3.10"
+
+      - name: Setup Google Cloud credentials
+        uses: google-github-actions/setup-gcloud@v0.2.1
+        with:
+          service_account_key: ${{ secrets.GCP_SA_KEY }}
+          project_id: ${{ secrets.GCP_PROJECT_ID }}
+          export_default_credentials: true
+
+      - name: Install Python dependencies for deploying
+        run: |-
+          pip install -U pip poetry
+          poetry config virtualenvs.create false
+          poetry install --with dev --with ci
+
+      - name: Update image tag in constants
+        run: |-
+          python .github/workflows/scripts/replace_docker_tag.py ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }} ${{ github.sha }}
+
+      - name: Get changed files for code tree analysis
+        id: files
+        uses: Ana06/get-changed-files@v2.1.0
+
+      - name: Perform code tree analysis
+        id: code-tree-analysis
+        continue-on-error: true
+        run: |
+          python .github/workflows/scripts/code_tree_analysis.py "${{ steps.files.outputs.all }}" --write-to-file
+
+      - name: Write auth.toml
+        run: |-
+          mkdir -p $HOME/.prefect
+          echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml
+
+      - name: Wait for Docker image to be available
+        uses: lewagon/wait-on-check-action@v1.3.1
+        with:
+          ref: ${{ github.event.pull_request.head.sha || github.sha }}
+          check-name: 'Build Docker image'
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+          verbose: true
+
+      - name: Register Prefect flows
+        run: |-
+          python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --no-schedule --filter-affected-flows
diff --git a/.github/workflows/code-tree-analysis.yaml b/.github/workflows/code-tree-analysis.yaml
new file mode 100644
index 0000000..86e119e
--- /dev/null
+++ b/.github/workflows/code-tree-analysis.yaml
@@ -0,0 +1,49 @@
+name: CI
+
+on:
+  pull_request:
+
+jobs:
+  code-tree-analysis:
+    name: Code tree analysis
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Setup Python version
+        uses: actions/setup-python@v2
+        with:
+          python-version: "3.10"
+
+      - name: Install Python dependencies for deploying
+        run: |-
+          pip install -U pip poetry
+          poetry config virtualenvs.create false
+          poetry install --with dev --with ci
+
+      - name: Check if `prefect build` works
+        run: |
+          prefect build
+
+      - name: Get changed files for code tree analysis
+        id: files
+        uses: Ana06/get-changed-files@v2.1.0
+
+      - name: Perform code tree analysis
+        id: code-tree-analysis
+        continue-on-error: true
+        run: |
+          python .github/workflows/scripts/code_tree_analysis.py "${{ steps.files.outputs.all }}"
+
+      - name: Delete previous comments
+        uses: izhangzhihao/delete-comment@master
+        with:
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          delete_user_name: github-actions[bot]
+          issue_number: ${{ github.event.number }} # remove comments from the current PR
+
+      - name: Comment PR
+        uses: thollander/actions-comment-pull-request@v1
+        with:
+          message: "${{ steps.code-tree-analysis.outputs.pr-message }}"
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
new file mode 100644
index 0000000..068af94
--- /dev/null
+++ b/.github/workflows/lint.yaml
@@ -0,0 +1,34 @@
+name: CI
+
+on:
+  push:
+
+jobs:
+  lint:
+    name: Lint
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5
+
+      - name: Lint Dockerfile
+        uses: hadolint/hadolint-action@54c9adbab1582c2ef04b2016b760714a4bfde3cf
+        with:
+          dockerfile: Dockerfile
+
+      - name: Set up Python
+        uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e
+        with:
+          python-version: "3.10"
+
+      - name: Set up Poetry and upgrade pip
+        run: |
+          pip install -U pip poetry
+
+      - name: Install dependencies
+        run: |
+          poetry config virtualenvs.create false && poetry install --with dev --with ci
+
+      - name: Lint with black, isort and flake8
+        run: |
+          task lint
\ No newline at end of file
diff --git a/.github/workflows/scripts/code_tree_analysis.py b/.github/workflows/scripts/code_tree_analysis.py
new file mode 100644
index 0000000..88884bb
--- /dev/null
+++ b/.github/workflows/scripts/code_tree_analysis.py
@@ -0,0 +1,493 @@
+# -*- coding: utf-8 -*-
+import ast
+import sys
+from pathlib import Path
+from typing import List, Tuple, Union
+
+import networkx as nx
+from prefect import Flow
+
+message_id = 0
+
+
+def filename_to_python_module(filename: str) -> str:
+    """
+    Returns the Python module name from a filename.
+
+    Example:
+
+    - Filename:
+
+    ```py
+    path/to/file.py
+    ```
+
+    - Output:
+
+    ```py
+    'path.to.file'
+    ```
+
+    Args:
+        filename (str): The filename to get the Python module name from.
+
+    Returns:
+        str: The Python module name.
+    """
+    # Get the file path in Python module format.
+    file_path = Path(filename).with_suffix("").as_posix().replace("/", ".")
+
+    return file_path
+
+
+def python_module_to_filename(python_module: str) -> str:
+    """
+    Returns the filename from a Python module.
+
+    Example:
+
+    - Python module:
+
+    ```py
+    'path.to.file'
+    ```
+
+    - Output:
+
+    ```py
+    'path/to/file.py'
+    ```
+
+    Args:
+        python_module (str): The Python module to get the filename from.
+
+    Returns:
+        str: The filename from the Python module.
+    """
+    # Get the file path in Python module format.
+    file_path = Path(python_module).with_suffix("").as_posix().replace(".", "/")
+
+    return f"{file_path}.py"
+
+
+def get_dependencies(python_file: Union[str, Path]) -> List[str]:
+    """
+    Returns a list of dependencies from a Python file. The dependencies are
+    defined as the import statements in the file. Their names on the output
+    must be fully qualified.
+
+    Example:
+
+    - Python file:
+
+    ```py
+    from prefect import task
+    from prefect.tasks.secrets import Secret
+    from some_package import (
+        func1, func2,
+    )
+    ```
+
+    - Output:
+
+    ```py
+    ['prefect.task', 'prefect.tasks.secrets.Secret', 'some_package.func1', 'some_package.func2']
+    ```
+
+    Args:
+        python_file (str): The Python file to get the dependencies from.
+
+    Returns:
+        list: A list of dependencies from the Python file.
+    """
+    # We need to get the contents of the Python file.
+    with open(python_file, "r") as f:
+        content = f.read()
+
+    # Parse it into an AST.
+    tree = ast.parse(content)
+
+    # Then, iterate over the imports.
+    dependencies = []
+    for node in tree.body:
+        if isinstance(node, ast.Import):
+            for name in node.names:
+                full_name = f"{name.name}"
+                dependencies.append(full_name)
+        elif isinstance(node, ast.ImportFrom):
+            for name in node.names:
+                full_name = f"{node.module}.{name.name}"
+                dependencies.append(full_name)
+
+    return dependencies
+
+
+def get_declared(python_file: Union[str, Path]) -> List[str]:
+    """
+    Returns a list of declared variables, functions and classes
+    in a Python file. The output must be fully qualified.
+
+    Example:
+
+    - Python file (path/to/file.py):
+
+    ```py
+    x = 1
+    y = 2
+
+    def func1():
+        pass
+
+    class Class1:
+        pass
+    ```
+
+    - Output:
+
+    ```py
+    ['path.to.file.x', 'path.to.file.y', 'path.to.file.func1', 'path.to.file.Class1']
+    ```
+
+    Args:
+        python_file (str): The Python file to get the declared variables from.
+
+    Returns:
+        list: A list of declared variables from the Python file.
+    """
+    # We need to get the contents of the Python file.
+    with open(python_file, "r") as f:
+        content = f.read()
+
+    # Get file path in Python module format.
+    file_path = filename_to_python_module(python_file)
+
+    # Parse it into an AST.
+    tree = ast.parse(content)
+
+    # Then, iterate over the top-level nodes, collecting declarations.
+    declared = []
+    for node in tree.body:
+        if isinstance(node, ast.Assign):
+            for target in node.targets:
+                if isinstance(target, ast.Name):
+                    declared.append(f"{file_path}.{target.id}")
+        elif isinstance(node, ast.AugAssign):
+            if isinstance(node.target, ast.Name):
+                declared.append(f"{file_path}.{node.target.id}")
+        elif isinstance(node, ast.AnnAssign):
+            if isinstance(node.target, ast.Name):
+                declared.append(f"{file_path}.{node.target.id}")
+        elif isinstance(node, ast.With):
+            for item in node.items:
+                if isinstance(item, ast.withitem):
+                    if isinstance(item.optional_vars, ast.Name):
+                        declared.append(f"{file_path}.{item.optional_vars.id}")
+        elif isinstance(node, ast.FunctionDef):
+            declared.append(f"{file_path}.{node.name}")
+        elif isinstance(node, ast.AsyncFunctionDef):
+            declared.append(f"{file_path}.{node.name}")
+        elif isinstance(node, ast.ClassDef):
+            declared.append(f"{file_path}.{node.name}")
+
+    return declared
+
+
+def list_all_python_files(directory: Union[str, Path]) -> List[Path]:
+    """
+    Returns a list of all Python files in a directory.
+
+    Example:
+
+    - Directory:
+
+    ```py
+    path/to/file1.py
+    path/to/file2.py
+    path/to/file3.py
+    ```
+
+    - Output:
+
+    ```py
+    ['path/to/file1.py', 'path/to/file2.py', 'path/to/file3.py']
+    ```
+
+    Args:
+        directory (str): The directory to list the files from.
+
+    Returns:
+        list: A list of all Python files in the directory.
+    """
+    # Get the directory path.
+    directory = Path(directory)
+
+    # Get all files in the directory.
+    files = directory.glob("**/*.py")
+
+    # Filter out files that are not Python files.
+    files = [f for f in files if f.suffix == ".py"]
+
+    return [f.as_posix() for f in files]
+
+
+def object_is_instance(fully_qualified_import: str, compare_to: type) -> bool:
+    """
+    Returns whether an object is an instance of a class.
+
+    Args:
+        fully_qualified_import (str): The fully qualified import to check.
+        compare_to (type): The type to compare the import to.
+
+    Returns:
+        bool: Whether the object is an instance of the class.
+    """
+    # Get the module and class name.
+    module, class_name = fully_qualified_import.rsplit(".", 1)
+
+    # Import the module.
+    module = __import__(module, fromlist=[class_name])
+
+    # Get the object.
+    object_ = getattr(module, class_name)
+
+    # Check if the object is an instance of the class.
+    return isinstance(object_, compare_to)
+
+
+def assert_all_imports_are_declared(root_directory: str) -> None:
+    """
+    Asserts that all imports are declared somewhere.
+    """
+    # Get all Python files.
+    files = [
+        file_ for file_ in list_all_python_files(root_directory) if "cookiecutter" not in file_
+    ]
+
+    # Get all declared stuff.
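+    # Only names inside the `pipelines` package are considered; wildcard
+    # re-exports (names ending in `*`) are skipped.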
+    declared = set()
+    for file_ in files:
+        file_declared = [
+            item
+            for item in get_declared(file_)
+            if (item.startswith("pipelines") and not item.endswith("*"))
+        ]
+        declared.update(file_declared)
+
+    # Get all dependencies.
+    dependencies = set()
+    for file_ in files:
+        file_dependencies = [
+            item
+            for item in get_dependencies(file_)
+            if (item.startswith("pipelines") and not item.endswith("*"))
+        ]
+        dependencies.update(file_dependencies)
+
+    # Assert that all dependencies are declared.
+    for dependency in dependencies:
+        assert dependency in declared, f"{dependency} is not declared."
+
+
+def build_dependency_graph(root_directory: str) -> nx.DiGraph:
+    """
+    Builds a dependency graph from a directory.
+
+    Args:
+        root_directory (str): The directory to build the graph from.
+
+    Returns:
+        nx.DiGraph: The dependency graph.
+    """
+    # Get all Python files.
+    files = [
+        file_ for file_ in list_all_python_files(root_directory) if "cookiecutter" not in file_
+    ]
+
+    # Get dependencies by file.
+    dependencies_by_file = {}
+    for file_ in files:
+        file_dependencies = set(
+            [item for item in get_dependencies(file_) if item.startswith("pipelines")]
+        )
+        dependencies_by_file[file_] = file_dependencies
+
+    # Get declared stuff by file.
+    declared_by_file = {}
+    for file_ in files:
+        file_declared = set(get_declared(file_))
+        declared_by_file[file_] = file_declared
+
+    # Get all declared.
+    all_declared = set()
+    for file_ in files:
+        file_declared = set(get_declared(file_))
+        all_declared.update(file_declared)
+
+    # Build the dependency graph.
+    graph = nx.DiGraph()
+
+    # First we add the dependencies. Each dependency neighbor is a file that
+    # depends on it.
+    for file_ in files:
+        if file_ not in graph.nodes:
+            graph.add_node(file_)
+        for dependency in dependencies_by_file[file_]:
+            if dependency.endswith("*"):
+                for sub_dependency in all_declared:
+                    if sub_dependency.startswith(dependency[:-1]):
+                        if sub_dependency not in graph.nodes:
+                            graph.add_node(sub_dependency)
+                        graph.add_edge(sub_dependency, file_)
+            else:
+                if dependency not in graph.nodes:
+                    graph.add_node(dependency)
+                graph.add_edge(dependency, file_)
+
+    # Then we add the declared stuff. Each file neighbor is a declared thing.
+    for file_ in files:
+        if file_ not in graph.nodes:
+            graph.add_node(file_)
+        for dependency in declared_by_file[file_]:
+            if dependency not in graph.nodes:
+                graph.add_node(dependency)
+            graph.add_edge(file_, dependency)
+
+    return graph
+
+
+def check_for_variable_name_conflicts(
+    changed_files: List[str], root_directory: str
+) -> List[Tuple[str, str]]:
+    """
+    Checks if there will be any conflicts with variable names.
+    """
+    # Get all Python files.
+    files = [
+        file_ for file_ in list_all_python_files(root_directory) if "cookiecutter" not in file_
+    ]
+
+    # Remove all changed files from the list of files.
+    files = [file_ for file_ in files if file_ not in changed_files]
+
+    # Get all declared things in the changed files.
+    declared_changed = set()
+    for file_ in changed_files:
+        file_declared = set(get_declared(file_))
+        declared_changed.update(file_declared)
+
+    # Get all declared things in the remaining files.
+    declared_remaining = set()
+    for file_ in files:
+        file_declared = set(get_declared(file_))
+        declared_remaining.update(file_declared)
+
+    # Filter out what is not a Flow.
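+    # Note: object_is_instance imports each module to inspect the object, so
+    # only importable `prefect.Flow` instances survive this filter.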
+    declared_changed = [obj for obj in declared_changed if object_is_instance(obj, Flow)]
+    declared_remaining = [obj for obj in declared_remaining if object_is_instance(obj, Flow)]
+
+    # Check for conflicts.
+    conflicts = []
+    for changed in declared_changed:
+        for remaining in declared_remaining:
+            if changed.split(".")[-1] == remaining.split(".")[-1]:
+                conflicts.append((changed, remaining))
+
+    # Return the conflicts.
+    return conflicts
+
+
+def log(message: str):
+    """
+    Logs a message to the output of a GitHub Action.
+    """
+    message = message.replace("\n", "%0A")
+    print(f"::set-output name=pr-message::{message}")
+
+
+if __name__ == "__main__":
+    # Assert arguments.
+    if len(sys.argv) not in [2, 3]:
+        print(f"Usage: python {sys.argv[0]} <changed-files> [--write-to-file]")
+        sys.exit(1)
+
+    # Write to file?
+    write_to_file = "--write-to-file" in sys.argv
+
+    # Get modified files
+    changed_files: List[str] = sys.argv[1].split(" ")
+    print("These are all the changed files:")
+    for file_ in changed_files:
+        print(f"\t- {file_}")
+
+    # Filter out non-Python and non-pipelines files.
+    changed_files = [
+        file_
+        for file_ in changed_files
+        if file_.endswith(".py")
+        and file_.startswith("pipelines")
+        and "cookiecutter" not in file_
+        and Path(file_).exists()
+    ]
+    print("We're interested in these files:")
+    for file_ in changed_files:
+        print(f"\t- {file_}")
+
+    # Build the dependency graph.
+    graph = build_dependency_graph("pipelines/")
+
+    # Get all declarations that the changed files export.
+    exported_declarations = set()
+    for file_ in changed_files:
+        exported_declarations.update(get_declared(file_))
+    print("These files export these declarations:")
+    for declaration in exported_declarations:
+        print(f"\t- {declaration}")
+
+    # Get all files that depend on the exported declarations.
+    dependent_files = set()
+    for declaration in exported_declarations:
+        dependent_files.update(graph.successors(declaration))
+    if "pipelines/flows.py" in dependent_files:
+        dependent_files.remove("pipelines/flows.py")
+    for file_ in changed_files:
+        dependent_files.add(file_)
+    print("These files depend on the exported declarations:")
+    for file_ in dependent_files:
+        print(f"\t- {file_}")
+
+    # Write dependent file list to file.
+    if write_to_file:
+        with open("dependent_files.txt", "w") as f:
+            for file_ in dependent_files:
+                f.write(f"{file_}\n")
+
+    # Start a PR message
+    message = "### Análise da árvore de código\n\n"
+
+    # Format a message for the files that depend on the exported declarations.
+    if len(dependent_files) > 0:
+        message += "**Os seguintes arquivos são afetados diretamente por alterações "
+        message += "realizadas nesse pull request:**"
+        for file_ in dependent_files:
+            message += f"\n\t- `{file_}`"
+        message += "\n\n"
+
+    # Check for variable name conflicts.
+    conflicts = check_for_variable_name_conflicts(changed_files, "pipelines/")
+    if len(conflicts) > 0:
+        message += "**Existem conflitos entre nomes de variáveis nos seguintes objetos:**"
+        for conflict in conflicts:
+            message += f"\n\t- `{conflict[0]}` e `{conflict[1]}`"
+        message += "\n\n"
+
+    # If there is nothing wrong, let them know!
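+    # (neither dependent files nor flow name conflicts were found)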
+    if len(dependent_files) == 0 and len(conflicts) == 0:
+        message += "*Nenhum problema encontrado!*"
+
+    if not write_to_file:
+        log(message)
+
+    # Raise if there are conflicts
+    if len(conflicts) > 0:
+        raise Exception("There are variable name conflicts!")
diff --git a/.github/workflows/scripts/register_flows.py b/.github/workflows/scripts/register_flows.py
new file mode 100644
index 0000000..121a6ed
--- /dev/null
+++ b/.github/workflows/scripts/register_flows.py
@@ -0,0 +1,549 @@
+# -*- coding: utf-8 -*-
+"""
+Custom script for registering flows.
+"""
+
+import ast
+import glob
+import hashlib
+import json
+import os
+import runpy
+import sys
+import traceback
+from collections import Counter, defaultdict
+from pathlib import Path
+from time import sleep
+from typing import Dict, List, Tuple, Union
+
+import box
+import prefect
+from loguru import logger
+from prefect.run_configs import UniversalRun
+from prefect.storage import Local
+from prefect.utilities.graphql import EnumValue, compress, with_args
+from typer import Typer
+
+# DO NOT REMOVE THIS LINE
+import pipelines  # noqa
+
+app = Typer()
+FlowLike = Union[box.Box, "prefect.Flow"]
+
+
+def build_and_register(  # pylint: disable=too-many-branches
+    client: "prefect.Client",
+    flows: "List[FlowLike]",
+    project_id: str,
+    max_retries: int = 5,
+    retry_interval: int = 5,
+    schedule: bool = True,
+) -> Counter:
+    """
+    (Adapted from Prefect original code.)
+
+    Build and register all flows.
+
+    Args:
+        - client (prefect.Client): the prefect client to use
+        - flows (List[FlowLike]): the flows to register
+        - project_id (str): the project id in which to register the flows
+
+    Returns:
+        - Counter: stats about the number of successful, failed, and skipped flows.
+    """
+    # Finish preparing flows to ensure a stable hash later
+    prepare_flows(flows)
+
+    # Group flows by storage instance.
+    storage_to_flows = defaultdict(list)
+    for flow in flows:
+        storage = flow.storage if isinstance(flow, prefect.Flow) else None
+        storage_to_flows[storage].append(flow)
+
+    # Register each flow, building storage as needed.
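+    # Each storage object is built once and shared by every flow grouped under it.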
+    # Stats on success/fail/skip rates are kept for later display
+    stats = Counter(registered=0, errored=0, skipped=0)
+    for storage, _flows in storage_to_flows.items():
+        # Build storage if needed
+        if storage is not None:
+            logger.info(f"  Building `{type(storage).__name__}` storage...")
+            try:
+                storage.build()
+            except Exception:  # pylint: disable=broad-except
+                logger.error("  Error building storage:")
+                logger.error(traceback.format_exc())
+                for flow in _flows:
+                    logger.error(f"  Registering {flow.name!r}...")
+                    stats["errored"] += 1
+                continue
+
+        for flow in _flows:
+            logger.info(f"  Registering {flow.name!r}...")
+            try:
+                if isinstance(flow, box.Box):
+                    serialized_flow = flow
+                else:
+                    serialized_flow = flow.serialize(build=False)
+
+                attempts = 0
+                registered_flow = False
+                while attempts < max_retries:
+                    attempts += 1
+                    try:
+                        (
+                            flow_id,
+                            flow_version,
+                            is_new,
+                        ) = register_serialized_flow(
+                            client=client,
+                            serialized_flow=serialized_flow,
+                            project_id=project_id,
+                            schedule=schedule,
+                        )
+                        registered_flow = True
+                        break
+                    except Exception:  # pylint: disable=broad-except
+                        logger.error("Error registering flow:")
+                        logger.error(traceback.format_exc())
+                        if attempts < max_retries:
+                            logger.error(f"Retrying in {retry_interval} seconds...")
+                            sleep(retry_interval)
+                if not registered_flow:
+                    # All retries failed; count the error and move on to the next flow.
+                    stats["errored"] += 1
+                    continue
+
+            except Exception:  # pylint: disable=broad-except
+                logger.error("    Error")
+                logger.error(traceback.format_exc())
+                stats["errored"] += 1
+            else:
+                if is_new:
+                    logger.success("    Done")
+                    logger.success(f"    └── ID: {flow_id}")
+                    logger.success(f"    └── Version: {flow_version}")
+                    stats["registered"] += 1
+                else:
+                    logger.warning("    Skipped (metadata unchanged)")
+                    stats["skipped"] += 1
+    return stats
+
+
+def collect_flows(
+    paths: List[str],
+) -> Dict[str, List[FlowLike]]:
+    """
+    (Adapted from Prefect original code.)
+
+    Load all flows found in `paths`.
+
+    Args:
+        - paths (List[str]): file paths to load flows from.
+    """
+
+    out = {}
+    for p in paths:  # pylint: disable=invalid-name
+        flows = load_flows_from_script(p)
+        out[p] = flows
+
+    # Drop empty sources
+    out = {source: flows for source, flows in out.items() if flows}
+
+    return out
+
+
+def expand_paths(paths: List[str]) -> List[str]:
+    """
+    (Adapted from Prefect original code.)
+
+    Given a list of paths, expand any directories to find all contained
+    python files.
+    """
+    out = []
+    globbed_paths = set()
+    for path in tuple(paths):
+        found_paths = glob.glob(path, recursive=True)
+        if not found_paths:
+            raise Exception(f"Path {path!r} doesn't exist")
+        globbed_paths.update(found_paths)
+    for path in globbed_paths:
+        if os.path.isdir(path):
+            with os.scandir(path) as directory:
+                out.extend(e.path for e in directory if e.is_file() and e.path.endswith(".py"))
+        else:
+            out.append(path)
+    return out
+
+
+def get_project_id(client: "prefect.Client", project: str) -> str:
+    """
+    (Adapted from Prefect original code.)
+
+    Get a project id given a project name.
+
+    Args:
+        - project (str): the project name
+
+    Returns:
+        - str: the project id
+    """
+    resp = client.graphql(
+        {"query": {with_args("project", {"where": {"name": {"_eq": project}}}): {"id"}}}
+    )
+    if resp.data.project:
+        return resp.data.project[0].id
+    raise Exception(f"Project {project!r} does not exist")
+
+
+def load_flows_from_script(path: str) -> "List[prefect.Flow]":
+    """
+    (Adapted from Prefect original code.)
+
+    Given a file path, load all flows found in the file.
+    """
+    # We use abs_path for everything but logging (logging the original
+    # user-specified path provides a clearer message).
+    abs_path = os.path.abspath(path)
+    # Temporarily add the flow's local directory to `sys.path` so that local
+    # imports work. This ensures that `sys.path` is the same as it would be if
+    # the flow script was run directly (i.e. `python path/to/flow.py`).
+    orig_sys_path = sys.path.copy()
+    sys.path.insert(0, os.path.dirname(abs_path))
+    try:
+        with prefect.context({"loading_flow": True, "local_script_path": abs_path}):
+            namespace = runpy.run_path(abs_path, run_name="<flow>")
+    except Exception as exc:
+        logger.error(f"Error loading {path!r}:")
+        logger.error(traceback.format_exc())
+        raise Exception from exc
+    finally:
+        sys.path[:] = orig_sys_path
+
+    flows = [f for f in namespace.values() if isinstance(f, prefect.Flow)]
+    if flows:
+        for f in flows:  # pylint: disable=invalid-name
+            if f.storage is None:
+                f.storage = Local(path=abs_path, stored_as_script=True)
+    return flows
+
+
+def prepare_flows(flows: "List[FlowLike]") -> None:
+    """
+    (Adapted from Prefect original code.)
+
+    Finish preparing flows.
+
+    Shared code between `register` and `build` for any flow modifications
+    required before building the flow's storage. Modifies the flows in-place.
+    """
+    labels = ()
+
+    # Finish setting up all flows before building, to ensure a stable hash
+    # for flows sharing storage instances
+    for flow in flows:
+        if isinstance(flow, dict):
+            # Add any extra labels to the flow
+            if flow.get("environment"):
+                new_labels = set(flow["environment"].get("labels") or []).union(labels)
+                flow["environment"]["labels"] = sorted(new_labels)
+            else:
+                new_labels = set(flow["run_config"].get("labels") or []).union(labels)
+                flow["run_config"]["labels"] = sorted(new_labels)
+        else:
+            # Set the default flow result if not specified
+            if not flow.result:
+                flow.result = flow.storage.result
+
+            # Add a `run_config` if not configured explicitly
+            if flow.run_config is None and flow.environment is None:
+                flow.run_config = UniversalRun()
+            # Add any extra labels to the flow (either specified via the CLI,
+            # or from the storage object).
+            obj = flow.run_config or flow.environment
+            obj.labels.update(labels)
+            obj.labels.update(flow.storage.labels)
+
+            # Add the flow to storage
+            flow.storage.add_flow(flow)
+
+
+def register_serialized_flow(
+    client: "prefect.Client",
+    serialized_flow: dict,
+    project_id: str,
+    force: bool = False,
+    schedule: bool = True,
+) -> Tuple[str, int, bool]:
+    """
+    (Adapted from Prefect original code.)
+
+    Register a pre-serialized flow.
+
+    Args:
+        - client (prefect.Client): the prefect client
+        - serialized_flow (dict): the serialized flow
+        - project_id (str): the project id
+        - force (bool, optional): If `False` (default), an idempotency key will
+            be generated to avoid unnecessary re-registration. Set to `True` to
+            force re-registration.
+        - schedule (bool, optional): If `True` (default) activates the flow schedule
+            upon registering.
+
+    Returns:
+        - flow_id (str): the flow id
+        - flow_version (int): the flow version
+        - is_new (bool): True if this is a new flow version, false if
+            re-registration was skipped.
+    """
+    # Get most recent flow id for this flow. This can be removed once
+    # the registration graphql routes return more information
+    flow_name = serialized_flow["name"]
+    resp = client.graphql(
+        {
+            "query": {
+                with_args(
+                    "flow",
+                    {
+                        "where": {
+                            "_and": {
+                                "name": {"_eq": flow_name},
+                                "project": {"id": {"_eq": project_id}},
+                            }
+                        },
+                        "order_by": {"version": EnumValue("desc")},
+                        "limit": 1,
+                    },
+                ): {"id", "version"}
+            }
+        }
+    )
+    if resp.data.flow:
+        prev_id = resp.data.flow[0].id
+        prev_version = resp.data.flow[0].version
+    else:
+        prev_id = None
+        prev_version = 0
+
+    inputs = dict(
+        project_id=project_id,
+        serialized_flow=compress(serialized_flow),
+        set_schedule_active=schedule,
+    )
+    if not force:
+        inputs["idempotency_key"] = hashlib.sha256(
+            json.dumps(serialized_flow, sort_keys=True).encode()
+        ).hexdigest()
+
+    res = client.graphql(
+        {
+            "mutation($input: create_flow_from_compressed_string_input!)": {
+                "create_flow_from_compressed_string(input: $input)": {"id"}
+            }
+        },
+        variables=dict(input=inputs),
+        retry_on_api_error=False,
+    )
+
+    new_id = res.data.create_flow_from_compressed_string.id
+
+    if new_id == prev_id:
+        return new_id, prev_version, False
+    return new_id, prev_version + 1, True
+
+
+def filename_to_python_module(filename: str) -> str:
+    """
+    Returns the Python module name from a filename.
+
+    Example:
+
+    - Filename:
+
+    ```py
+    path/to/file.py
+    ```
+
+    - Output:
+
+    ```py
+    'path.to.file'
+    ```
+
+    Args:
+        filename (str): The filename to get the Python module name from.
+
+    Returns:
+        str: The Python module name.
+    """
+    # Get the file path in Python module format.
+    file_path = Path(filename).with_suffix("").as_posix().replace("/", ".")
+
+    return file_path
+
+
+def get_declared(python_file: Union[str, Path]) -> List[str]:
+    """
+    Returns a list of declared variables, functions and classes
+    in a Python file. The output must be fully qualified.
+
+    Example:
+
+    - Python file (path/to/file.py):
+
+    ```py
+    x = 1
+    y = 2
+
+    def func1():
+        pass
+
+    class Class1:
+        pass
+    ```
+
+    - Output:
+
+    ```py
+    ['path.to.file.x', 'path.to.file.y', 'path.to.file.func1', 'path.to.file.Class1']
+    ```
+
+    Args:
+        python_file (str): The Python file to get the declared variables from.
+
+    Returns:
+        list: A list of declared variables from the Python file.
+    """
+    # We need to get the contents of the Python file.
+    with open(python_file, "r") as f:
+        content = f.read()
+
+    # Get file path in Python module format.
+    file_path = filename_to_python_module(python_file)
+
+    # Parse it into an AST.
+    tree = ast.parse(content)
+
+    # Then, iterate over the top-level nodes, collecting declarations.
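+    # Plain, augmented and annotated assignments, `with ... as` targets,
+    # function/async function definitions and class definitions are covered.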
+    declared = []
+    for node in tree.body:
+        if isinstance(node, ast.Assign):
+            for target in node.targets:
+                if isinstance(target, ast.Name):
+                    declared.append(f"{file_path}.{target.id}")
+        elif isinstance(node, ast.AugAssign):
+            if isinstance(node.target, ast.Name):
+                declared.append(f"{file_path}.{node.target.id}")
+        elif isinstance(node, ast.AnnAssign):
+            if isinstance(node.target, ast.Name):
+                declared.append(f"{file_path}.{node.target.id}")
+        elif isinstance(node, ast.With):
+            for item in node.items:
+                if isinstance(item, ast.withitem):
+                    if isinstance(item.optional_vars, ast.Name):
+                        declared.append(f"{file_path}.{item.optional_vars.id}")
+        elif isinstance(node, ast.FunctionDef):
+            declared.append(f"{file_path}.{node.name}")
+        elif isinstance(node, ast.AsyncFunctionDef):
+            declared.append(f"{file_path}.{node.name}")
+        elif isinstance(node, ast.ClassDef):
+            declared.append(f"{file_path}.{node.name}")
+
+    return declared
+
+
+def get_affected_flows(fpath: str = None):
+    """
+    Reads the list of changed files written by the code tree analysis and
+    returns the flow objects declared in the `flows.py` files next to them.
+    """
+    if not fpath:
+        fpath = "dependent_files.txt"
+    with open(fpath, "r") as f:
+        fnames = f.read().splitlines()
+    fnames = [fname for fname in fnames if fname.endswith(".py")]
+    flow_files = set()
+    for fname in fnames:
+        flow_file = Path(fname).parent / "flows.py"
+        if flow_file.exists():
+            flow_files.add(flow_file)
+    declared_flows = []
+    for flow_file in flow_files:
+        declared_flows.extend(get_declared(flow_file))
+    flows = []
+    for flow in declared_flows:
+        try:
+            flows.append(eval(flow))
+        except Exception as exc:
+            logger.exception(f"Could not evaluate {flow}: {exc}")
+    return flows
+
+
+@app.command(name="register", help="Register a flow")
+def main(
+    project: str = None,
+    path: str = None,
+    max_retries: int = 5,
+    retry_interval: int = 5,
+    schedule: bool = True,
+    filter_affected_flows: bool = False,
+) -> None:
+    """
+    A helper for registering Prefect flows. The original implementation does not
+    meet our needs because it lacks a retry policy.
+
+    Args:
+        - project (str): The project to register the flows to.
+        - path (str): The paths to the flows to register.
+        - max_retries (int, optional): The maximum number of retries to attempt.
+        - retry_interval (int, optional): The number of seconds to wait between retries.
+    """
+
+    if not (project and path):
+        raise ValueError("Must specify a project and path")
+
+    # Expands paths to find all python files
+    paths = expand_paths([path])
+
+    # Gets the project ID
+    client = prefect.Client()
+    project_id = get_project_id(client, project)
+
+    # Collects flows from paths
+    logger.info("Collecting flows...")
+    source_to_flows = collect_flows(paths)
+
+    if filter_affected_flows:
+        # Filter out flows that are not affected by the change
+        affected_flows = get_affected_flows("dependent_files.txt")
+        for key in source_to_flows.keys():
+            filtered_flows = []
+            for flow in source_to_flows[key]:
+                if flow in affected_flows:
+                    filtered_flows.append(flow)
+            source_to_flows[key] = filtered_flows
+
+    # Iterate through each file, building all storage and registering all flows
+    # Log errors as they happen, but only exit once all files have been processed
+    stats = Counter(registered=0, errored=0, skipped=0)
+    for source, flows in source_to_flows.items():
+        logger.info(f"Processing {source!r}:")
+        stats += build_and_register(
+            client,
+            flows,
+            project_id,
+            max_retries=max_retries,
+            retry_interval=retry_interval,
+            schedule=schedule,
+        )
+
+    # Output summary message
+    registered = stats["registered"]
+    skipped = stats["skipped"]
+    errored = stats["errored"]
+    logger.info(
+        f"Registered {registered} flows, skipped {skipped} flows, "
+        f"and errored {errored} flows."
+    )
+
+    # If not in a watch call, exit with appropriate exit code
+    if stats["errored"]:
+        raise Exception("One or more flows failed to register")
+
+
+if __name__ == "__main__":
+    app()
diff --git a/.github/workflows/scripts/replace_docker_tag.py b/.github/workflows/scripts/replace_docker_tag.py
new file mode 100644
index 0000000..a340a55
--- /dev/null
+++ b/.github/workflows/scripts/replace_docker_tag.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+"""
+Opens the `constants.py` file and updates the
+`DOCKER_TAG` variable with the provided argument.
+"""
+
+from pathlib import Path
+from sys import argv, exit
+from typing import Tuple
+
+FILE_PATH = Path("./pipelines/constants.py")
+REPLACE_TAG = "AUTO_REPLACE_DOCKER_TAG"
+REPLACE_IMAGE = "AUTO_REPLACE_DOCKER_IMAGE"
+
+
+def get_name_version_from_args() -> Tuple[str, str]:
+    """
+    Returns the image name and version from the command line arguments.
+    """
+    if len(argv) != 3:
+        print("Usage: replace_docker_tag.py <image_name> <version>")
+        exit(1)
+    return argv[1], argv[2]
+
+
+def replace_in_text(orig_text: str, find_text: str, replace_text: str) -> str:
+    """
+    Replaces the `find_text` with `replace_text` in the `orig_text`.
+    """
+    return orig_text.replace(find_text, replace_text)
+
+
+def update_file(file_path: Path, image_name: str, version: str) -> None:
+    """
+    Updates the `DOCKER_TAG` variable in the `constants.py` file.
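+    Both the `AUTO_REPLACE_DOCKER_TAG` and `AUTO_REPLACE_DOCKER_IMAGE`
+    placeholders are replaced in a single pass.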
+    """
+    with file_path.open("r") as file:
+        text = file.read()
+    text = replace_in_text(text, REPLACE_TAG, version)
+    text = replace_in_text(text, REPLACE_IMAGE, image_name)
+    with file_path.open("w") as file:
+        file.write(text)
+
+
+if __name__ == "__main__":
+    image_name, version = get_name_version_from_args()
+    update_file(FILE_PATH, image_name, version)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..49f9a4a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,149 @@
+# user defined
+.replit
+replit.nix
+**/data/
+test_local.py
+pylint.txt
+test.py
+test/*
+test/*.ipynb
+test/*.csv
+setup.py
+.vscode/*
+*.hdf
+
+
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+**/tweets_flamengo/*.ipynb
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env*
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# VSCode project settings
+.vscode/
\ No newline at end of file
diff --git a/.mergify.yml b/.mergify.yml
new file mode 100644
index 0000000..b526799
--- /dev/null
+++ b/.mergify.yml
@@ -0,0 +1,20 @@
+pull_request_rules:
+  - name: Automatic update for PRs
+    conditions:
+      - -conflict # skip PRs with conflicts
+      - -draft # filter-out GH draft PRs
+      - -closed
+      - -merged
+      - "#commits-behind>0"
+      - label!=do-not-update
+    actions:
+      update:
+  - name: Warn author on conflicts
+    conditions:
+      - conflict
+    actions:
+      comment:
+        message: "@{{author}} esse pull request tem conflitos 😩"
+      label:
+        add:
+          - conflict
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..a2f4ca5
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,26 @@
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v4.4.0
+  hooks:
+    - id: check-added-large-files # prevents adding large files
+    - id: detect-private-key # detects private keys
+    - id: fix-byte-order-marker # fixes BOM
+    - id: fix-encoding-pragma # fixes encoding pragma
+    - id: no-commit-to-branch # prevents committing to protected branches
+    - id: trailing-whitespace # prevents trailing whitespace
+
+- repo: https://github.com/psf/black
+  rev: 22.12.0
+  hooks:
+    - id: black
+      language_version: python3.10
+
+- repo: https://github.com/PyCQA/isort
+  rev: 5.12.0
+  hooks:
+    - id: isort
+
+- repo: https://github.com/PyCQA/flake8
+  rev: 6.0.0
+  hooks:
+    - id: flake8
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..71bd704
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,26 @@
+# Build arguments
+ARG PYTHON_VERSION=3.10-slim
+
+# Start Python image
+FROM python:${PYTHON_VERSION}
+
+# Install git
+RUN apt-get update && \
+    apt-get install -y git && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
+
+# Setting environment with prefect version
+ARG PREFECT_VERSION=1.4.1
+ENV PREFECT_VERSION $PREFECT_VERSION
+
+# Setup virtual environment and prefect
+ENV VIRTUAL_ENV=/opt/venv
+RUN python3 -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+RUN python3 -m pip install --no-cache-dir -U "pip>=21.2.4" "prefect==$PREFECT_VERSION"
+
+# Install requirements
+WORKDIR /app
+COPY . .
+RUN python3 -m pip install --prefer-binary --no-cache-dir -U .
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..15657b0
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# Pipelines rj-cetrio
\ No newline at end of file
diff --git a/pipelines/__init__.py b/pipelines/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pipelines/constants.py b/pipelines/constants.py
new file mode 100644
index 0000000..a0f092d
--- /dev/null
+++ b/pipelines/constants.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+from enum import Enum
+
+
+class constants(Enum):
+    ######################################
+    # Automatically managed,
+    # please do not change these values
+    ######################################
+    # Docker image
+    DOCKER_TAG = "AUTO_REPLACE_DOCKER_TAG"
+    DOCKER_IMAGE_NAME = "AUTO_REPLACE_DOCKER_IMAGE"
+    DOCKER_IMAGE = f"{DOCKER_IMAGE_NAME}:{DOCKER_TAG}"
+    GCS_FLOWS_BUCKET = "rj-orgao-flows"
+
+    ######################################
+    # Agent labels
+    ######################################
+    # EXAMPLE_AGENT_LABEL = "example_agent"
+
+    ######################################
+    # Other constants
+    ######################################
+    # EXAMPLE_CONSTANT = "example_constant"
diff --git a/pipelines/exemplo/__init__.py b/pipelines/exemplo/__init__.py
new file mode 100644
index 0000000..1337dbc
--- /dev/null
+++ b/pipelines/exemplo/__init__.py
@@ -0,0 +1,2 @@
+# -*- coding: utf-8 -*-
+from pipelines.exemplo.nome_do_objetivo.flows import *  # noqa
diff --git a/pipelines/exemplo/nome_do_objetivo/__init__.py b/pipelines/exemplo/nome_do_objetivo/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pipelines/exemplo/nome_do_objetivo/flows.py b/pipelines/exemplo/nome_do_objetivo/flows.py
new file mode 100644
index 0000000..69462c6
--- /dev/null
+++ b/pipelines/exemplo/nome_do_objetivo/flows.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+from prefect import Flow, Parameter
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+
+from pipelines.constants import constants
+from pipelines.exemplo.nome_do_objetivo.tasks import greet
+
+with Flow(
+    name="rj-cetrio: Nome do objetivo - Descrição detalhada do objetivo",
+) as exemplo__nome_do_objetivo__greet_flow:
+    # Parameters
+    name = Parameter("name", default="rj_cetrio")
+
+    # Tasks
+    greet_task = greet(name)
+
+# Storage and run configs
+exemplo__nome_do_objetivo__greet_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+exemplo__nome_do_objetivo__greet_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
diff --git a/pipelines/exemplo/nome_do_objetivo/tasks.py b/pipelines/exemplo/nome_do_objetivo/tasks.py
new file mode 100644
index 0000000..4669434
--- /dev/null
+++ b/pipelines/exemplo/nome_do_objetivo/tasks.py
@@ -0,0 +1,8 @@
+# -*- coding: utf-8 -*-
+from prefect import task
+from prefeitura_rio.pipelines_utils.logging import log
+
+
+@task
+def greet(name: str = "world") -> None:
+    log(f"Hello, {name}!")
diff --git a/pipelines/flows.py b/pipelines/flows.py
new file mode 100644
index 0000000..942c654
--- /dev/null
+++ b/pipelines/flows.py
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+"""
+Imports all flows for every project so we can register all of them.
+""" +from pipelines.exemplo import * # noqa diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..12eefc3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,47 @@ +[tool.poetry] +name = "pipelines" +version = "0.1.0" +description = "" +authors = ["Gabriel Gazola Milan "] + +[tool.poetry.dependencies] +python = ">=3.10,<3.11" +dbt-bigquery = "^1.6.1" +google-cloud-storage = "^2.10.0" +prefect = "1.4.1" +prefeitura-rio = "1.0.0rc1" + + +[tool.poetry.group.dev] +optional = true + +[tool.poetry.group.dev.dependencies] +black = "^23.7.0" +flake8 = "^6.1.0" +pre-commit = "^3.3.3" +taskipy = "^1.12.0" +isort = "^5.12.0" + + +[tool.poetry.group.ci] +optional = true + +[tool.poetry.group.ci.dependencies] +networkx = "^3.1" +loguru = "^0.7.0" +typer = "^0.9.0" + +[tool.black] +line-length = 100 +target-version = ["py310"] +include = "\\.pyi?$" + +[tool.isort] +profile = "black" + +[tool.taskipy.tasks] +lint = "black . && isort . && flake8 ." + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/queries/.gitignore b/queries/.gitignore new file mode 100644 index 0000000..ddfb2a2 --- /dev/null +++ b/queries/.gitignore @@ -0,0 +1,4 @@ +.user.yml +target/ +dbt_packages/ +logs/ \ No newline at end of file diff --git a/queries/analyses/.gitkeep b/queries/analyses/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml new file mode 100644 index 0000000..f4c3ea3 --- /dev/null +++ b/queries/dbt_project.yml @@ -0,0 +1,26 @@ +name: 'queries' +version: '1.0.0' +config-version: 2 + +profile: 'queries' + +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_packages" + + +models: + +persist_docs: + relation: true + columns: true + queries: + example: + +materialized: view + +schema: example diff --git a/queries/macros/generate_schema_name.sql b/queries/macros/generate_schema_name.sql new file mode 100644 index 0000000..c1dee32 --- /dev/null +++ b/queries/macros/generate_schema_name.sql @@ -0,0 +1,14 @@ +{% macro generate_schema_name(custom_schema_name, node) -%} + + {%- set default_schema = target.schema -%} + {%- if custom_schema_name is none -%} + + {{ default_schema }} + + {%- else -%} + + {{ custom_schema_name | trim }} + + {%- endif -%} + +{%- endmacro %} diff --git a/queries/models/example/my_first_dbt_model.sql b/queries/models/example/my_first_dbt_model.sql new file mode 100644 index 0000000..eba72ce --- /dev/null +++ b/queries/models/example/my_first_dbt_model.sql @@ -0,0 +1,26 @@ + +/* + Welcome to your first dbt model! + Did you know that you can also configure models directly within SQL files? 
+    This will override configurations stated in dbt_project.yml
+
+    Try changing "table" to "view" below
+*/
+
+{{ config(materialized='table') }}
+
+with source_data as (
+
+    select 1 as id
+    union all
+    select null as id
+
+)
+
+select *
+from source_data
+
+/*
+    Uncomment the line below to remove records with null `id` values
+*/
+
+-- where id is not null
diff --git a/queries/models/example/my_second_dbt_model.sql b/queries/models/example/my_second_dbt_model.sql
new file mode 100644
index 0000000..c91f879
--- /dev/null
+++ b/queries/models/example/my_second_dbt_model.sql
@@ -0,0 +1,6 @@
+
+-- Use the `ref` function to select from other models
+
+select *
+from {{ ref('my_first_dbt_model') }}
+where id = 1
diff --git a/queries/models/example/schema.yml b/queries/models/example/schema.yml
new file mode 100644
index 0000000..2a53081
--- /dev/null
+++ b/queries/models/example/schema.yml
@@ -0,0 +1,21 @@
+
+version: 2
+
+models:
+  - name: my_first_dbt_model
+    description: "A starter dbt model"
+    columns:
+      - name: id
+        description: "The primary key for this table"
+        tests:
+          - unique
+          - not_null
+
+  - name: my_second_dbt_model
+    description: "A starter dbt model"
+    columns:
+      - name: id
+        description: "The primary key for this table"
+        tests:
+          - unique
+          - not_null
diff --git a/queries/profiles.yml b/queries/profiles.yml
new file mode 100644
index 0000000..e550a22
--- /dev/null
+++ b/queries/profiles.yml
@@ -0,0 +1,14 @@
+queries:
+  outputs:
+    dev:
+      dataset: dbt
+      job_execution_timeout_seconds: 300
+      job_retries: 1
+      keyfile: /tmp/credentials.json
+      location: US
+      method: service-account
+      priority: interactive
+      project: rj-cetrio
+      threads: 1
+      type: bigquery
+  target: dev
diff --git a/queries/seeds/.gitkeep b/queries/seeds/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/queries/snapshots/.gitkeep b/queries/snapshots/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/queries/tests/.gitkeep b/queries/tests/.gitkeep
new file mode 100644
index 0000000..e69de29