Add run timeline endpoint (#3312)

Co-authored-by: Suchintan Singh <suchintan@skyvern.com>
This commit is contained in:
Suchintan
2025-08-28 16:29:27 -04:00
committed by GitHub
parent 149856a31d
commit 015194f2a4
2 changed files with 77 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ from skyvern.forge.sdk.routes.code_samples import (
CREATE_WORKFLOW_CODE_SAMPLE_PYTHON,
DELETE_WORKFLOW_CODE_SAMPLE,
GET_RUN_CODE_SAMPLE,
GET_RUN_TIMELINE_CODE_SAMPLE,
GET_WORKFLOWS_CODE_SAMPLE,
RETRY_RUN_WEBHOOK_CODE_SAMPLE,
RUN_TASK_CODE_SAMPLE,
@@ -844,6 +845,71 @@ async def retry_run_webhook(
await run_service.retry_run_webhook(run_id, organization_id=current_org.organization_id, api_key=x_api_key)
@base_router.get(
"/runs/{run_id}/timeline",
tags=["Agent", "Workflows"],
response_model=list[WorkflowRunTimeline],
openapi_extra={
"x-fern-sdk-method-name": "get_run_timeline",
"x-fern-examples": [{"code-samples": [{"sdk": "python", "code": GET_RUN_TIMELINE_CODE_SAMPLE}]}],
},
description="Get timeline for a run (workflow run or task_v2 run)",
summary="Get run timeline",
responses={
200: {"description": "Successfully retrieved run timeline"},
404: {"description": "Run not found"},
400: {"description": "Timeline not available for this run type"},
},
)
@base_router.get(
"/runs/{run_id}/timeline/",
response_model=list[WorkflowRunTimeline],
include_in_schema=False,
)
async def get_run_timeline(
run_id: str = Path(
..., description="The id of the workflow run or task_v2 run.", examples=["wr_123", "tsk_v2_123"]
),
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRunTimeline]:
analytics.capture("skyvern-oss-run-timeline-get")
# Check if the run exists
run_response = await run_service.get_run_response(run_id, organization_id=current_org.organization_id)
if not run_response:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Run not found {run_id}",
)
# Handle workflow runs directly
if run_response.run_type == RunType.workflow_run:
return await _flatten_workflow_run_timeline(current_org.organization_id, run_id)
# Handle task_v2 runs by getting their associated workflow_run_id
if run_response.run_type == RunType.task_v2:
task_v2 = await app.DATABASE.get_task_v2(task_v2_id=run_id, organization_id=current_org.organization_id)
if not task_v2:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task v2 not found {run_id}",
)
if not task_v2.workflow_run_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Task v2 {run_id} has no associated workflow run",
)
return await _flatten_workflow_run_timeline(current_org.organization_id, task_v2.workflow_run_id)
# Timeline not available for other run types
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Timeline not available for run type {run_response.run_type}",
)
@base_router.post(
"/run/workflows/blocks",
include_in_schema=False,

View File

@@ -25,6 +25,17 @@ RETRY_RUN_WEBHOOK_CODE_SAMPLE = """from skyvern import Skyvern
skyvern = Skyvern(api_key="YOUR_API_KEY")
await skyvern.retry_run_webhook(run_id="tsk_v2_123")
"""
GET_RUN_TIMELINE_CODE_SAMPLE = """from skyvern import Skyvern
skyvern = Skyvern(api_key="YOUR_API_KEY")
# Get timeline for a workflow run
timeline = await skyvern.get_run_timeline(run_id="wr_123")
print(timeline)
# Get timeline for a task_v2 run
timeline = await skyvern.get_run_timeline(run_id="tsk_v2_123")
print(timeline)
"""
LOGIN_CODE_SAMPLE_SKYVERN = """# Login with password saved in Skyvern
from skyvern import Skyvern