add step count to webhooks and get run payload (#4410)

This commit is contained in:
Marc Kelechava
2026-01-07 11:41:57 -08:00
committed by GitHub
parent 3c896fad1c
commit 4401216346
9 changed files with 57 additions and 11 deletions

View File

@@ -2596,6 +2596,7 @@ class WorkflowService:
workflow_run_id: str,
organization_id: str | None = None,
include_cost: bool = False,
include_step_count: bool = False,
) -> WorkflowRunResponseBase:
workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id)
if workflow_run is None:
@@ -2607,6 +2608,7 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
organization_id=organization_id,
include_cost=include_cost,
include_step_count=include_step_count,
)
async def build_workflow_run_status_response(
@@ -2615,6 +2617,7 @@ class WorkflowService:
workflow_run_id: str,
organization_id: str | None = None,
include_cost: bool = False,
include_step_count: bool = False,
) -> WorkflowRunResponseBase:
workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id)
if workflow is None:
@@ -2713,19 +2716,23 @@ class WorkflowService:
total_steps = None
total_cost = None
if include_cost:
if include_step_count or include_cost:
workflow_run_steps = await app.DATABASE.get_steps_by_task_ids(
task_ids=[task.task_id for task in workflow_run_tasks], organization_id=organization_id
)
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id, organization_id=organization_id
)
text_prompt_blocks = [block for block in workflow_run_blocks if block.block_type == BlockType.TEXT_PROMPT]
total_steps = len(workflow_run_steps)
# TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation.
# successful steps are the ones that have a status of completed and the total count of unique step.order
successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed]
total_cost = 0.05 * (len(successful_steps) + len(text_prompt_blocks))
if include_cost:
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id, organization_id=organization_id
)
text_prompt_blocks = [
block for block in workflow_run_blocks if block.block_type == BlockType.TEXT_PROMPT
]
# TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation.
# successful steps are the ones that have a status of completed and the total count of unique step.order
successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed]
total_cost = 0.05 * (len(successful_steps) + len(text_prompt_blocks))
return WorkflowRunResponseBase(
workflow_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
@@ -2836,6 +2843,7 @@ class WorkflowService:
workflow_permanent_id=workflow_run.workflow_permanent_id,
workflow_run_id=workflow_run.workflow_run_id,
organization_id=workflow_run.organization_id,
include_step_count=True,
)
LOG.info(
"Built workflow run status response",
@@ -2884,6 +2892,7 @@ class WorkflowService:
totp_identifier=workflow_run.totp_identifier,
),
errors=workflow_run_status_response.errors,
step_count=workflow_run_status_response.total_steps,
)
payload_dict: dict = json.loads(workflow_run_status_response.model_dump_json())
workflow_run_response_dict = json.loads(workflow_run_response.model_dump_json())