set context.step_id and context.task_id at the beginning of execute_step and unset at the end + auto log step_id & task_id (#3803)

This commit is contained in:
Shuchang Zheng
2025-10-23 16:32:28 -07:00
committed by GitHub
parent 5b80614aac
commit d55b9637c4
4 changed files with 40 additions and 436 deletions

View File

@@ -232,8 +232,6 @@ class ForgeAgent:
"Created new step for workflow run",
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
step_id=step.step_id,
task_id=task.task_id,
order=step.order,
retry_index=step.retry_index,
)
@@ -304,6 +302,11 @@ class ForgeAgent:
cua_response: OpenAIResponse | None = None,
llm_caller: LLMCaller | None = None,
) -> Tuple[Step, DetailedAgentStepOutput | None, Step | None]:
# set the step_id and task_id in the context
context = skyvern_context.ensure_context()
context.step_id = step.step_id
context.task_id = task.task_id
# do not need to do complete verification when it's a CUA task
# 1. CUA executes only one action per step, so it is unlikely to hallucinate a completion or forget to return a complete action
# 2. It will significantly slow down CUA tasks
@@ -324,7 +327,6 @@ class ForgeAgent:
LOG.info(
"Workflow run is canceled, stopping execution inside task",
workflow_run_id=workflow_run.workflow_run_id,
step_id=step.step_id,
)
step = await self.update_step(
step,
@@ -341,7 +343,6 @@ class ForgeAgent:
LOG.info(
"Workflow run is timed out, stopping execution inside task",
workflow_run_id=workflow_run.workflow_run_id,
step_id=step.step_id,
)
step = await self.update_step(
step,
@@ -378,8 +379,7 @@ class ForgeAgent:
)
return step, None, None
context = skyvern_context.current()
override_max_steps_per_run = context.max_steps_override if context else None
override_max_steps_per_run = context.max_steps_override or None
max_steps_per_run = (
override_max_steps_per_run
or task.max_steps_per_run
@@ -502,7 +502,6 @@ class ForgeAgent:
LOG.info(
"Detecting files are still downloading, waiting for files to be completely downloaded.",
downloading_files=downloading_files,
step_id=step.step_id,
)
try:
await wait_for_download_finished(
@@ -513,8 +512,6 @@ class ForgeAgent:
LOG.warning(
"There're several long-time downloading files, these files might be broken",
downloading_files=e.downloading_files,
task_id=task.task_id,
step_id=step.step_id,
workflow_run_id=task.workflow_run_id,
)
@@ -632,8 +629,6 @@ class ForgeAgent:
else:
LOG.error(
"Step completed but task is not completed and next step is not created.",
task_id=task.task_id,
step_id=step.step_id,
is_task_completed=is_task_completed,
maybe_last_step=maybe_last_step,
maybe_next_step=maybe_next_step,
@@ -641,8 +636,6 @@ class ForgeAgent:
else:
LOG.error(
"Unexpected step status after agent_step",
task_id=task.task_id,
step_id=step.step_id,
step_status=step.status,
)
@@ -681,8 +674,6 @@ class ForgeAgent:
else:
LOG.info(
"Step executed but continuous execution is disabled.",
task_id=task.task_id,
step_id=step.step_id,
is_cloud_env=settings.is_cloud_environment(),
execute_all_steps=settings.execute_all_steps(),
next_step_id=next_step.step_id if next_step else None,
@@ -691,18 +682,10 @@ class ForgeAgent:
return step, detailed_output, next_step
# TODO (kerem): Let's add other exceptions that we know about here as custom exceptions as well
except StepUnableToExecuteError:
LOG.error(
"Step cannot be executed. Task execution stopped",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.exception("Step cannot be executed. Task execution stopped")
raise
except TaskAlreadyTimeout:
LOG.warning(
"Task is timed out, stopping execution",
task_id=task.task_id,
step=step.step_id,
)
LOG.warning("Task is timed out, stopping execution")
await self.clean_up_task(
task=task,
last_step=step,
@@ -714,8 +697,6 @@ class ForgeAgent:
except StepTerminationError as e:
LOG.warning(
"Step cannot be executed, marking task as failed",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
is_task_marked_as_failed = await self.fail_task(task, step, e.message)
@@ -728,29 +709,20 @@ class ForgeAgent:
browser_session_id=browser_session_id,
)
else:
LOG.warning(
"Task isn't marked as failed, after step termination. NOT clean up the task",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.warning("Task isn't marked as failed, after step termination. NOT clean up the task")
return step, detailed_output, None
except FailedToSendWebhook:
LOG.exception(
"Failed to send webhook",
task_id=task.task_id,
step_id=step.step_id,
task=task,
step=step,
)
return step, detailed_output, next_step
except FailedToNavigateToUrl as e:
# Fail the task if we can't navigate to the URL and send the response
LOG.error(
LOG.exception(
"Failed to navigate to URL, marking task as failed, and sending webhook response",
task_id=task.task_id,
step_id=step.step_id,
url=e.url,
error_message=e.error_message,
)
failure_reason = f"Failed to navigate to URL. URL:{e.url}, Error:{e.error_message}"
is_task_marked_as_failed = await self.fail_task(task, step, failure_reason)
@@ -764,11 +736,7 @@ class ForgeAgent:
browser_session_id=browser_session_id,
)
else:
LOG.warning(
"Task isn't marked as failed, after navigation failure. NOT clean up the task",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.warning("Task isn't marked as failed, after navigation failure. NOT clean up the task")
return step, detailed_output, next_step
except TaskAlreadyCanceled:
LOG.info(
@@ -785,11 +753,7 @@ class ForgeAgent:
)
return step, detailed_output, None
except InvalidTaskStatusTransition:
LOG.warning(
"Invalid task status transition",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.warning("Invalid task status transition")
# TODO: shall we send task response here?
await self.clean_up_task(
task=task,
@@ -803,8 +767,6 @@ class ForgeAgent:
except (UnsupportedActionType, UnsupportedTaskType, FailedToParseActionInstruction) as e:
LOG.warning(
"unsupported task type or action type, marking the task as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -821,8 +783,6 @@ class ForgeAgent:
except ScrapingFailed as sfe:
LOG.warning(
"Scraping failed, marking the task as failed",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
@@ -841,11 +801,7 @@ class ForgeAgent:
)
return step, detailed_output, None
except Exception as e:
LOG.exception(
"Got an unexpected exception in step, marking task as failed",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.exception("Got an unexpected exception in step, marking task as failed")
failure_reason = f"Unexpected error: {str(e)}"
if isinstance(e, SkyvernException):
@@ -861,12 +817,13 @@ class ForgeAgent:
browser_session_id=browser_session_id,
)
else:
LOG.warning(
"Task isn't marked as failed, after unexpected exception. NOT clean up the task",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.warning("Task isn't marked as failed, after unexpected exception. NOT clean up the task")
return step, detailed_output, None
finally:
# clear the step_id and task_id from the context
context = skyvern_context.ensure_context()
context.step_id = None
context.task_id = None
async def fail_task(self, task: Task, step: Step | None, reason: str | None) -> bool:
try:
@@ -885,22 +842,16 @@ class ForgeAgent:
except TaskAlreadyCanceled:
LOG.info(
"Task is already canceled. Can't fail the task.",
task_id=task.task_id,
step_id=step.step_id if step else "",
)
return False
except InvalidTaskStatusTransition:
LOG.warning(
"Invalid task status transition while failing a task",
task_id=task.task_id,
step_id=step.step_id if step else "",
)
return False
except Exception:
LOG.exception(
"Failed to update status and failure reason in database. Task might going to be time_out",
task_id=task.task_id,
step_id=step.step_id if step else "",
reason=reason,
)
return True
@@ -932,8 +883,6 @@ class ForgeAgent:
try:
LOG.info(
"Starting agent step",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -1072,8 +1021,6 @@ class ForgeAgent:
if len(actions) == 0:
LOG.info(
"No actions to execute, marking step as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -1087,8 +1034,6 @@ class ForgeAgent:
# Execute the actions
LOG.info(
"Executing actions",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
actions=actions,
@@ -1139,8 +1084,6 @@ class ForgeAgent:
if context.refresh_working_page:
LOG.warning(
"Detected the signal to reload the page, going to reload and skip the rest of the actions",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
)
await browser_state.reload_page()
@@ -1153,8 +1096,6 @@ class ForgeAgent:
status=ActionStatus.completed,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=action_idx,
)
@@ -1169,8 +1110,6 @@ class ForgeAgent:
if previous_action_idx is not None:
LOG.warning(
"Duplicate action element id.",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action=action,
)
@@ -1181,8 +1120,6 @@ class ForgeAgent:
if len(previous_result) > 0 and previous_result[-1].success:
LOG.info(
"Previous action succeeded, but we'll still continue.",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
previous_action=previous_action,
previous_result=previous_result,
@@ -1190,8 +1127,6 @@ class ForgeAgent:
else:
LOG.warning(
"Previous action failed, so handle the next action.",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
previous_action=previous_action,
previous_result=previous_result,
@@ -1257,8 +1192,6 @@ class ForgeAgent:
if results and results[-1].success:
LOG.info(
"Action succeeded",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1268,8 +1201,6 @@ class ForgeAgent:
if results[-1].skip_remaining_actions:
LOG.warning(
"Going to stop executing the remaining actions",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1281,8 +1212,6 @@ class ForgeAgent:
elif results and isinstance(action, DecisiveAction):
LOG.warning(
"DecisiveAction failed, but not stopping execution and not retrying the step",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1292,8 +1221,6 @@ class ForgeAgent:
elif results and not results[-1].success and not results[-1].stop_execution_on_failure:
LOG.warning(
"Action failed, but not stopping execution",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1304,8 +1231,6 @@ class ForgeAgent:
if action_node.next is not None:
LOG.warning(
"Action failed, but have duplicated element id in the action list. Continue excuting.",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1317,8 +1242,6 @@ class ForgeAgent:
LOG.warning(
"Action failed, marking step as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_idx=action_idx,
@@ -1336,8 +1259,6 @@ class ForgeAgent:
LOG.info(
"Actions executed successfully, marking step as completed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
action_results=action_results,
@@ -1430,8 +1351,6 @@ class ForgeAgent:
except CancelledError:
LOG.exception(
"CancelledError in agent_step, marking step as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -1453,8 +1372,6 @@ class ForgeAgent:
except Exception as e:
LOG.exception(
"Unexpected exception in agent_step, marking step as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -1721,8 +1638,6 @@ class ForgeAgent:
LOG.info(
"UI-TARS action generation starts",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
)
@@ -1753,8 +1668,6 @@ class ForgeAgent:
LOG.info(
"UI-TARS action generation completed",
task_id=task.task_id,
step_id=step.step_id,
actions_count=len(actions),
)
@@ -1765,8 +1678,6 @@ class ForgeAgent:
) -> CompleteVerifyResult:
LOG.info(
"Checking if user goal is achieved after re-scraping the page",
task_id=task.task_id,
step_id=step.step_id,
workflow_run_id=task.workflow_run_id,
)
scroll = True
@@ -1829,8 +1740,6 @@ class ForgeAgent:
except Exception:
LOG.exception(
"Failed to check user goal complete, skipping",
task_id=task.task_id,
step_id=step.step_id,
workflow_run_id=task.workflow_run_id,
)
return None
@@ -1871,8 +1780,6 @@ class ForgeAgent:
except Exception:
LOG.error(
"Failed to record screenshot after action",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
@@ -1885,12 +1792,7 @@ class ForgeAgent:
data=html.encode(),
)
except Exception:
LOG.error(
"Failed to record html after action",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
LOG.exception("Failed to record html after action")
try:
video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
@@ -1903,12 +1805,7 @@ class ForgeAgent:
data=video_artifact.video_data,
)
except Exception:
LOG.error(
"Failed to record video after action",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
LOG.exception("Failed to record video after action")
async def initialize_execution_state(
self,
@@ -1967,18 +1864,10 @@ class ForgeAgent:
pass
elif scrape_type == ScrapeType.STOPLOADING:
LOG.info(
"Try to stop loading the page before scraping",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.info("Try to stop loading the page before scraping")
await browser_state.stop_page_loading()
elif scrape_type == ScrapeType.RELOAD:
LOG.info(
"Try to reload the page before scraping",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.info("Try to reload the page before scraping")
await browser_state.reload_page()
max_screenshot_number = settings.MAX_NUM_SCREENSHOTS
@@ -2030,12 +1919,7 @@ class ForgeAgent:
except (FailedToTakeScreenshot, ScrapingFailed) as e:
if idx < len(SCRAPE_TYPE_ORDER) - 1:
continue
LOG.error(
f"{e.__class__.__name__} happened in two normal attempts and reload-page retry",
task_id=task.task_id,
step_id=step.step_id,
exc_info=True,
)
LOG.exception(f"{e.__class__.__name__} happened in two normal attempts and reload-page retry")
raise e
if scraped_page is None:
@@ -2048,8 +1932,6 @@ class ForgeAgent:
)
LOG.info(
"Scraped website",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
num_elements=len(scraped_page.elements),
@@ -2408,8 +2290,6 @@ class ForgeAgent:
if action_result.success:
LOG.info(
"Extracted information for task",
task_id=task.task_id,
step_id=step.step_id,
extracted_information=action_result.data,
)
return action_result.data
@@ -2503,15 +2383,9 @@ class ForgeAgent:
except TargetClosedError:
LOG.warning(
"Failed to take screenshot before sending task response, page is closed",
task_id=task.task_id,
step_id=last_step.step_id,
)
except Exception:
LOG.exception(
"Failed to take screenshot before sending task response",
task_id=task.task_id,
step_id=last_step.step_id,
)
LOG.exception("Failed to take screenshot before sending task response")
if task.organization_id:
try:
@@ -2821,8 +2695,6 @@ class ForgeAgent:
}
LOG.debug(
"Updating step in db",
task_id=step.task_id,
step_id=step.step_id,
diff=update_comparison,
)
@@ -2831,8 +2703,6 @@ class ForgeAgent:
duration_seconds = (datetime.now(UTC) - step.created_at.replace(tzinfo=UTC)).total_seconds()
LOG.info(
"Step duration metrics",
task_id=step.task_id,
step_id=step.step_id,
duration_seconds=duration_seconds,
step_status=status,
organization_id=step.organization_id,
@@ -2910,8 +2780,6 @@ class ForgeAgent:
if step.retry_index >= max_retries_per_step:
LOG.warning(
"Step failed after max retries, marking task as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
max_retries=settings.MAX_RETRIES_PER_STEP,
@@ -2941,8 +2809,6 @@ class ForgeAgent:
else:
LOG.warning(
"Step failed, retrying",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -3016,7 +2882,7 @@ class ForgeAgent:
)
return MaxStepsReasonResponse.model_validate(json_response)
except Exception:
LOG.warning("Failed to summary the failure reason", task_id=task.task_id, step_id=step.step_id)
LOG.warning("Failed to summary the failure reason")
if steps_results:
last_step_result = steps_results[-1]
return MaxStepsReasonResponse(
@@ -3086,11 +2952,7 @@ class ForgeAgent:
)
return json_response.get("reasoning", "")
except Exception:
LOG.warning(
"Failed to summarize the failure reason for max retries",
task_id=task.task_id,
step_id=step.step_id,
)
LOG.warning("Failed to summarize the failure reason for max retries")
if steps_results:
last_step_result = steps_results[-1]
return f"Retry Step {last_step_result['order']}: {last_step_result['actions_result']}"
@@ -3107,8 +2969,6 @@ class ForgeAgent:
if step.is_goal_achieved():
LOG.info(
"Step completed and goal achieved, marking task as completed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
output=step.output,
@@ -3124,8 +2984,6 @@ class ForgeAgent:
if step.is_terminated():
LOG.info(
"Step completed and terminated by the agent, marking task as terminated",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
output=step.output,
@@ -3148,8 +3006,6 @@ class ForgeAgent:
if isinstance(task_block, ActionBlock) and step.is_success():
LOG.info(
"Step completed for the action block, marking task as completed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
output=step.output,
@@ -3164,8 +3020,6 @@ class ForgeAgent:
if step.order + 1 >= max_steps_per_run:
LOG.info(
"Step completed but max steps reached, marking task as failed",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
max_steps=max_steps_per_run,
@@ -3193,8 +3047,6 @@ class ForgeAgent:
else:
LOG.info(
"Step completed, creating next step",
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
step_retry=step.retry_index,
)
@@ -3311,7 +3163,7 @@ class ForgeAgent:
and (task.totp_verification_url or task.totp_identifier)
and task.organization_id
):
LOG.info("Need verification code", step_id=step.step_id)
LOG.info("Need verification code")
workflow_id = workflow_permanent_id = None
if task.workflow_run_id:
workflow_run = await app.DATABASE.get_workflow_run(task.workflow_run_id)