fix run duration calculation (#3427)
This commit is contained in:
@@ -2632,12 +2632,15 @@ class ForgeAgent:
|
|||||||
|
|
||||||
# Track task duration when task is completed, failed, or terminated
|
# Track task duration when task is completed, failed, or terminated
|
||||||
if status in [TaskStatus.completed, TaskStatus.failed, TaskStatus.terminated]:
|
if status in [TaskStatus.completed, TaskStatus.failed, TaskStatus.terminated]:
|
||||||
duration_seconds = (datetime.now(UTC) - task.created_at.replace(tzinfo=UTC)).total_seconds()
|
start_time = task.started_at.replace(tzinfo=UTC) if task.started_at else task.created_at.replace(tzinfo=UTC)
|
||||||
|
queued_seconds = (start_time - task.created_at.replace(tzinfo=UTC)).total_seconds()
|
||||||
|
duration_seconds = (datetime.now(UTC) - start_time).total_seconds()
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Task duration metrics",
|
"Task duration metrics",
|
||||||
task_id=task.task_id,
|
task_id=task.task_id,
|
||||||
workflow_run_id=task.workflow_run_id,
|
workflow_run_id=task.workflow_run_id,
|
||||||
duration_seconds=duration_seconds,
|
duration_seconds=duration_seconds,
|
||||||
|
queued_seconds=queued_seconds,
|
||||||
task_status=status,
|
task_status=status,
|
||||||
organization_id=task.organization_id,
|
organization_id=task.organization_id,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -354,6 +354,7 @@ class WorkflowService:
|
|||||||
|
|
||||||
# Check if there's a related workflow script that should be used instead
|
# Check if there's a related workflow script that should be used instead
|
||||||
workflow_script, _ = await self._get_workflow_script(workflow, workflow_run, block_labels)
|
workflow_script, _ = await self._get_workflow_script(workflow, workflow_run, block_labels)
|
||||||
|
is_script = workflow_script is not None
|
||||||
if workflow_script is not None:
|
if workflow_script is not None:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Found related workflow script, running script instead of workflow",
|
"Found related workflow script, running script instead of workflow",
|
||||||
@@ -393,7 +394,9 @@ class WorkflowService:
|
|||||||
WorkflowRunStatus.terminated,
|
WorkflowRunStatus.terminated,
|
||||||
WorkflowRunStatus.timed_out,
|
WorkflowRunStatus.timed_out,
|
||||||
):
|
):
|
||||||
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
|
workflow_run = await self.mark_workflow_run_as_completed(
|
||||||
|
workflow_run_id=workflow_run_id, is_script=is_script
|
||||||
|
)
|
||||||
await self.generate_script_if_needed(
|
await self.generate_script_if_needed(
|
||||||
workflow=workflow,
|
workflow=workflow,
|
||||||
workflow_run=workflow_run,
|
workflow_run=workflow_run,
|
||||||
@@ -413,18 +416,6 @@ class WorkflowService:
|
|||||||
close_browser_on_completion=close_browser_on_completion,
|
close_browser_on_completion=close_browser_on_completion,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Track workflow run duration when completed
|
|
||||||
duration_seconds = (datetime.now(UTC) - workflow_run.created_at.replace(tzinfo=UTC)).total_seconds()
|
|
||||||
LOG.info(
|
|
||||||
"Workflow run duration metrics",
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
workflow_id=workflow_run.workflow_id,
|
|
||||||
duration_seconds=duration_seconds,
|
|
||||||
workflow_run_status=workflow_run.status,
|
|
||||||
organization_id=organization_id,
|
|
||||||
is_script_run=workflow_script is not None,
|
|
||||||
)
|
|
||||||
|
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|
||||||
async def _execute_workflow_blocks(
|
async def _execute_workflow_blocks(
|
||||||
@@ -963,77 +954,119 @@ class WorkflowService:
|
|||||||
browser_address=workflow_request.browser_address,
|
browser_address=workflow_request.browser_address,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> WorkflowRun:
|
async def _update_workflow_run_status(
|
||||||
|
self,
|
||||||
|
workflow_run_id: str,
|
||||||
|
status: WorkflowRunStatus,
|
||||||
|
failure_reason: str | None = None,
|
||||||
|
is_script: bool = False,
|
||||||
|
) -> WorkflowRun:
|
||||||
|
workflow_run = await app.DATABASE.update_workflow_run(
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
status=status,
|
||||||
|
failure_reason=failure_reason,
|
||||||
|
)
|
||||||
|
if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]:
|
||||||
|
start_time = (
|
||||||
|
workflow_run.started_at.replace(tzinfo=UTC)
|
||||||
|
if workflow_run.started_at
|
||||||
|
else workflow_run.created_at.replace(tzinfo=UTC)
|
||||||
|
)
|
||||||
|
queued_seconds = (start_time - workflow_run.created_at.replace(tzinfo=UTC)).total_seconds()
|
||||||
|
duration_seconds = (datetime.now(UTC) - start_time).total_seconds()
|
||||||
|
LOG.info(
|
||||||
|
"Workflow run duration metrics",
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
workflow_id=workflow_run.workflow_id,
|
||||||
|
queued_seconds=queued_seconds,
|
||||||
|
duration_seconds=duration_seconds,
|
||||||
|
workflow_run_status=workflow_run.status,
|
||||||
|
organization_id=workflow_run.organization_id,
|
||||||
|
is_script_run=is_script,
|
||||||
|
)
|
||||||
|
return workflow_run
|
||||||
|
|
||||||
|
async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as completed",
|
f"Marking workflow run {workflow_run_id} as completed",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="completed",
|
workflow_status="completed",
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.completed,
|
status=WorkflowRunStatus.completed,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun:
|
async def mark_workflow_run_as_failed(
|
||||||
|
self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False
|
||||||
|
) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as failed",
|
f"Marking workflow run {workflow_run_id} as failed",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="failed",
|
workflow_status="failed",
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.failed,
|
status=WorkflowRunStatus.failed,
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_running(self, workflow_run_id: str) -> WorkflowRun:
|
async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as running",
|
f"Marking workflow run {workflow_run_id} as running",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="running",
|
workflow_status="running",
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.running,
|
status=WorkflowRunStatus.running,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun:
|
async def mark_workflow_run_as_terminated(
|
||||||
|
self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False
|
||||||
|
) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as terminated",
|
f"Marking workflow run {workflow_run_id} as terminated",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="terminated",
|
workflow_status="terminated",
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.terminated,
|
status=WorkflowRunStatus.terminated,
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> WorkflowRun:
|
async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as canceled",
|
f"Marking workflow run {workflow_run_id} as canceled",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="canceled",
|
workflow_status="canceled",
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.canceled,
|
status=WorkflowRunStatus.canceled,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def mark_workflow_run_as_timed_out(
|
async def mark_workflow_run_as_timed_out(
|
||||||
self, workflow_run_id: str, failure_reason: str | None = None
|
self, workflow_run_id: str, failure_reason: str | None = None, is_script: bool = False
|
||||||
) -> WorkflowRun:
|
) -> WorkflowRun:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
f"Marking workflow run {workflow_run_id} as timed out",
|
f"Marking workflow run {workflow_run_id} as timed out",
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
workflow_status="timed_out",
|
workflow_status="timed_out",
|
||||||
)
|
)
|
||||||
return await app.DATABASE.update_workflow_run(
|
return await self._update_workflow_run_status(
|
||||||
workflow_run_id=workflow_run_id,
|
workflow_run_id=workflow_run_id,
|
||||||
status=WorkflowRunStatus.timed_out,
|
status=WorkflowRunStatus.timed_out,
|
||||||
failure_reason=failure_reason,
|
failure_reason=failure_reason,
|
||||||
|
is_script=is_script,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun:
|
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun:
|
||||||
@@ -2504,7 +2537,9 @@ class WorkflowService:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Mark workflow run as completed
|
# Mark workflow run as completed
|
||||||
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
|
workflow_run = await self.mark_workflow_run_as_completed(
|
||||||
|
workflow_run_id=workflow_run.workflow_run_id, is_script=True
|
||||||
|
)
|
||||||
|
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Successfully executed workflow script",
|
"Successfully executed workflow script",
|
||||||
@@ -2526,7 +2561,7 @@ class WorkflowService:
|
|||||||
# Mark workflow run as failed
|
# Mark workflow run as failed
|
||||||
failure_reason = f"Failed to execute workflow script: {str(e)}"
|
failure_reason = f"Failed to execute workflow script: {str(e)}"
|
||||||
workflow_run = await self.mark_workflow_run_as_failed(
|
workflow_run = await self.mark_workflow_run_as_failed(
|
||||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script=True
|
||||||
)
|
)
|
||||||
|
|
||||||
return workflow_run
|
return workflow_run
|
||||||
|
|||||||
@@ -1409,13 +1409,41 @@ async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> Ta
|
|||||||
return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
|
return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def _update_task_v2_status(
|
||||||
|
task_v2_id: str,
|
||||||
|
status: TaskV2Status,
|
||||||
|
organization_id: str | None = None,
|
||||||
|
summary: str | None = None,
|
||||||
|
output: dict[str, Any] | None = None,
|
||||||
|
) -> TaskV2:
|
||||||
|
task_v2 = await app.DATABASE.update_task_v2(
|
||||||
|
task_v2_id, organization_id=organization_id, status=status, summary=summary, output=output
|
||||||
|
)
|
||||||
|
if status in [TaskV2Status.completed, TaskV2Status.failed, TaskV2Status.terminated]:
|
||||||
|
start_time = (
|
||||||
|
task_v2.started_at.replace(tzinfo=UTC) if task_v2.started_at else task_v2.created_at.replace(tzinfo=UTC)
|
||||||
|
)
|
||||||
|
queued_seconds = (start_time - task_v2.created_at.replace(tzinfo=UTC)).total_seconds()
|
||||||
|
duration_seconds = (datetime.now(UTC) - start_time).total_seconds()
|
||||||
|
LOG.info(
|
||||||
|
"Task v2 duration metrics",
|
||||||
|
task_v2_id=task_v2_id,
|
||||||
|
workflow_run_id=task_v2.workflow_run_id,
|
||||||
|
duration_seconds=duration_seconds,
|
||||||
|
queued_seconds=queued_seconds,
|
||||||
|
task_v2_status=task_v2.status,
|
||||||
|
organization_id=organization_id,
|
||||||
|
)
|
||||||
|
return task_v2
|
||||||
|
|
||||||
|
|
||||||
async def mark_task_v2_as_failed(
|
async def mark_task_v2_as_failed(
|
||||||
task_v2_id: str,
|
task_v2_id: str,
|
||||||
workflow_run_id: str | None = None,
|
workflow_run_id: str | None = None,
|
||||||
failure_reason: str | None = None,
|
failure_reason: str | None = None,
|
||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status.failed,
|
status=TaskV2Status.failed,
|
||||||
@@ -1435,7 +1463,7 @@ async def mark_task_v2_as_completed(
|
|||||||
summary: str | None = None,
|
summary: str | None = None,
|
||||||
output: dict[str, Any] | None = None,
|
output: dict[str, Any] | None = None,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status.completed,
|
status=TaskV2Status.completed,
|
||||||
@@ -1445,17 +1473,6 @@ async def mark_task_v2_as_completed(
|
|||||||
if workflow_run_id:
|
if workflow_run_id:
|
||||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
||||||
|
|
||||||
# Track task v2 duration when completed
|
|
||||||
duration_seconds = (datetime.now(UTC) - task_v2.created_at.replace(tzinfo=UTC)).total_seconds()
|
|
||||||
LOG.info(
|
|
||||||
"Task v2 duration metrics",
|
|
||||||
task_v2_id=task_v2_id,
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
duration_seconds=duration_seconds,
|
|
||||||
task_v2_status=TaskV2Status.completed,
|
|
||||||
organization_id=organization_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
await send_task_v2_webhook(task_v2)
|
await send_task_v2_webhook(task_v2)
|
||||||
return task_v2
|
return task_v2
|
||||||
|
|
||||||
@@ -1465,7 +1482,7 @@ async def mark_task_v2_as_canceled(
|
|||||||
workflow_run_id: str | None = None,
|
workflow_run_id: str | None = None,
|
||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status.canceled,
|
status=TaskV2Status.canceled,
|
||||||
@@ -1482,7 +1499,7 @@ async def mark_task_v2_as_terminated(
|
|||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
failure_reason: str | None = None,
|
failure_reason: str | None = None,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status.terminated,
|
status=TaskV2Status.terminated,
|
||||||
@@ -1499,7 +1516,7 @@ async def mark_task_v2_as_timed_out(
|
|||||||
organization_id: str | None = None,
|
organization_id: str | None = None,
|
||||||
failure_reason: str | None = None,
|
failure_reason: str | None = None,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status.timed_out,
|
status=TaskV2Status.timed_out,
|
||||||
@@ -1515,7 +1532,7 @@ async def update_task_v2_status_to_workflow_run_status(
|
|||||||
workflow_run_status: WorkflowRunStatus,
|
workflow_run_status: WorkflowRunStatus,
|
||||||
organization_id: str,
|
organization_id: str,
|
||||||
) -> TaskV2:
|
) -> TaskV2:
|
||||||
task_v2 = await app.DATABASE.update_task_v2(
|
task_v2 = await _update_task_v2_status(
|
||||||
task_v2_id,
|
task_v2_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
status=TaskV2Status(workflow_run_status),
|
status=TaskV2Status(workflow_run_status),
|
||||||
|
|||||||
Reference in New Issue
Block a user