Add termination-aware complete verification experiment (SKY-6884) (#3948)
This commit is contained in:
@@ -89,7 +89,12 @@ from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, Tas
|
||||
from skyvern.forge.sdk.schemas.totp_codes import OTPType
|
||||
from skyvern.forge.sdk.trace import TraceManager
|
||||
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
|
||||
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
|
||||
from skyvern.forge.sdk.workflow.models.block import (
|
||||
ActionBlock,
|
||||
BaseTaskBlock,
|
||||
FileDownloadBlock,
|
||||
ValidationBlock,
|
||||
)
|
||||
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
|
||||
from skyvern.schemas.runs import CUA_ENGINES, RunEngine
|
||||
from skyvern.schemas.steps import AgentStepOutput
|
||||
@@ -1328,6 +1333,7 @@ class ForgeAgent:
|
||||
scraped_page=scraped_page,
|
||||
task=task,
|
||||
step=step,
|
||||
task_block=task_block,
|
||||
)
|
||||
if complete_action is not None:
|
||||
LOG.info("User goal achieved, executing complete action")
|
||||
@@ -1785,7 +1791,7 @@ class ForgeAgent:
|
||||
return None
|
||||
|
||||
async def complete_verify(
|
||||
self, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
|
||||
self, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, task_block: BaseTaskBlock | None = None
|
||||
) -> CompleteVerifyResult:
|
||||
LOG.info(
|
||||
"Checking if user goal is achieved after re-scraping the page",
|
||||
@@ -1803,13 +1809,47 @@ class ForgeAgent:
|
||||
if task.include_action_history_in_verification:
|
||||
actions_and_results_str = await self._get_action_results(task, current_step=step)
|
||||
|
||||
# Check if we should use the termination-aware prompt (experiment)
|
||||
# Only enabled for file download blocks
|
||||
use_termination_prompt = False
|
||||
is_file_download_block = task_block is not None and isinstance(task_block, FileDownloadBlock)
|
||||
|
||||
if is_file_download_block:
|
||||
try:
|
||||
distinct_id = task.workflow_run_id if task.workflow_run_id else task.task_id
|
||||
use_termination_prompt = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
|
||||
"USE_TERMINATION_AWARE_COMPLETE_VERIFICATION",
|
||||
distinct_id,
|
||||
properties={"organization_id": task.organization_id},
|
||||
)
|
||||
if use_termination_prompt:
|
||||
LOG.info(
|
||||
"Experiment enabled: using termination-aware complete verification prompt for file download block",
|
||||
task_id=task.task_id,
|
||||
workflow_run_id=task.workflow_run_id,
|
||||
organization_id=task.organization_id,
|
||||
block_type="file_download",
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Failed to check USE_TERMINATION_AWARE_COMPLETE_VERIFICATION experiment; using legacy behavior",
|
||||
task_id=task.task_id,
|
||||
workflow_run_id=task.workflow_run_id,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
# Select the appropriate template based on experiment
|
||||
template_name = "check-user-goal-with-termination" if use_termination_prompt else "check-user-goal"
|
||||
prompt_name = "check-user-goal-with-termination" if use_termination_prompt else "check-user-goal"
|
||||
|
||||
verification_prompt = load_prompt_with_elements(
|
||||
element_tree_builder=scraped_page_refreshed,
|
||||
prompt_engine=prompt_engine,
|
||||
template_name="check-user-goal",
|
||||
template_name=template_name,
|
||||
navigation_goal=task.navigation_goal,
|
||||
navigation_payload=task.navigation_payload,
|
||||
complete_criterion=task.complete_criterion,
|
||||
terminate_criterion=task.terminate_criterion,
|
||||
action_history=actions_and_results_str,
|
||||
local_datetime=datetime.now(skyvern_context.ensure_context().tz_info).isoformat(),
|
||||
)
|
||||
@@ -1855,23 +1895,37 @@ class ForgeAgent:
|
||||
prompt=verification_prompt,
|
||||
step=step,
|
||||
screenshots=scraped_page_refreshed.screenshots,
|
||||
prompt_name="check-user-goal",
|
||||
prompt_name=prompt_name,
|
||||
)
|
||||
return CompleteVerifyResult.model_validate(verification_result)
|
||||
|
||||
async def check_user_goal_complete(
|
||||
self, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
|
||||
) -> CompleteAction | None:
|
||||
self, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, task_block: BaseTaskBlock | None = None
|
||||
) -> CompleteAction | TerminateAction | None:
|
||||
try:
|
||||
verification_result = await self.complete_verify(
|
||||
page=page,
|
||||
scraped_page=scraped_page,
|
||||
task=task,
|
||||
step=step,
|
||||
task_block=task_block,
|
||||
)
|
||||
|
||||
# Check if we should terminate instead of complete
|
||||
# Note: This requires the USE_TERMINATION_AWARE_COMPLETE_VERIFICATION experiment to be enabled
|
||||
if verification_result.is_terminate:
|
||||
LOG.warning(
|
||||
"Periodic verification determined task should terminate (termination-aware experiment)",
|
||||
workflow_run_id=task.workflow_run_id,
|
||||
thoughts=verification_result.thoughts,
|
||||
status=verification_result.status if verification_result.status else "legacy",
|
||||
)
|
||||
return TerminateAction(
|
||||
reasoning=verification_result.thoughts,
|
||||
)
|
||||
|
||||
# We don't want to return a complete action if the user goal is not achieved since we're checking at every step
|
||||
if not verification_result.user_goal_achieved:
|
||||
if not verification_result.is_complete:
|
||||
return None
|
||||
|
||||
return CompleteAction(
|
||||
@@ -3160,6 +3214,7 @@ class ForgeAgent:
|
||||
scraped_page=scraped_page,
|
||||
task=task,
|
||||
step=step,
|
||||
task_block=task_block,
|
||||
),
|
||||
name=f"verify_goal_{step.step_id}",
|
||||
)
|
||||
@@ -3187,11 +3242,13 @@ class ForgeAgent:
|
||||
complete_action = None
|
||||
|
||||
if complete_action is not None:
|
||||
# Goal achieved! Cancel the pre-scraping task
|
||||
# Goal achieved or should terminate! Cancel the pre-scraping task
|
||||
is_terminate = isinstance(complete_action, TerminateAction)
|
||||
LOG.info(
|
||||
"Parallel verification: goal achieved, cancelling pre-scraping",
|
||||
"Parallel verification: goal achieved or termination required, cancelling pre-scraping",
|
||||
step_id=step.step_id,
|
||||
task_id=task.task_id,
|
||||
is_terminate=is_terminate,
|
||||
)
|
||||
pre_scrape_task.cancel()
|
||||
try:
|
||||
@@ -3201,22 +3258,73 @@ class ForgeAgent:
|
||||
except Exception:
|
||||
LOG.debug("Pre-scraping task cleanup failed", step_id=step.step_id, exc_info=True)
|
||||
|
||||
# Mark task as complete
|
||||
# Note: Step is already marked as completed by agent_step
|
||||
# We don't add the complete action to the step output since the step is already finalized
|
||||
LOG.info(
|
||||
"Parallel verification: goal achieved, marking task as complete",
|
||||
step_id=step.step_id,
|
||||
task_id=task.task_id,
|
||||
)
|
||||
last_step = await self.update_step(step, is_last=True)
|
||||
extracted_information = await self.get_extracted_information_for_task(task)
|
||||
await self.update_task(
|
||||
task,
|
||||
status=TaskStatus.completed,
|
||||
extracted_information=extracted_information,
|
||||
)
|
||||
return True, last_step, None
|
||||
working_page = page
|
||||
if working_page is None:
|
||||
working_page = await browser_state.must_get_working_page()
|
||||
|
||||
if step.output is None:
|
||||
step.output = AgentStepOutput(action_results=[], actions_and_results=[], errors=[])
|
||||
if step.output.action_results is None:
|
||||
step.output.action_results = []
|
||||
if step.output.actions_and_results is None:
|
||||
step.output.actions_and_results = []
|
||||
|
||||
persisted_action = cast(Action, complete_action)
|
||||
if isinstance(persisted_action, (CompleteAction, TerminateAction)):
|
||||
persisted_action.organization_id = task.organization_id
|
||||
persisted_action.workflow_run_id = task.workflow_run_id
|
||||
persisted_action.task_id = task.task_id
|
||||
persisted_action.step_id = step.step_id
|
||||
persisted_action.step_order = step.order
|
||||
persisted_action.action_order = len(step.output.actions_and_results)
|
||||
|
||||
action_results = await ActionHandler.handle_action(scraped_page, task, step, working_page, persisted_action)
|
||||
await self.record_artifacts_after_action(task, step, browser_state, engine)
|
||||
step.output.action_results.extend(action_results)
|
||||
step.output.actions_and_results.append((persisted_action, action_results))
|
||||
if isinstance(persisted_action, DecisiveAction) and persisted_action.errors:
|
||||
step.output.errors.extend(persisted_action.errors)
|
||||
|
||||
if is_terminate:
|
||||
# Mark task as terminated/failed
|
||||
# Note: This requires the USE_TERMINATION_AWARE_COMPLETE_VERIFICATION experiment to be enabled
|
||||
LOG.warning(
|
||||
"Parallel verification: termination required, marking task as terminated (termination-aware experiment)",
|
||||
step_id=step.step_id,
|
||||
task_id=task.task_id,
|
||||
reasoning=complete_action.reasoning,
|
||||
)
|
||||
last_step = await self.update_step(step, output=step.output, is_last=True)
|
||||
task_errors = None
|
||||
if isinstance(persisted_action, TerminateAction) and persisted_action.errors:
|
||||
task_errors = [error.model_dump() for error in persisted_action.errors]
|
||||
failure_reason = persisted_action.reasoning
|
||||
if isinstance(persisted_action, TerminateAction) and persisted_action.errors:
|
||||
failure_reason = "; ".join(error.reasoning for error in persisted_action.errors)
|
||||
await self.update_task(
|
||||
task,
|
||||
status=TaskStatus.terminated,
|
||||
failure_reason=failure_reason,
|
||||
errors=task_errors,
|
||||
)
|
||||
return True, last_step, None
|
||||
else:
|
||||
# Mark task as complete
|
||||
# Note: Step is already marked as completed by agent_step
|
||||
# We don't add the complete action to the step output since the step is already finalized
|
||||
LOG.info(
|
||||
"Parallel verification: goal achieved, marking task as complete",
|
||||
step_id=step.step_id,
|
||||
task_id=task.task_id,
|
||||
)
|
||||
last_step = await self.update_step(step, output=step.output, is_last=True)
|
||||
extracted_information = await self.get_extracted_information_for_task(task)
|
||||
await self.update_task(
|
||||
task,
|
||||
status=TaskStatus.completed,
|
||||
extracted_information=extracted_information,
|
||||
)
|
||||
return True, last_step, None
|
||||
else:
|
||||
# Goal not achieved - wait for pre-scraping to complete
|
||||
LOG.info(
|
||||
|
||||
Reference in New Issue
Block a user