Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[442] - Pagination added for logs #832

Merged
merged 11 commits into from
Sep 3, 2024
16 changes: 14 additions & 2 deletions ddpui/celeryworkers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from ddpui.utils.singletaskprogress import SingleTaskProgress
from ddpui.ddpairbyte import airbyte_service, airbytehelpers
from ddpui.ddpprefect.prefect_service import (
get_flow_run_graphs,
get_flow_run_logs,
update_dbt_core_block_schema,
get_dbt_cli_profile_block,
prefect_get,
Expand All @@ -56,6 +58,7 @@
TASK_DBTCLEAN,
TASK_DBTDEPS,
TASK_AIRBYTESYNC,
FLOW_RUN_LOGS_OFFSET_LIMIT
)
from ddpui.ddpprefect import DBTCLIPROFILE
from ddpui.core import llm_service
Expand Down Expand Up @@ -750,8 +753,8 @@
log_file_name = ""
try:
if type == LogsSummarizationType.DEPLOYMENT:
all_task_logs = get_flow_run_logs_v2(flow_run_id)
dbt_tasks = [task for task in all_task_logs if task["id"] == task_id]
all_task = get_flow_run_graphs(flow_run_id)
dbt_tasks = [task for task in all_task if task["id"] == task_id]

Check warning on line 757 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L756-L757

Added lines #L756 - L757 were not covered by tests
if len(dbt_tasks) == 0:
taskprogress.add(
{
Expand All @@ -762,6 +765,15 @@
)
return
task = dbt_tasks[0]
task['logs'] = []
limit = 0
while(True):
new_logs_set = get_flow_run_logs(flow_run_id, task_id, limit, offset=FLOW_RUN_LOGS_OFFSET_LIMIT)
task['logs']+=new_logs_set['logs']
if len(new_logs_set['logs']) == FLOW_RUN_LOGS_OFFSET_LIMIT:
limit+=1

Check warning on line 774 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L768-L774

Added lines #L768 - L774 were not covered by tests
else:
Ishankoradia marked this conversation as resolved.
Show resolved Hide resolved
break

Check warning on line 776 in ddpui/celeryworkers/tasks.py

View check run for this annotation

Codecov / codecov/patch

ddpui/celeryworkers/tasks.py#L776

Added line #L776 was not covered by tests
logs_text = "\n".join([log["message"] for log in task["logs"]])
log_file_name = f"{flow_run_id}_{task_id}"
elif type == LogsSummarizationType.AIRBYTE_SYNC:
Expand Down
2 changes: 1 addition & 1 deletion ddpui/ddpprefect/prefect_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,12 @@
)
return res

def get_flow_run_graphs(flow_run_id: str) -> dict: # pragma: no cover
def get_flow_run_graphs(flow_run_id: str) -> dict:
"""retreive the tasks from a flow-run from prefect"""
res = prefect_get(

Check warning on line 615 in ddpui/ddpprefect/prefect_service.py

View check run for this annotation

Codecov / codecov/patch

ddpui/ddpprefect/prefect_service.py#L615

Added line #L615 was not covered by tests
f"flow_runs/graph/{flow_run_id}",
)
return res

Check warning on line 618 in ddpui/ddpprefect/prefect_service.py

View check run for this annotation

Codecov / codecov/patch

ddpui/ddpprefect/prefect_service.py#L618

Added line #L618 was not covered by tests


def get_flow_run(flow_run_id: str) -> dict:
Expand Down
3 changes: 3 additions & 0 deletions ddpui/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@
SYSTEM_USER_EMAIL = "System User"

# prefect flow run states

# offset limit for fetching logs
FLOW_RUN_LOGS_OFFSET_LIMIT = 200
Loading