diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 1629f5c2..ef3980a5 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -2632,12 +2632,15 @@ class ForgeAgent: # Track task duration when task is completed, failed, or terminated if status in [TaskStatus.completed, TaskStatus.failed, TaskStatus.terminated]: - duration_seconds = (datetime.now(UTC) - task.created_at.replace(tzinfo=UTC)).total_seconds() + start_time = task.started_at.replace(tzinfo=UTC) if task.started_at else task.created_at.replace(tzinfo=UTC) + queued_seconds = (start_time - task.created_at.replace(tzinfo=UTC)).total_seconds() + duration_seconds = (datetime.now(UTC) - start_time).total_seconds() LOG.info( "Task duration metrics", task_id=task.task_id, workflow_run_id=task.workflow_run_id, duration_seconds=duration_seconds, + queued_seconds=queued_seconds, task_status=status, organization_id=task.organization_id, ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 8699cef8..9da55b7a 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -354,6 +354,7 @@ class WorkflowService: # Check if there's a related workflow script that should be used instead workflow_script, _ = await self._get_workflow_script(workflow, workflow_run, block_labels) + is_script = workflow_script is not None if workflow_script is not None: LOG.info( "Found related workflow script, running script instead of workflow", @@ -393,7 +394,9 @@ class WorkflowService: WorkflowRunStatus.terminated, WorkflowRunStatus.timed_out, ): - workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) + workflow_run = await self.mark_workflow_run_as_completed( + workflow_run_id=workflow_run_id, is_script=is_script + ) await self.generate_script_if_needed( workflow=workflow, workflow_run=workflow_run, @@ -413,18 +416,6 @@ class WorkflowService: close_browser_on_completion=close_browser_on_completion, ) - # Track workflow run duration when completed - duration_seconds = (datetime.now(UTC) - workflow_run.created_at.replace(tzinfo=UTC)).total_seconds() - LOG.info( - "Workflow run duration metrics", - workflow_run_id=workflow_run_id, - workflow_id=workflow_run.workflow_id, - duration_seconds=duration_seconds, - workflow_run_status=workflow_run.status, - organization_id=organization_id, - is_script_run=workflow_script is not None, - ) - return workflow_run async def _execute_workflow_blocks( @@ -963,77 +954,119 @@ class WorkflowService: browser_address=workflow_request.browser_address, ) - async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> WorkflowRun: + async def _update_workflow_run_status( + self, + workflow_run_id: str, + status: WorkflowRunStatus, + failure_reason: str | None = None, + is_script: bool = False, + ) -> WorkflowRun: + workflow_run = await app.DATABASE.update_workflow_run( + workflow_run_id=workflow_run_id, + status=status, + failure_reason=failure_reason, + ) + if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]: + start_time = ( + workflow_run.started_at.replace(tzinfo=UTC) + if workflow_run.started_at + else workflow_run.created_at.replace(tzinfo=UTC) + ) + queued_seconds = (start_time - workflow_run.created_at.replace(tzinfo=UTC)).total_seconds() + duration_seconds = (datetime.now(UTC) - start_time).total_seconds() + LOG.info( + "Workflow run duration metrics", + workflow_run_id=workflow_run_id, + workflow_id=workflow_run.workflow_id, + queued_seconds=queued_seconds, + duration_seconds=duration_seconds, + workflow_run_status=workflow_run.status, + organization_id=workflow_run.organization_id, + is_script_run=is_script, + ) + return workflow_run + + async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, workflow_status="completed", ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.completed, + is_script=is_script, ) - async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun: + async def mark_workflow_run_as_failed( + self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False + ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, workflow_status="failed", failure_reason=failure_reason, ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, failure_reason=failure_reason, + is_script=is_script, ) - async def mark_workflow_run_as_running(self, workflow_run_id: str) -> WorkflowRun: + async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, workflow_status="running", ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.running, + is_script=is_script, ) - async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun: + async def mark_workflow_run_as_terminated( + self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False + ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", workflow_run_id=workflow_run_id, workflow_status="terminated", failure_reason=failure_reason, ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, failure_reason=failure_reason, + is_script=is_script, ) - async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> WorkflowRun: + async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as canceled", workflow_run_id=workflow_run_id, workflow_status="canceled", ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.canceled, + is_script=is_script, ) async def mark_workflow_run_as_timed_out( - self, workflow_run_id: str, failure_reason: str | None = None + self, workflow_run_id: str, failure_reason: str | None = None, is_script: bool = False ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as timed out", workflow_run_id=workflow_run_id, workflow_status="timed_out", ) - return await app.DATABASE.update_workflow_run( + return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.timed_out, failure_reason=failure_reason, + is_script=is_script, ) async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun: @@ -2504,7 +2537,9 @@ class WorkflowService: ) # Mark workflow run as completed - workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) + workflow_run = await self.mark_workflow_run_as_completed( + workflow_run_id=workflow_run.workflow_run_id, is_script=True + ) LOG.info( "Successfully executed workflow script", @@ -2526,7 +2561,7 @@ class WorkflowService: # Mark workflow run as failed failure_reason = f"Failed to execute workflow script: {str(e)}" workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script=True ) return workflow_run diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 8de87f3c..d6d04bc0 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -1409,13 +1409,41 @@ async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> Ta return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) +async def _update_task_v2_status( + task_v2_id: str, + status: TaskV2Status, + organization_id: str | None = None, + summary: str | None = None, + output: dict[str, Any] | None = None, +) -> TaskV2: + task_v2 = await app.DATABASE.update_task_v2( + task_v2_id, organization_id=organization_id, status=status, summary=summary, output=output + ) + if status in [TaskV2Status.completed, TaskV2Status.failed, TaskV2Status.terminated]: + start_time = ( + task_v2.started_at.replace(tzinfo=UTC) if task_v2.started_at else task_v2.created_at.replace(tzinfo=UTC) + ) + queued_seconds = (start_time - task_v2.created_at.replace(tzinfo=UTC)).total_seconds() + duration_seconds = (datetime.now(UTC) - start_time).total_seconds() + LOG.info( + "Task v2 duration metrics", + task_v2_id=task_v2_id, + workflow_run_id=task_v2.workflow_run_id, + duration_seconds=duration_seconds, + queued_seconds=queued_seconds, + task_v2_status=task_v2.status, + organization_id=organization_id, + ) + return task_v2 + + async def mark_task_v2_as_failed( task_v2_id: str, workflow_run_id: str | None = None, failure_reason: str | None = None, organization_id: str | None = None, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status.failed, @@ -1435,7 +1463,7 @@ async def mark_task_v2_as_completed( summary: str | None = None, output: dict[str, Any] | None = None, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status.completed, @@ -1445,17 +1473,6 @@ async def mark_task_v2_as_completed( if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) - # Track task v2 duration when completed - duration_seconds = (datetime.now(UTC) - task_v2.created_at.replace(tzinfo=UTC)).total_seconds() - LOG.info( - "Task v2 duration metrics", - task_v2_id=task_v2_id, - workflow_run_id=workflow_run_id, - duration_seconds=duration_seconds, - task_v2_status=TaskV2Status.completed, - organization_id=organization_id, - ) - await send_task_v2_webhook(task_v2) return task_v2 @@ -1465,7 +1482,7 @@ async def mark_task_v2_as_canceled( workflow_run_id: str | None = None, organization_id: str | None = None, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status.canceled, @@ -1482,7 +1499,7 @@ async def mark_task_v2_as_terminated( organization_id: str | None = None, failure_reason: str | None = None, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status.terminated, @@ -1499,7 +1516,7 @@ async def mark_task_v2_as_timed_out( organization_id: str | None = None, failure_reason: str | None = None, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status.timed_out, @@ -1515,7 +1532,7 @@ async def update_task_v2_status_to_workflow_run_status( workflow_run_status: WorkflowRunStatus, organization_id: str, ) -> TaskV2: - task_v2 = await app.DATABASE.update_task_v2( + task_v2 = await _update_task_v2_status( task_v2_id, organization_id=organization_id, status=TaskV2Status(workflow_run_status),