diff --git a/skyvern/forge/prompts/skyvern/task_v2_generate_metadata_with_current_url.j2 b/skyvern/forge/prompts/skyvern/task_v2_generate_metadata_with_current_url.j2 new file mode 100644 index 00000000..7b1d3f6d --- /dev/null +++ b/skyvern/forge/prompts/skyvern/task_v2_generate_metadata_with_current_url.j2 @@ -0,0 +1,27 @@ +You're to assist the user to achieve the user goal in the browser. Given the user input and the current url of the browser, what is the url to type into the browser? Also come up with a proper title for this goal to achieve. + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "thoughts": str, // Think step by step. What is the current browser url? Does it already achieve the user goal? Has the user specified a url to go to? If yes, what is the complete url the user specified? If not, to achieve what the user wants to do, what is the most likely website url for achieving the goal? + "url": str, // The initial url to type into the browser. If the user specified one, use exactly that url. If the "current browser url" achieves the user goal, use exactly the "current browser url". + "title": str // A descriptive and informative title for the goal. 
Use no more than 5 words +} + +User goal: +``` +{{ user_goal }} +```{% if user_url %} + +Starting url provided by user: +``` +{{ user_url }} +```{% endif %} + +{% if current_browser_url %} +Current browser url: +``` +{{ current_browser_url }} +``` +{% endif %} diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 60a937e4..4069ae79 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -3786,6 +3786,31 @@ class AgentDB: await session.refresh(task_run) return Run.model_validate(task_run) + async def update_task_run( + self, + organization_id: str, + run_id: str, + title: str | None = None, + url: str | None = None, + url_hash: str | None = None, + ) -> None: + async with self.Session() as session: + task_run = ( + await session.scalars( + select(TaskRunModel).filter_by(run_id=run_id).filter_by(organization_id=organization_id) + ) + ).first() + if not task_run: + raise NotFoundError(f"TaskRun {run_id} not found") + + if title: + task_run.title = title + if url: + task_run.url = url + if url_hash: + task_run.url_hash = url_hash + await session.commit() + async def create_credential( self, organization_id: str, diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index b9238828..eb521898 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -5,13 +5,11 @@ from typing import Any import httpx import structlog -from playwright.async_api import Page from sqlalchemy.exc import OperationalError from skyvern.config import settings from skyvern.exceptions import ( FailedToSendWebhook, - MissingBrowserState, TaskTerminationError, TaskV2NotFound, UrlGenerationFailure, @@ -188,40 +186,13 @@ async def initialize_task_v2( context.run_id = context.run_id or task_v2.observer_cruise_id context.max_screenshot_scrolls = max_screenshot_scrolling_times - thought = await app.DATABASE.create_thought( - task_v2_id=task_v2.observer_cruise_id, - 
organization_id=organization.organization_id, - thought_type=ThoughtType.metadata, - thought_scenario=ThoughtScenario.generate_metadata, - ) - - metadata_prompt = prompt_engine.load_prompt("task_v2_generate_metadata", user_goal=user_prompt, user_url=user_url) - metadata_response = await app.LLM_API_HANDLER( - prompt=metadata_prompt, - thought=thought, - prompt_name="task_v2_generate_metadata", - ) - # validate - LOG.info(f"Initialized task v2 initial response: {metadata_response}") - url: str = user_url or metadata_response.get("url", "") - if not url: - raise UrlGenerationFailure() - title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE) - metadata = TaskV2Metadata( - url=url, - workflow_title=title, - ) - url = metadata.url - if not url: - raise UrlGenerationFailure() - # create workflow and workflow run max_steps_override = 10 try: workflow_status = WorkflowStatus.published if publish_workflow else WorkflowStatus.auto_generated new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow( organization, - metadata.workflow_title, + title=DEFAULT_WORKFLOW_TITLE, # default title is updated as the first step of the task proxy_location=proxy_location, status=workflow_status, max_screenshot_scrolling_times=max_screenshot_scrolling_times, @@ -253,27 +224,13 @@ async def initialize_task_v2( ) raise - try: - await app.DATABASE.update_thought( - thought_id=thought.observer_thought_id, - organization_id=organization.organization_id, - workflow_run_id=workflow_run.workflow_run_id, - workflow_id=new_workflow.workflow_id, - workflow_permanent_id=new_workflow.workflow_permanent_id, - thought=metadata_response.get("thoughts", ""), - output=metadata.model_dump(), - ) - except Exception: - LOG.warning("Failed to update thought", exc_info=True) - - # update oserver cruise + # update observer cruise try: task_v2 = await app.DATABASE.update_task_v2( task_v2_id=task_v2.observer_cruise_id, workflow_run_id=workflow_run.workflow_run_id, 
workflow_id=new_workflow.workflow_id, workflow_permanent_id=new_workflow.workflow_permanent_id, - url=url, organization_id=organization.organization_id, ) if create_task_run: @@ -282,8 +239,120 @@ async def initialize_task_v2( organization_id=organization.organization_id, run_id=task_v2.observer_cruise_id, title=new_workflow.title, - url=url, - url_hash=generate_url_hash(url), + ) + except Exception: + LOG.warning("Failed to update task 2.0", exc_info=True) + # fail the workflow run + task_v2 = await mark_task_v2_as_failed( + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=workflow_run.workflow_run_id, + failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run", + organization_id=organization.organization_id, + ) + raise + + return task_v2 + + +async def initialize_task_v2_metadata( + organization: Organization, + task_v2: TaskV2, + workflow: Workflow, + workflow_run: WorkflowRun, + user_prompt: str | None, + current_browser_url: str | None, + user_url: str | None, + current_run_id: str, +) -> TaskV2: + thought = await app.DATABASE.create_thought( + task_v2_id=task_v2.observer_cruise_id, + organization_id=organization.organization_id, + thought_type=ThoughtType.metadata, + thought_scenario=ThoughtScenario.generate_metadata, + ) + + enable_current_url_validation = await app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached( + "ENABLE_TASKV2_METADATA_CURRENT_URL_VALIDATION", + current_run_id, + properties={"organization_id": task_v2.organization_id}, + ) + + if enable_current_url_validation: + metadata_prompt = prompt_engine.load_prompt( + "task_v2_generate_metadata_with_current_url", + user_goal=user_prompt, + current_browser_url=current_browser_url or "about:blank", + user_url=user_url, + ) + metadata_response = await app.LLM_API_HANDLER( + prompt=metadata_prompt, + thought=thought, + prompt_name="task_v2_generate_metadata_with_current_url", + ) + else: + metadata_prompt = prompt_engine.load_prompt( + 
"task_v2_generate_metadata", + user_goal=user_prompt, + user_url=user_url, + ) + metadata_response = await app.LLM_API_HANDLER( + prompt=metadata_prompt, + thought=thought, + prompt_name="task_v2_generate_metadata", + ) + + # validate + LOG.info(f"Initialized task v2 initial response: {metadata_response}") + url: str = user_url or metadata_response.get("url", "") + if not url: + raise UrlGenerationFailure() + title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE) + metadata = TaskV2Metadata( + url=url, + workflow_title=title, + ) + url = metadata.url + if not url: + raise UrlGenerationFailure() + + try: + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, + organization_id=organization.organization_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + thought=metadata_response.get("thoughts", ""), + output=metadata.model_dump(), + ) + except Exception: + LOG.warning("Failed to update thought", exc_info=True) + + # update workflow & tasks with the inferred title and url + try: + await app.DATABASE.update_workflow( + workflow_id=workflow.workflow_id, + organization_id=organization.organization_id, + title=metadata.workflow_title, + ) + task_v2 = await app.DATABASE.update_task_v2( + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + url=metadata.url, + organization_id=organization.organization_id, + ) + task_run = await app.DATABASE.get_run( + run_id=task_v2.observer_cruise_id, organization_id=organization.organization_id + ) + if task_run: + await app.DATABASE.update_task_run( + organization_id=organization.organization_id, + run_id=task_v2.observer_cruise_id, + title=metadata.workflow_title, + url=metadata.url, + url_hash=generate_url_hash(metadata.url), ) except Exception: LOG.warning("Failed to update task 
2.0", exc_info=True) @@ -401,7 +470,7 @@ async def run_task_v2_helper( organization_id=organization_id, ) return None, None, task_v2 - if not task_v2.url or not task_v2.prompt: + if not task_v2.prompt: LOG.error( "Task v2 url or prompt not found", task_v2_id=task_v2_id, @@ -441,6 +510,8 @@ async def run_task_v2_helper( ) workflow_run_id = task_v2.workflow_run_id + if not workflow_run_id: + raise ValueError("workflow_run_id is missing") workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id) if not workflow_run: @@ -493,12 +564,31 @@ async def run_task_v2_helper( workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id) await _set_up_workflow_context(workflow, workflow_run_id, organization) - url = str(task_v2.url) user_prompt = task_v2.prompt task_history: list[dict] = [] yaml_blocks: list[BLOCK_YAML_TYPES] = [] yaml_parameters: list[PARAMETER_YAML_TYPES] = [] - browser_state: BrowserState | None = None + current_url: str | None = None + + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, browser_session_id=browser_session_id + ) + + page = await browser_state.get_working_page() + if page: + current_url = await SkyvernFrame.get_url(page) + + task_v2 = await initialize_task_v2_metadata( + task_v2=task_v2, + workflow=workflow, + workflow_run=workflow_run, + organization=organization, + user_prompt=task_v2.prompt, + current_browser_url=current_url, + user_url=task_v2.url, + current_run_id=current_run_id, + ) + url = str(task_v2.url) max_steps = int_max_steps_override or settings.MAX_STEPS_PER_TASK_V2 for i in range(DEFAULT_MAX_ITERATIONS): @@ -536,36 +626,25 @@ async def run_task_v2_helper( context = skyvern_context.ensure_context() # Always ensure browser_state is available at the start of the loop - current_url: str | None = None - page: Page | None = None fallback_url = settings.TASK_BLOCKED_SITE_FALLBACK_URL - browser_state = 
app.BROWSER_MANAGER.get_for_workflow_run( - workflow_run_id=workflow_run_id, parent_workflow_run_id=workflow_run.parent_workflow_run_id + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, browser_session_id=browser_session_id ) + fallback_occurred = False - if browser_state is None: - try: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, + if url != current_url: + if page is None: + page = await browser_state.get_or_create_page( url=url, - browser_session_id=browser_session_id, + proxy_location=workflow_run.proxy_location, + task_id=task_v2.task_id, + workflow_run_id=workflow_run_id, + script_id=task_v2.script_id, + organization_id=organization_id, + extra_http_headers=task_v2.extra_http_headers, ) - except Exception: - LOG.warning("Failed to get or create browser state, fallback to Google", exc_info=True, url=url) - - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, - url=fallback_url, - browser_session_id=browser_session_id, - ) - - fallback_occurred = True - - if browser_state is None: - LOG.error("Failed to create browser state even after fallback", workflow_run_id=workflow_run_id) - raise MissingBrowserState(workflow_run_id=workflow_run_id) - - page = await browser_state.get_working_page() + else: + await browser_state.navigate_to_url(page, url) page_loaded = False if page: @@ -590,10 +669,6 @@ async def run_task_v2_helper( fallback_occurred = True except Exception: LOG.exception("Failed to load Google fallback", exc_info=True, url=url, current_url=current_url) - else: - page = await browser_state.get_working_page() - if page: - current_url = await SkyvernFrame.get_url(page) if i == 0 and current_url != url: if fallback_occurred: