From 44012163468e6028e9af9558ba4b5f19f5764b94 Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Wed, 7 Jan 2026 11:41:57 -0800 Subject: [PATCH] add step count to webhooks and get run payload (#4410) --- skyvern/forge/agent.py | 3 +++ skyvern/forge/sdk/db/agent_db.py | 16 ++++++++++++++++ skyvern/forge/sdk/schemas/tasks.py | 3 +++ skyvern/forge/sdk/workflow/service.py | 27 ++++++++++++++++++--------- skyvern/schemas/runs.py | 4 ++++ skyvern/services/run_service.py | 1 + skyvern/services/task_v1_service.py | 11 +++++++++-- skyvern/services/task_v2_service.py | 1 + skyvern/services/workflow_service.py | 2 ++ 9 files changed, 57 insertions(+), 11 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 39038e82..76c34126 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -3406,11 +3406,13 @@ class ForgeAgent: last_step: Step | None = None, failure_reason: str | None = None, need_browser_log: bool = False, + step_count: int | None = None, ) -> TaskResponse: # no last step means the task didn't start, so we don't have any other artifacts if last_step is None: return task.to_task_response( failure_reason=failure_reason, + step_count=step_count, ) screenshot_url = None @@ -3512,6 +3514,7 @@ class ForgeAgent: browser_console_log_url=browser_console_log_url, downloaded_files=downloaded_files, failure_reason=failure_reason, + step_count=step_count, ) async def cleanup_browser_and_create_artifacts( diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 9cffa6dd..464c5bd6 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -513,6 +513,22 @@ class AgentDB(BaseAlchemyDB): LOG.error("UnexpectedError", exc_info=True) raise + async def get_task_step_count(self, task_id: str, organization_id: str | None = None) -> int: + try: + async with self.Session() as session: + result = await session.scalar( + select(func.count(StepModel.step_id)) + .filter_by(task_id=task_id) + .filter_by(organization_id=organization_id) + ) + return result or 0 + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + async def get_task_actions(self, task_id: str, organization_id: str | None = None) -> list[Action]: try: async with self.Session() as session: diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index 2151881c..0c42dff1 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -339,6 +339,7 @@ class Task(TaskBase): browser_console_log_url: str | None = None, downloaded_files: list[FileInfo] | None = None, failure_reason: str | None = None, + step_count: int | None = None, ) -> TaskResponse: return TaskResponse( request=self, @@ -362,6 +363,7 @@ class Task(TaskBase): max_steps_per_run=self.max_steps_per_run, workflow_run_id=self.workflow_run_id, max_screenshot_scrolls=self.max_screenshot_scrolls, + step_count=step_count, ) @@ -387,6 +389,7 @@ class TaskResponse(BaseModel): started_at: datetime | None = None finished_at: datetime | None = None max_screenshot_scrolls: int | None = None + step_count: int | None = None class TaskOutput(BaseModel): diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 3e93138a..c8754c71 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -2596,6 +2596,7 @@ class WorkflowService: workflow_run_id: str, organization_id: str | None = None, include_cost: bool = False, + include_step_count: bool = False, ) -> WorkflowRunResponseBase: workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) if workflow_run is None: @@ -2607,6 +2608,7 @@ class WorkflowService: workflow_run_id=workflow_run_id, organization_id=organization_id, include_cost=include_cost, + include_step_count=include_step_count, ) async def build_workflow_run_status_response( @@ -2615,6 +2617,7 @@ class WorkflowService: workflow_run_id: str, organization_id: str | None = None, include_cost: bool = False, + include_step_count: bool = False, ) -> WorkflowRunResponseBase: workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id) if workflow is None: @@ -2713,19 +2716,23 @@ class WorkflowService: total_steps = None total_cost = None - if include_cost: + if include_step_count or include_cost: workflow_run_steps = await app.DATABASE.get_steps_by_task_ids( task_ids=[task.task_id for task in workflow_run_tasks], organization_id=organization_id ) - workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( - workflow_run_id=workflow_run_id, organization_id=organization_id - ) - text_prompt_blocks = [block for block in workflow_run_blocks if block.block_type == BlockType.TEXT_PROMPT] total_steps = len(workflow_run_steps) - # TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation. - # successful steps are the ones that have a status of completed and the total count of unique step.order - successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed] - total_cost = 0.05 * (len(successful_steps) + len(text_prompt_blocks)) + + if include_cost: + workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( + workflow_run_id=workflow_run_id, organization_id=organization_id + ) + text_prompt_blocks = [ + block for block in workflow_run_blocks if block.block_type == BlockType.TEXT_PROMPT + ] + # TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation. + # successful steps are the ones that have a status of completed and the total count of unique step.order + successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed] + total_cost = 0.05 * (len(successful_steps) + len(text_prompt_blocks)) return WorkflowRunResponseBase( workflow_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -2836,6 +2843,7 @@ class WorkflowService: workflow_permanent_id=workflow_run.workflow_permanent_id, workflow_run_id=workflow_run.workflow_run_id, organization_id=workflow_run.organization_id, + include_step_count=True, ) LOG.info( "Built workflow run status response", @@ -2884,6 +2892,7 @@ class WorkflowService: totp_identifier=workflow_run.totp_identifier, ), errors=workflow_run_status_response.errors, + step_count=workflow_run_status_response.total_steps, ) payload_dict: dict = json.loads(workflow_run_status_response.model_dump_json()) workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json()) diff --git a/skyvern/schemas/runs.py b/skyvern/schemas/runs.py index 9073afe1..86f39d13 100644 --- a/skyvern/schemas/runs.py +++ b/skyvern/schemas/runs.py @@ -607,6 +607,10 @@ class BaseRunResponse(BaseModel): default=None, description="The errors for the run", ) + step_count: int | None = Field( + default=None, + description="Total number of steps executed in this run", + ) class TaskRunResponse(BaseRunResponse): diff --git a/skyvern/services/run_service.py b/skyvern/services/run_service.py index 0e83d8af..4938268a 100644 --- a/skyvern/services/run_service.py +++ b/skyvern/services/run_service.py @@ -70,6 +70,7 @@ async def get_run_response(run_id: str, organization_id: str | None = None) -> R max_screenshot_scrolls=task_v1_response.request.max_screenshot_scrolls, ), errors=task_v1_response.errors, + step_count=task_v1_response.step_count, ) elif run.task_run_type == RunType.task_v2: task_v2 = await app.DATABASE.get_task_v2(run.run_id, organization_id=organization_id) diff --git a/skyvern/services/task_v1_service.py b/skyvern/services/task_v1_service.py index 15efe093..929b654e 100644 --- a/skyvern/services/task_v1_service.py +++ b/skyvern/services/task_v1_service.py @@ -126,10 +126,13 @@ async def get_task_v1_response(task_id: str, organization_id: str | None = None) if not task_obj: raise TaskNotFound(task_id=task_id) + # get step count efficiently via COUNT query + step_count = await app.DATABASE.get_task_step_count(task_id, organization_id) + # get latest step latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=organization_id) if not latest_step: - return await app.agent.build_task_response(task=task_obj) + return await app.agent.build_task_response(task=task_obj, step_count=step_count) failure_reason: str | None = None if task_obj.status == TaskStatus.failed and (latest_step.output or task_obj.failure_reason): @@ -148,5 +151,9 @@ async def get_task_v1_response(task_id: str, organization_id: str | None = None) if len(action_results_string) > 0: failure_reason += "(Exceptions: " + str(action_results_string) + ")" return await app.agent.build_task_response( - task=task_obj, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True + task=task_obj, + last_step=latest_step, + failure_reason=failure_reason, + need_browser_log=True, + step_count=step_count, ) diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 0a83f315..53fa44d9 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -1799,6 +1799,7 @@ async def build_task_v2_run_response(task_v2: TaskV2) -> TaskRunResponse: error_code_mapping=task_v2.error_code_mapping, ), errors=workflow_run_resp.errors if workflow_run_resp else None, + step_count=workflow_run_resp.step_count if workflow_run_resp else None, ) diff --git a/skyvern/services/workflow_service.py b/skyvern/services/workflow_service.py index a56a146b..dafc460a 100644 --- a/skyvern/services/workflow_service.py +++ b/skyvern/services/workflow_service.py @@ -113,6 +113,7 @@ async def get_workflow_run_response( workflow_run_resp = await app.WORKFLOW_SERVICE.build_workflow_run_status_response_by_workflow_id( workflow_run_id=workflow_run.workflow_run_id, organization_id=organization_id, + include_step_count=True, ) app_url = f"{settings.SKYVERN_APP_URL.rstrip('/')}/runs/{workflow_run.workflow_run_id}" return WorkflowRunResponse( @@ -145,4 +146,5 @@ async def get_workflow_run_response( # TODO: add browser session id ), errors=workflow_run_resp.errors, + step_count=workflow_run_resp.total_steps, )