TaskV2: Ask AI if relocation is needed based on current url as the first step (#3761)

This commit is contained in:
Stanislav Novosad
2025-10-22 13:58:40 -06:00
committed by GitHub
parent 66634d3aa6
commit e8472df6d1
3 changed files with 206 additions and 79 deletions

View File

@@ -5,13 +5,11 @@ from typing import Any
import httpx
import structlog
from playwright.async_api import Page
from sqlalchemy.exc import OperationalError
from skyvern.config import settings
from skyvern.exceptions import (
FailedToSendWebhook,
MissingBrowserState,
TaskTerminationError,
TaskV2NotFound,
UrlGenerationFailure,
@@ -188,40 +186,13 @@ async def initialize_task_v2(
context.run_id = context.run_id or task_v2.observer_cruise_id
context.max_screenshot_scrolls = max_screenshot_scrolling_times
thought = await app.DATABASE.create_thought(
task_v2_id=task_v2.observer_cruise_id,
organization_id=organization.organization_id,
thought_type=ThoughtType.metadata,
thought_scenario=ThoughtScenario.generate_metadata,
)
metadata_prompt = prompt_engine.load_prompt("task_v2_generate_metadata", user_goal=user_prompt, user_url=user_url)
metadata_response = await app.LLM_API_HANDLER(
prompt=metadata_prompt,
thought=thought,
prompt_name="task_v2_generate_metadata",
)
# validate
LOG.info(f"Initialized task v2 initial response: {metadata_response}")
url: str = user_url or metadata_response.get("url", "")
if not url:
raise UrlGenerationFailure()
title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE)
metadata = TaskV2Metadata(
url=url,
workflow_title=title,
)
url = metadata.url
if not url:
raise UrlGenerationFailure()
# create workflow and workflow run
max_steps_override = 10
try:
workflow_status = WorkflowStatus.published if publish_workflow else WorkflowStatus.auto_generated
new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(
organization,
metadata.workflow_title,
title=DEFAULT_WORKFLOW_TITLE, # default title is updated as the first step of the task
proxy_location=proxy_location,
status=workflow_status,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
@@ -253,27 +224,13 @@ async def initialize_task_v2(
)
raise
try:
await app.DATABASE.update_thought(
thought_id=thought.observer_thought_id,
organization_id=organization.organization_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=new_workflow.workflow_id,
workflow_permanent_id=new_workflow.workflow_permanent_id,
thought=metadata_response.get("thoughts", ""),
output=metadata.model_dump(),
)
except Exception:
LOG.warning("Failed to update thought", exc_info=True)
# update oserver cruise
# update observer cruise
try:
task_v2 = await app.DATABASE.update_task_v2(
task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=new_workflow.workflow_id,
workflow_permanent_id=new_workflow.workflow_permanent_id,
url=url,
organization_id=organization.organization_id,
)
if create_task_run:
@@ -282,8 +239,120 @@ async def initialize_task_v2(
organization_id=organization.organization_id,
run_id=task_v2.observer_cruise_id,
title=new_workflow.title,
url=url,
url_hash=generate_url_hash(url),
)
except Exception:
LOG.warning("Failed to update task 2.0", exc_info=True)
# fail the workflow run
task_v2 = await mark_task_v2_as_failed(
task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id,
failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run",
organization_id=organization.organization_id,
)
raise
return task_v2
async def initialize_task_v2_metadata(
organization: Organization,
task_v2: TaskV2,
workflow: Workflow,
workflow_run: WorkflowRun,
user_prompt: str | None,
current_browser_url: str | None,
user_url: str | None,
current_run_id: str,
) -> TaskV2:
thought = await app.DATABASE.create_thought(
task_v2_id=task_v2.observer_cruise_id,
organization_id=organization.organization_id,
thought_type=ThoughtType.metadata,
thought_scenario=ThoughtScenario.generate_metadata,
)
enable_current_url_validation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached(
"ENABLE_TASKV2_METADATA_CURRENT_URL_VALIDATION",
current_run_id,
properties={"organization_id": task_v2.organization_id},
)
if enable_current_url_validation:
metadata_prompt = prompt_engine.load_prompt(
"task_v2_generate_metadata_with_current_url",
user_goal=user_prompt,
current_browser_url=current_browser_url or "about:blank",
user_url=user_url,
)
metadata_response = await app.LLM_API_HANDLER(
prompt=metadata_prompt,
thought=thought,
prompt_name="task_v2_generate_metadata_with_current_url",
)
else:
metadata_prompt = prompt_engine.load_prompt(
"task_v2_generate_metadata",
user_goal=user_prompt,
user_url=user_url,
)
metadata_response = await app.LLM_API_HANDLER(
prompt=metadata_prompt,
thought=thought,
prompt_name="task_v2_generate_metadata",
)
# validate
LOG.info(f"Initialized task v2 initial response: {metadata_response}")
url: str = user_url or metadata_response.get("url", "")
if not url:
raise UrlGenerationFailure()
title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE)
metadata = TaskV2Metadata(
url=url,
workflow_title=title,
)
url = metadata.url
if not url:
raise UrlGenerationFailure()
try:
await app.DATABASE.update_thought(
thought_id=thought.observer_thought_id,
organization_id=organization.organization_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=workflow.workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
thought=metadata_response.get("thoughts", ""),
output=metadata.model_dump(),
)
except Exception:
LOG.warning("Failed to update thought", exc_info=True)
# update workflow & tasks with the inferred title and url
try:
await app.DATABASE.update_workflow(
workflow_id=workflow.workflow_id,
organization_id=organization.organization_id,
title=metadata.workflow_title,
)
task_v2 = await app.DATABASE.update_task_v2(
task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=workflow.workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
url=metadata.url,
organization_id=organization.organization_id,
)
task_run = await app.DATABASE.get_run(
run_id=task_v2.observer_cruise_id, organization_id=organization.organization_id
)
if task_run:
await app.DATABASE.update_task_run(
organization_id=organization.organization_id,
run_id=task_v2.observer_cruise_id,
title=metadata.workflow_title,
url=metadata.url,
url_hash=generate_url_hash(metadata.url),
)
except Exception:
LOG.warning("Failed to update task 2.0", exc_info=True)
@@ -401,7 +470,7 @@ async def run_task_v2_helper(
organization_id=organization_id,
)
return None, None, task_v2
if not task_v2.url or not task_v2.prompt:
if not task_v2.prompt:
LOG.error(
"Task v2 url or prompt not found",
task_v2_id=task_v2_id,
@@ -441,6 +510,8 @@ async def run_task_v2_helper(
)
workflow_run_id = task_v2.workflow_run_id
if not workflow_run_id:
raise ValueError("workflow_run_id is missing")
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id)
if not workflow_run:
@@ -493,12 +564,31 @@ async def run_task_v2_helper(
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id)
await _set_up_workflow_context(workflow, workflow_run_id, organization)
url = str(task_v2.url)
user_prompt = task_v2.prompt
task_history: list[dict] = []
yaml_blocks: list[BLOCK_YAML_TYPES] = []
yaml_parameters: list[PARAMETER_YAML_TYPES] = []
browser_state: BrowserState | None = None
current_url: str | None = None
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run, browser_session_id=browser_session_id
)
page = await browser_state.get_working_page()
if page:
current_url = await SkyvernFrame.get_url(page)
task_v2 = await initialize_task_v2_metadata(
task_v2=task_v2,
workflow=workflow,
workflow_run=workflow_run,
organization=organization,
user_prompt=task_v2.prompt,
current_browser_url=current_url,
user_url=task_v2.url,
current_run_id=current_run_id,
)
url = str(task_v2.url)
max_steps = int_max_steps_override or settings.MAX_STEPS_PER_TASK_V2
for i in range(DEFAULT_MAX_ITERATIONS):
@@ -536,36 +626,25 @@ async def run_task_v2_helper(
context = skyvern_context.ensure_context()
# Always ensure browser_state is available at the start of the loop
current_url: str | None = None
page: Page | None = None
fallback_url = settings.TASK_BLOCKED_SITE_FALLBACK_URL
browser_state = app.BROWSER_MANAGER.get_for_workflow_run(
workflow_run_id=workflow_run_id, parent_workflow_run_id=workflow_run.parent_workflow_run_id
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run, browser_session_id=browser_session_id
)
fallback_occurred = False
if browser_state is None:
try:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run,
if url != current_url:
if page is None:
page = await browser_state.get_or_create_page(
url=url,
browser_session_id=browser_session_id,
proxy_location=workflow_run.proxy_location,
task_id=task_v2.task_id,
workflow_run_id=workflow_run_id,
script_id=task_v2.script_id,
organization_id=organization_id,
extra_http_headers=task_v2.extra_http_headers,
)
except Exception:
LOG.warning("Failed to get or create browser state, fallback to Google", exc_info=True, url=url)
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run,
url=fallback_url,
browser_session_id=browser_session_id,
)
fallback_occurred = True
if browser_state is None:
LOG.error("Failed to create browser state even after fallback", workflow_run_id=workflow_run_id)
raise MissingBrowserState(workflow_run_id=workflow_run_id)
page = await browser_state.get_working_page()
else:
await browser_state.navigate_to_url(page, url)
page_loaded = False
if page:
@@ -590,10 +669,6 @@ async def run_task_v2_helper(
fallback_occurred = True
except Exception:
LOG.exception("Failed to load Google fallback", exc_info=True, url=url, current_url=current_url)
else:
page = await browser_state.get_working_page()
if page:
current_url = await SkyvernFrame.get_url(page)
if i == 0 and current_url != url:
if fallback_occurred: