Added get_dataflow_definition #281

Open · wants to merge 5 commits into base: main
src/sempy_labs/_dataflows.py: 78 changes (75 additions, 3 deletions)
@@ -28,6 +28,8 @@ def list_dataflows(workspace: Optional[str] = None):
"""

(workspace, workspace_id) = resolve_workspace_name_and_id(workspace)

# Power BI Dataflow Gen1
client = fabric.PowerBIRestClient()
response = client.get(f"/v1.0/myorg/groups/{workspace_id}/dataflows")
if response.status_code != 200:
@@ -43,14 +45,29 @@ def list_dataflows(workspace: Optional[str] = None):
"Dataflow Name": v.get("name"),
"Configured By": v.get("configuredBy"),
"Users": [v.get("users")],
"Generation": v.get("generation"),
"Generation": 1,
}
df = pd.concat(
[df, pd.DataFrame(new_data, index=[0])],
[df, pd.DataFrame([new_data])],
ignore_index=True,
)

df["Generation"] = df["Generation"].astype(int)
# Fabric Dataflow Gen2
client = fabric.FabricRestClient()
response = client.get(f"/v1/workspaces/{workspace_id}/dataflows")
if response.status_code != 200:
raise FabricHTTPException(response)

responses = pagination(client, response)

for r in responses:
for v in r.get("value", []):
new_data = {
"Dataflow Id": v.get("id"),
"Dataflow Name": v.get("displayName"),
"Generation": 2
}
df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True)

return df

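With these changes, list_dataflows returns Gen1 and Gen2 dataflows in a single DataFrame, distinguished by the integer Generation column. A minimal usage sketch, assuming sempy-labs with this change is installed in a Fabric notebook and that list_dataflows is exported at the package level; the workspace name is hypothetical:

# Minimal sketch; "Sales Workspace" is a hypothetical workspace name.
from sempy_labs import list_dataflows

df = list_dataflows(workspace="Sales Workspace")

# Gen1 rows come from the Power BI dataflows API, Gen2 rows from the
# Fabric dataflows API; the Generation column holds 1 or 2 accordingly.
print(df[["Dataflow Name", "Generation"]])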
@@ -245,3 +262,58 @@ def _resolve_dataflow_name_and_id(
dataflow_name = dfD_filt["Dataflow Name"].iloc[0]

return dataflow_name, dataflow_id


def get_dataflow_definition(
name: str, workspace: Optional[str] = None, decode: bool = True,
) -> dict | pd.DataFrame:
"""
Obtains the definition of a dataflow.

This is a wrapper function around two APIs: the Fabric item `getDefinition` endpoint for Dataflow Gen2 items, with a fallback to `Dataflows - Get Dataflow <https://learn.microsoft.com/rest/api/power-bi/dataflows/get-dataflow>`_ for Dataflow Gen1.

Parameters
----------
name : str
The name of the dataflow.
workspace : str, optional
The Fabric workspace name.
Defaults to None, which resolves to the workspace of the attached lakehouse
or if no lakehouse is attached, resolves to the workspace of the notebook.
decode : bool, optional
If True, returns the dataflow definition decoded as a dictionary.
If False, returns the raw definition response as a pandas DataFrame.
Defaults to True.

Returns
-------
dict or pandas.DataFrame
The dataflow definition as a dictionary (when decode=True), or the raw definition response as a pandas DataFrame (when decode=False).
"""

workspace = fabric.resolve_workspace_name(workspace)
workspace_id = fabric.resolve_workspace_id(workspace)

dataflow_name, item_id = _resolve_dataflow_name_and_id(dataflow=name, workspace=workspace)

try:
# Dataflow Gen2: Fabric item definition API
client = fabric.FabricRestClient()
response = client.post(
f"/v1/workspaces/{workspace_id}/items/{item_id}/getDefinition"
)
except Exception:
# Dataflow Gen1: fall back to the Power BI dataflows API
client = fabric.PowerBIRestClient()
response = client.get(
f"/v1.0/myorg/groups/{workspace_id}/dataflows/{item_id}"
)

result = lro(client, response).json()

df = pd.json_normalize(result)

if not decode:
return df

return result
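A minimal usage sketch for the new function, assuming the same notebook setup and that it is exported at the package level like the existing dataflow helpers (this PR only touches _dataflows.py, so the import path may instead be sempy_labs._dataflows); the dataflow and workspace names are hypothetical:

# Minimal sketch; names are hypothetical.
from sempy_labs import get_dataflow_definition

# Default (decode=True): the decoded definition as a dictionary.
definition = get_dataflow_definition(name="My Dataflow", workspace="Sales Workspace")

# decode=False: the raw definition response flattened into a pandas DataFrame.
definition_df = get_dataflow_definition(
name="My Dataflow", workspace="Sales Workspace", decode=False
)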