add observer task block (#1665)

This commit is contained in:
Shuchang Zheng
2025-01-28 16:59:54 +08:00
committed by GitHub
parent 1b79ef9ca3
commit 185fc330a4
11 changed files with 224 additions and 22 deletions

View File

@@ -63,6 +63,7 @@ from skyvern.forge.sdk.workflow.exceptions import (
InvalidTemplateWorkflowPermanentId,
WorkflowParameterMissingRequiredValue,
)
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.forge.sdk.workflow.models.workflow import (
RunWorkflowResponse,
Workflow,
@@ -767,24 +768,7 @@ async def get_workflow_run_timeline(
page_size: int = Query(20, ge=1),
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRunTimeline]:
# get observer cruise by workflow run id
observer_cruise_obj = await app.DATABASE.get_observer_cruise_by_workflow_run_id(
workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id,
)
# get all the workflow run blocks
workflow_run_block_timeline = await app.WORKFLOW_SERVICE.get_workflow_run_timeline(
workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id,
)
if observer_cruise_obj and observer_cruise_obj.observer_cruise_id:
observer_thought_timeline = await observer_service.get_observer_thought_timelines(
observer_cruise_id=observer_cruise_obj.observer_cruise_id,
organization_id=current_org.organization_id,
)
workflow_run_block_timeline.extend(observer_thought_timeline)
workflow_run_block_timeline.sort(key=lambda x: x.created_at, reverse=True)
return workflow_run_block_timeline
return await _flatten_workflow_run_timeline(current_org.organization_id, workflow_run_id)
@base_router.get(
@@ -1310,3 +1294,52 @@ async def close_browser_session(
status_code=200,
media_type="application/json",
)
async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: str) -> list[WorkflowRunTimeline]:
"""
Get the timeline workflow runs including the nested workflow runs in a flattened list
"""
# get observer task by workflow run id
observer_task_obj = await app.DATABASE.get_observer_cruise_by_workflow_run_id(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# get all the workflow run blocks
workflow_run_block_timeline = await app.WORKFLOW_SERVICE.get_workflow_run_timeline(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# loop through the run block timeline, find the task_v2 blocks, flatten the timeline for task_v2
final_workflow_run_block_timeline = []
for timeline in workflow_run_block_timeline:
if not timeline.block:
continue
if timeline.block.block_type != BlockType.TaskV2:
# flatten the timeline for task_v2
final_workflow_run_block_timeline.append(timeline)
continue
if not timeline.block.block_workflow_run_id:
LOG.error(
"Block workflow run id is not set for task_v2 block",
workflow_run_id=workflow_run_id,
organization_id=organization_id,
observer_cruise_id=observer_task_obj.observer_cruise_id if observer_task_obj else None,
)
continue
# in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline
workflow_blocks = await _flatten_workflow_run_timeline(
organization_id=organization_id,
workflow_run_id=timeline.block.block_workflow_run_id,
)
final_workflow_run_block_timeline.extend(workflow_blocks)
if observer_task_obj and observer_task_obj.observer_cruise_id:
observer_thought_timeline = await observer_service.get_observer_thought_timelines(
observer_cruise_id=observer_task_obj.observer_cruise_id,
organization_id=organization_id,
)
final_workflow_run_block_timeline.extend(observer_thought_timeline)
final_workflow_run_block_timeline.sort(key=lambda x: x.created_at, reverse=True)
return final_workflow_run_block_timeline