Skip to content

Commit

Permalink
feat(ingest/looker): support emitting unused explores (#9159)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Nov 2, 2023
1 parent ec97250 commit 148ad1a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ def _get_field_type(

# if still not found, log and continue
if type_class is None:
logger.info(
logger.debug(
f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.",
)
type_class = NullTypeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ class LookerDashboardSourceConfig(
False,
description="Extract looks which are not part of any Dashboard. To enable this flag the stateful_ingestion should also be enabled.",
)
emit_used_explores_only: bool = Field(
True,
description="When enabled, only explores that are used by a Dashboard/Look will be ingested.",
)

@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class LookerAPIStats(BaseModel):
lookml_model_calls: int = 0
all_dashboards_calls: int = 0
all_looks_calls: int = 0
all_models_calls: int = 0
get_query_calls: int = 0
search_looks_calls: int = 0
search_dashboards_calls: int = 0
Expand Down Expand Up @@ -155,6 +156,12 @@ def dashboard(self, dashboard_id: str, fields: Union[str, List[str]]) -> Dashboa
transport_options=self.transport_options,
)

def all_lookml_models(self) -> Sequence[LookmlModel]:
    """Return whatever the underlying client's ``all_lookml_models`` call returns.

    Bumps ``client_stats.all_models_calls`` before issuing the request, and
    forwards this wrapper's ``transport_options`` to the client call.
    """
    # Count the call before making it, so the counter is bumped even if the
    # client call raises.
    self.client_stats.all_models_calls += 1
    return self.client.all_lookml_models(
        transport_options=self.transport_options,
    )

def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore:
self.client_stats.explore_calls += 1
return self.client.lookml_model_explore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
)
self.reporter._looker_explore_registry = self.explore_registry
self.reporter._looker_api = self.looker_api

self.reachable_look_registry = set()

self.explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}
# (model, explore) -> list of charts/looks/dashboards that reference this explore
# The list values are used purely for debugging purposes.
self.reachable_explores: Dict[Tuple[str, str], List[str]] = {}

# Keep stat generators to generate entity stat aspect later
stat_generator_config: looker_usage.StatGeneratorConfig = (
Expand Down Expand Up @@ -378,11 +381,11 @@ def _get_input_fields_from_query(

return result

def add_explore_to_fetch(self, model: str, explore: str, via: str) -> None:
if (model, explore) not in self.explores_to_fetch_set:
self.explores_to_fetch_set[(model, explore)] = []
def add_reachable_explore(self, model: str, explore: str, via: str) -> None:
if (model, explore) not in self.reachable_explores:
self.reachable_explores[(model, explore)] = []

self.explores_to_fetch_set[(model, explore)].append(via)
self.reachable_explores[(model, explore)].append(via)

def _get_looker_dashboard_element( # noqa: C901
self, element: DashboardElement
Expand All @@ -403,7 +406,7 @@ def _get_looker_dashboard_element( # noqa: C901
f"Element {element.title}: Explores added via query: {explores}"
)
for exp in explores:
self.add_explore_to_fetch(
self.add_reachable_explore(
model=element.query.model,
explore=exp,
via=f"look:{element.look_id}:query:{element.dashboard_id}",
Expand Down Expand Up @@ -439,7 +442,7 @@ def _get_looker_dashboard_element( # noqa: C901
explores = [element.look.query.view]
logger.debug(f"Element {title}: Explores added via look: {explores}")
for exp in explores:
self.add_explore_to_fetch(
self.add_reachable_explore(
model=element.look.query.model,
explore=exp,
via=f"Look:{element.look_id}:query:{element.dashboard_id}",
Expand Down Expand Up @@ -483,7 +486,7 @@ def _get_looker_dashboard_element( # noqa: C901
)

for exp in explores:
self.add_explore_to_fetch(
self.add_reachable_explore(
model=element.result_maker.query.model,
explore=exp,
via=f"Look:{element.look_id}:resultmaker:query",
Expand All @@ -495,7 +498,7 @@ def _get_looker_dashboard_element( # noqa: C901
if filterable.view is not None and filterable.model is not None:
model = filterable.model
explores.append(filterable.view)
self.add_explore_to_fetch(
self.add_reachable_explore(
model=filterable.model,
explore=filterable.view,
via=f"Look:{element.look_id}:resultmaker:filterable",
Expand Down Expand Up @@ -694,20 +697,26 @@ def _make_dashboard_metadata_events(
def _make_explore_metadata_events(
self,
) -> Iterable[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]:
if self.source_config.emit_used_explores_only:
explores_to_fetch = list(self.reachable_explores.keys())
else:
explores_to_fetch = list(self.list_all_explores())
explores_to_fetch.sort()

with concurrent.futures.ThreadPoolExecutor(
max_workers=self.source_config.max_threads
) as async_executor:
self.reporter.total_explores = len(self.explores_to_fetch_set)
self.reporter.total_explores = len(explores_to_fetch)

explore_futures = {
async_executor.submit(self.fetch_one_explore, model, explore): (
model,
explore,
)
for (model, explore) in self.explores_to_fetch_set
for (model, explore) in explores_to_fetch
}

for future in concurrent.futures.as_completed(explore_futures):
for future in concurrent.futures.wait(explore_futures).done:
events, explore_id, start_time, end_time = future.result()
del explore_futures[future]
self.reporter.explores_scanned += 1
Expand All @@ -717,6 +726,17 @@ def _make_explore_metadata_events(
f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}"
)

def list_all_explores(self) -> Iterable[Tuple[str, str]]:
    """Yield a ``(model_name, explore_name)`` tuple for each listed explore.

    Walks the models returned by ``self.looker_api.all_lookml_models()`` and
    their ``explores`` attribute. This is a generator: nothing is fetched
    until the caller starts iterating, and the result is not a list.

    Entries that cannot form a complete key are skipped silently:
    a model whose ``name`` or ``explores`` is ``None``, and an explore whose
    ``name`` is ``None``.
    """
    for model in self.looker_api.all_lookml_models():
        # A model without a name or without an explores collection cannot
        # contribute any (model, explore) pair.
        if model.name is None or model.explores is None:
            continue
        for explore in model.explores:
            if explore.name is None:
                continue
            yield (model.name, explore.name)

def fetch_one_explore(
self, model: str, explore: str
) -> Tuple[
Expand Down Expand Up @@ -954,7 +974,7 @@ def _input_fields_from_dashboard_element(
)
if explore is not None:
# add this to the list of explores to finally generate metadata for
self.add_explore_to_fetch(
self.add_reachable_explore(
input_field.model, input_field.explore, entity_urn
)
entity_urn = explore.get_explore_urn(self.source_config)
Expand Down

0 comments on commit 148ad1a

Please sign in to comment.