Skip to content

Commit

Permalink
[1/n][dagster-fivetran] Scaffold FivetranWorkspace for rework (#25750)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Builds out a very barebones resource and client classes for the new
version of the Fivetran integration.

## How I Tested These Changes

Tests will be added in subsequent PRs.
  • Loading branch information
maximearmstrong authored Nov 8, 2024
1 parent 04d4f98 commit 0a0c77b
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from dagster_fivetran.resources import (
FivetranResource as FivetranResource,
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
)
from dagster_fivetran.types import FivetranOutput as FivetranOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple
from urllib.parse import urljoin

Expand All @@ -16,11 +17,14 @@
get_dagster_logger,
resource,
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

Expand Down Expand Up @@ -436,3 +440,142 @@ def my_fivetran_job():
"""
return FivetranResource.from_resource_context(context)


# ------------------
# Reworked resources
# ------------------
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()


@experimental
class FivetranClient:
"""This class exposes methods on top of the Fivetran REST API."""

def __init__(
self,
api_key: str,
api_secret: str,
request_max_retries: int,
request_retry_delay: float,
):
self.api_key = api_key
self.api_secret = api_secret
self.request_max_retries = request_max_retries
self.request_retry_delay = request_retry_delay

@property
def _auth(self) -> HTTPBasicAuth:
raise NotImplementedError()

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

@property
def api_base_url(self) -> str:
raise NotImplementedError()

@property
def api_connector_url(self) -> str:
raise NotImplementedError()

def make_connector_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()

def make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()

def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches details about a given connector from the Fivetran API."""
raise NotImplementedError()

def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""Fetches all connectors for a given group from the Fivetran API."""
raise NotImplementedError()

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
raise NotImplementedError()

def get_groups(self) -> Mapping[str, Any]:
"""Fetches all groups from the Fivetran API."""
raise NotImplementedError()


@experimental
class FivetranWorkspace(ConfigurableResource):
"""This class represents a Fivetran workspace and provides utilities
to interact with Fivetran APIs.
"""

api_key: str = Field(description="The Fivetran API key to use for this resource.")
api_secret: str = Field(description="The Fivetran API secret to use for this resource.")
request_max_retries: int = Field(
default=3,
description=(
"The maximum number of times requests to the Fivetran API should be retried "
"before failing."
),
)
request_retry_delay: float = Field(
default=0.25,
description="Time (in seconds) to wait between each request retry.",
)

_client: FivetranClient = PrivateAttr(default=None)

def get_client(self) -> FivetranClient:
return FivetranClient(
api_key=self.api_key,
api_secret=self.api_secret,
request_max_retries=self.request_max_retries,
request_retry_delay=self.request_retry_delay,
)

def fetch_fivetran_workspace_data(
self,
) -> FivetranWorkspaceData:
"""Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object.
Future work will cache this data to avoid repeated calls to the Fivetran API.
Returns:
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
"""
raise NotImplementedError()

0 comments on commit 0a0c77b

Please sign in to comment.