Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[442] - Pagination added for logs #832

Merged
merged 11 commits into from
Sep 3, 2024
13 changes: 6 additions & 7 deletions ddpui/api/pipeline_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,11 @@
@pipelineapi.get("flow_runs/{flow_run_id}/logs", auth=auth.CustomAuthMiddleware())
@has_permission(["can_view_pipeline"])
def get_flow_runs_logs(
request, flow_run_id, offset: int = 0
request, flow_run_id, task_run_id = '', limit: int = 0, offset: int = 0
): # pylint: disable=unused-argument
"""return the logs from a flow-run"""
try:
result = prefect_service.get_flow_run_logs(flow_run_id, offset)
result = prefect_service.get_flow_run_logs(flow_run_id, task_run_id,limit,offset)

Check warning on line 656 in ddpui/api/pipeline_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/pipeline_api.py#L656

Added line #L656 was not covered by tests
except Exception as error:
logger.exception(error)
raise HttpError(400, "failed to retrieve logs") from error
Expand Down Expand Up @@ -724,18 +724,17 @@
)
@has_permission(["can_view_pipeline"])
def get_prefect_flow_runs_log_history_v1(
request, deployment_id, limit: int = 0, fetchlogs=True, offset: int = 0
request, deployment_id, limit: int = 0, offset: int = 0
):
# pylint: disable=unused-argument
"""Fetch all flow runs for the deployment and the logs for each flow run"""
flow_runs = prefect_service.get_flow_runs_by_deployment_id_v1(
deployment_id=deployment_id, limit=limit, offset=offset
)

if fetchlogs:
for flow_run in flow_runs:
logs_dict = prefect_service.get_flow_run_logs_v2(flow_run["id"])
flow_run["runs"] = logs_dict
for flow_run in flow_runs:
graph_dict = prefect_service.get_flow_run_graphs(flow_run["id"])
flow_run["runs"] = graph_dict

Check warning on line 737 in ddpui/api/pipeline_api.py

View check run for this annotation

Codecov / codecov/patch

ddpui/api/pipeline_api.py#L735-L737

Added lines #L735 - L737 were not covered by tests

return flow_runs
Ishankoradia marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
16 changes: 14 additions & 2 deletions ddpui/celeryworkers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from ddpui.utils.singletaskprogress import SingleTaskProgress
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpprefect.prefect_service import (
get_flow_run_graphs,
get_flow_run_logs,
update_dbt_core_block_schema,
get_dbt_cli_profile_block,
prefect_get,
Expand All @@ -56,6 +58,7 @@
TASK_DBTCLEAN,
TASK_DBTDEPS,
TASK_AIRBYTESYNC,
FLOW_RUN_LOGS_OFFSET_LIMIT
)
from ddpui.ddpprefect import DBTCLIPROFILE
from ddpui.core import llm_service
Expand Down Expand Up @@ -750,8 +753,8 @@
log_file_name = ""
try:
if type == LogsSummarizationType.DEPLOYMENT:
all_task_logs = get_flow_run_logs_v2(flow_run_id)
dbt_tasks = [task for task in all_task_logs if task["id"] == task_id]
all_task = get_flow_run_graphs(flow_run_id)
dbt_tasks = [task for task in all_task if task["id"] == task_id]

Check warning on line 757 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L756-L757

Added lines #L756 - L757 were not covered by tests
if len(dbt_tasks) == 0:
taskprogress.add(
{
Expand All @@ -762,6 +765,15 @@
)
return
task = dbt_tasks[0]
task['logs'] = []
limit = 0
while(True):
new_logs_set = get_flow_run_logs(flow_run_id, task_id, limit, offset=FLOW_RUN_LOGS_OFFSET_LIMIT)
task['logs']+=new_logs_set['logs']
if len(new_logs_set['logs']) == FLOW_RUN_LOGS_OFFSET_LIMIT:
limit+=1

Check warning on line 774 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L768-L774

Added lines #L768 - L774 were not covered by tests
else:
Ishankoradia marked this conversation as resolved.
Show resolved Hide resolved
break

Check warning on line 776 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L776

Added line #L776 was not covered by tests
logs_text = "\n".join([log["message"] for log in task["logs"]])
log_file_name = f"{flow_run_id}_{task_id}"
elif type == LogsSummarizationType.AIRBYTE_SYNC:
Expand Down
13 changes: 11 additions & 2 deletions ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,11 +592,13 @@
return res


def get_flow_run_logs(flow_run_id: str, offset: int) -> dict: # pragma: no cover
def get_flow_run_logs(
flow_run_id: str, task_run_id: str, limit: int, offset: int
) -> dict: # pragma: no cover
"""retreive the logs from a flow-run from prefect"""
res = prefect_get(
f"flow_runs/logs/{flow_run_id}",
params={"offset": offset},
params={"offset": offset, "limit": limit, "task_run_id": task_run_id},
)
return {"logs": res}

Expand All @@ -608,6 +610,13 @@
)
return res

def get_flow_run_graphs(flow_run_id: str) -> dict:
"""retreive the tasks from a flow-run from prefect"""
res = prefect_get(

Check warning on line 615 in ddpui/ddpprefect/prefect_service.py

View check run for this annotation

Codecov / codecov/patch

ddpui/ddpprefect/prefect_service.py#L615

Added line #L615 was not covered by tests
f"flow_runs/graph/{flow_run_id}",
)
return res

Check warning on line 618 in ddpui/ddpprefect/prefect_service.py

View check run for this annotation

Codecov / codecov/patch

ddpui/ddpprefect/prefect_service.py#L618

Added line #L618 was not covered by tests


def get_flow_run(flow_run_id: str) -> dict:
"""retreive the logs from a flow-run from prefect"""
Expand Down
4 changes: 2 additions & 2 deletions ddpui/tests/services/test_prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,9 @@ def test_get_deployment(mock_get: Mock):
@patch("ddpui.ddpprefect.prefect_service.prefect_get")
def test_get_flow_run_logs(mock_get: Mock):
mock_get.return_value = "the-logs"
response = get_flow_run_logs("flowrunid", 3)
response = get_flow_run_logs("flowrunid","taskrunid", 10, 3)
assert response == {"logs": "the-logs"}
mock_get.assert_called_once_with("flow_runs/logs/flowrunid", params={"offset": 3})
mock_get.assert_called_once_with("flow_runs/logs/flowrunid", params={"offset": 3, "limit": 10, "task_run_id": "taskrunid"})


@patch("ddpui.ddpprefect.prefect_service.prefect_get")
Expand Down
3 changes: 3 additions & 0 deletions ddpui/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@
SYSTEM_USER_EMAIL = "System User"

# prefect flow run states

# offset limit for fetching logs
FLOW_RUN_LOGS_OFFSET_LIMIT = 200
Loading