Extract SkyvernPageAi from SkyvernPage (#3825)

This commit is contained in:
Stanislav Novosad
2025-10-27 13:01:22 -06:00
committed by GitHub
parent 353358ee17
commit a889a238d8
2 changed files with 462 additions and 398 deletions

View File

@@ -2,27 +2,21 @@ from __future__ import annotations
import asyncio
import copy
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any, Callable, Literal
import structlog
from jinja2.sandbox import SandboxedEnvironment
from playwright.async_api import Page
from skyvern.config import settings
from skyvern.constants import SPECIAL_FIELD_VERIFICATION_CODE
from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi, render_template
from skyvern.exceptions import ScriptTerminationException, WorkflowRunNotFound
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.artifact.models import ArtifactType
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.schemas.totp_codes import OTPType
from skyvern.services.otp_service import poll_otp_value
from skyvern.utils.prompt_engine import load_prompt_with_elements
from skyvern.utils.url_validators import prepend_scheme_and_validate_url
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.actions.action_types import ActionType
@@ -31,25 +25,17 @@ from skyvern.webeye.actions.actions import (
ActionStatus,
CompleteAction,
ExtractAction,
InputTextAction,
SelectOption,
SolveCaptchaAction,
)
from skyvern.webeye.actions.handler import (
ActionHandler,
handle_click_action,
handle_complete_action,
handle_input_text_action,
handle_select_option_action,
)
from skyvern.webeye.actions.parse_actions import parse_actions
from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website
jinja_sandbox_env = SandboxedEnvironment()
LOG = structlog.get_logger()
SELECT_OPTION_GOAL = """- The intention to select an option: {intention}.
- The overall goal that the user wants to achieve: {prompt}."""
class Driver(StrEnum):
@@ -74,80 +60,6 @@ class ActionCall:
error: Exception | None = None # populated if failed
async def _get_element_id_by_selector(selector: str, page: Page) -> str | None:
locator = page.locator(selector)
element_id = await locator.get_attribute("unique_id")
return element_id
def _get_context_data(data: str | dict[str, Any] | None = None) -> dict[str, Any] | str | None:
context = skyvern_context.current()
global_context_data = context.script_run_parameters if context else None
if not data:
return global_context_data
result: dict[str, Any] | str | None
if isinstance(data, dict):
result = {k: v for k, v in data.items() if v}
if global_context_data:
result.update(global_context_data)
else:
global_context_data_str = json.dumps(global_context_data) if global_context_data else ""
result = f"{data}\n{global_context_data_str}"
return result
def _render_template_with_label(template: str, label: str | None = None) -> str:
template_data = {}
context = skyvern_context.current()
if context and context.workflow_run_id:
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(context.workflow_run_id)
block_reference_data: dict[str, Any] = workflow_run_context.get_block_metadata(label)
template_data = workflow_run_context.values.copy()
if label in template_data:
current_value = template_data[label]
if isinstance(current_value, dict):
block_reference_data.update(current_value)
else:
LOG.warning(
f"Script service: Parameter {label} has a registered reference value, going to overwrite it by block metadata"
)
if label:
template_data[label] = block_reference_data
# inject the forloop metadata as global variables
if "current_index" in block_reference_data:
template_data["current_index"] = block_reference_data["current_index"]
if "current_item" in block_reference_data:
template_data["current_item"] = block_reference_data["current_item"]
if "current_value" in block_reference_data:
template_data["current_value"] = block_reference_data["current_value"]
try:
return render_template(template, data=template_data)
except Exception:
LOG.exception("Failed to render template", template=template, data=template_data)
return template
def render_template(template: str, data: dict[str, Any] | None = None) -> str:
"""
Refer to Block.format_block_parameter_template_from_workflow_run_context
TODO: complete this function so that block code shares the same template rendering logic
"""
template_data = data.copy() if data else {}
jinja_template = jinja_sandbox_env.from_string(template)
context = skyvern_context.current()
if context and context.workflow_run_id:
workflow_run_id = context.workflow_run_id
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
template_data.update(workflow_run_context.values)
if template in template_data:
return template_data[template]
return jinja_template.render(template_data)
class SkyvernPage:
"""
A minimal adapter around the chosen driver that:
@@ -160,6 +72,7 @@ class SkyvernPage:
self,
scraped_page: ScrapedPage,
page: Page,
ai: SkyvernPageAi,
*,
recorder: Callable[[ActionCall], None] | None = None,
# generate_response: bool = False,
@@ -168,6 +81,7 @@ class SkyvernPage:
self.page = page
self._record = recorder or (lambda ac: None)
self.current_label: str | None = None
self._ai = ai
@classmethod
async def _get_or_create_browser_state(cls, browser_session_id: str | None = None) -> BrowserState:
@@ -207,10 +121,20 @@ class SkyvernPage:
cls,
browser_session_id: str | None = None,
) -> SkyvernPage:
scraped_page = await cls.create_scraped_page(browser_session_id=browser_session_id)
page = await scraped_page._browser_state.must_get_working_page()
ai = SkyvernPageAi(scraped_page, page)
return cls(scraped_page=scraped_page, page=page, ai=ai)
@classmethod
async def create_scraped_page(
cls,
browser_session_id: str | None = None,
) -> ScrapedPage:
# initialize browser state
# TODO: add workflow_run_id or eventually script_id/script_run_id
browser_state = await cls._get_or_create_browser_state(browser_session_id=browser_session_id)
scraped_page = await scrape_website(
return await scrape_website(
browser_state=browser_state,
url="",
cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(),
@@ -220,8 +144,6 @@ class SkyvernPage:
scroll=True,
support_empty_page=True,
)
page = await scraped_page._browser_state.must_get_working_page()
return cls(scraped_page=scraped_page, page=page)
@staticmethod
def action_wrap(
@@ -519,60 +441,6 @@ class SkyvernPage:
# If screenshot creation fails, don't block execution
pass
async def _ai_click(
self,
selector: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
try:
# Build the element tree of the current page for the prompt
context = skyvern_context.ensure_context()
payload_str = _get_context_data(data)
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
element_tree = refreshed_page.build_element_tree()
single_click_prompt = prompt_engine.load_prompt(
template="single-click-action",
navigation_goal=intention,
navigation_payload_str=payload_str,
current_url=self.page.url,
elements=element_tree,
local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(),
# user_context=getattr(context, "prompt", None),
)
json_response = await app.SINGLE_CLICK_AGENT_LLM_API_HANDLER(
prompt=single_click_prompt,
prompt_name="single-click-action",
organization_id=context.organization_id,
)
actions_json = json_response.get("actions", [])
if actions_json:
organization_id = context.organization_id if context else None
task_id = context.task_id if context else None
step_id = context.step_id if context else None
task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None
step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None
if organization_id and task and step:
actions = parse_actions(
task, step.step_id, step.order, self.scraped_page, json_response.get("actions", [])
)
action = actions[0]
result = await handle_click_action(action, self.page, self.scraped_page, task, step)
if result and result[-1].success is False:
raise Exception(result[-1].exception_message)
xpath = action.get_xpath()
selector = f"xpath={xpath}" if xpath else selector
return selector
except Exception:
LOG.exception(
f"Failed to do ai click. Falling back to original selector={selector}, intention={intention}, data={data}"
)
locator = self.page.locator(selector)
await locator.click(timeout=timeout)
return selector
######### Public Interfaces #########
@action_wrap(ActionType.CLICK)
async def click(
@@ -608,7 +476,7 @@ class SkyvernPage:
# if the original selector doesn't work, try to click the element with the ai generated selector
if intention:
return await self._ai_click(
return await self._ai.ai_click(
selector=selector,
intention=intention,
data=data,
@@ -620,7 +488,7 @@ class SkyvernPage:
return selector
elif ai == "proactive":
if intention:
return await self._ai_click(
return await self._ai.ai_click(
selector=selector,
intention=intention,
data=data,
@@ -676,96 +544,6 @@ class SkyvernPage:
totp_url=totp_url,
)
async def _ai_input_text(
self,
selector: str,
value: str,
intention: str,
data: str | dict[str, Any] | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
context = skyvern_context.current()
value = value or ""
transformed_value = value
element_id: str | None = None
organization_id = context.organization_id if context else None
task_id = context.task_id if context else None
step_id = context.step_id if context else None
workflow_run_id = context.workflow_run_id if context else None
task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None
step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None
if intention:
try:
prompt = context.prompt if context else None
data = data or {}
if (totp_identifier or totp_url) and context and organization_id and task_id:
if totp_identifier:
totp_identifier = _render_template_with_label(totp_identifier, label=self.current_label)
if totp_url:
totp_url = _render_template_with_label(totp_url, label=self.current_label)
otp_value = await poll_otp_value(
organization_id=organization_id,
task_id=task_id,
workflow_run_id=workflow_run_id,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
)
if otp_value and otp_value.get_otp_type() == OTPType.TOTP:
verification_code = otp_value.value
if isinstance(data, dict) and SPECIAL_FIELD_VERIFICATION_CODE not in data:
data[SPECIAL_FIELD_VERIFICATION_CODE] = verification_code
elif isinstance(data, str) and SPECIAL_FIELD_VERIFICATION_CODE not in data:
data = f"{data}\n" + str({SPECIAL_FIELD_VERIFICATION_CODE: verification_code})
elif isinstance(data, list):
data.append({SPECIAL_FIELD_VERIFICATION_CODE: verification_code})
else:
data = {SPECIAL_FIELD_VERIFICATION_CODE: verification_code}
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
self.scraped_page = refreshed_page
# get the element_id by the selector
element_id = await _get_element_id_by_selector(selector, self.page)
script_generation_input_text_prompt = prompt_engine.load_prompt(
template="script-generation-input-text-generatiion",
intention=intention,
goal=prompt,
data=data,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
prompt=script_generation_input_text_prompt,
prompt_name="script-generation-input-text-generatiion",
organization_id=organization_id,
)
value = json_response.get("answer", value)
except Exception:
LOG.exception(f"Failed to adapt value for input text action on selector={selector}, value={value}")
if context and context.workflow_run_id:
transformed_value = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, str(value))
if element_id and organization_id and task and step:
action = InputTextAction(
element_id=element_id,
text=value,
status=ActionStatus.pending,
organization_id=organization_id,
workflow_run_id=workflow_run_id,
task_id=task_id,
step_id=context.step_id if context else None,
reasoning=intention,
intention=intention,
response=value,
)
result = await handle_input_text_action(action, self.page, self.scraped_page, task, step)
if result and result[-1].success is False:
raise Exception(result[-1].exception_message)
else:
locator = self.page.locator(selector)
await handler_utils.input_sequentially(locator, transformed_value, timeout=timeout)
return value
async def _input_text(
self,
selector: str,
@@ -801,7 +579,7 @@ class SkyvernPage:
error_to_raise = e
if intention:
return await self._ai_input_text(
return await self._ai.ai_input_text(
selector=selector,
value=value,
intention=intention,
@@ -815,7 +593,7 @@ class SkyvernPage:
else:
return value
elif ai == "proactive" and intention:
return await self._ai_input_text(
return await self._ai.ai_input_text(
selector=selector,
value=value,
intention=intention,
@@ -828,40 +606,6 @@ class SkyvernPage:
await handler_utils.input_sequentially(locator, value, timeout=timeout)
return value
async def _ai_upload_file(
self,
selector: str,
files: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
if intention:
try:
context = skyvern_context.current()
prompt = context.prompt if context else None
data = _get_context_data(data)
script_generation_file_url_prompt = prompt_engine.load_prompt(
template="script-generation-file-url-generation",
intention=intention,
data=data,
goal=prompt,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
prompt=script_generation_file_url_prompt,
prompt_name="script-generation-file-url-generation",
organization_id=context.organization_id if context else None,
)
files = json_response.get("answer", files)
except Exception:
LOG.exception(f"Failed to adapt value for input text action on selector={selector}, file={files}")
if not files:
raise ValueError("file url must be provided")
file_path = await download_file(files)
locator = self.page.locator(selector)
await locator.set_input_files(file_path, timeout=timeout)
return files
@action_wrap(ActionType.UPLOAD_FILE)
async def upload_file(
self,
@@ -884,7 +628,7 @@ class SkyvernPage:
except Exception as e:
error_to_raise = e
if intention:
return await self._ai_upload_file(
return await self._ai.ai_upload_file(
selector=selector,
files=files,
intention=intention,
@@ -896,7 +640,7 @@ class SkyvernPage:
else:
return files
elif ai == "proactive" and intention:
return await self._ai_upload_file(
return await self._ai.ai_upload_file(
selector=selector,
files=files,
intention=intention,
@@ -908,69 +652,6 @@ class SkyvernPage:
await locator.set_input_files(file_path, timeout=timeout)
return files
async def _ai_select_option(
self,
selector: str,
value: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
option_value = value or ""
context = skyvern_context.current()
if context and context.task_id and context.step_id and context.organization_id:
task = await app.DATABASE.get_task(context.task_id, organization_id=context.organization_id)
step = await app.DATABASE.get_step(context.step_id, organization_id=context.organization_id)
if intention and task and step:
try:
prompt = context.prompt if context else None
# data = _get_context_data(data)
data = data or {}
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
self.scraped_page = refreshed_page
element_tree = refreshed_page.build_element_tree()
merged_goal = SELECT_OPTION_GOAL.format(intention=intention, prompt=prompt)
single_select_prompt = prompt_engine.load_prompt(
template="single-select-action",
navigation_payload_str=data,
navigation_goal=merged_goal,
current_url=self.page.url,
elements=element_tree,
local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(),
)
json_response = await app.SELECT_AGENT_LLM_API_HANDLER(
prompt=single_select_prompt,
prompt_name="single-select-action",
organization_id=context.organization_id if context else None,
)
actions = parse_actions(
task, step.step_id, step.order, self.scraped_page, json_response.get("actions", [])
)
if actions:
action = actions[0]
if not action.option:
raise ValueError("SelectOptionAction requires an 'option' field")
option_value = action.option.value or action.option.label or ""
await handle_select_option_action(
action=action,
page=self.page,
scraped_page=self.scraped_page,
task=task,
step=step,
)
else:
LOG.exception(
f"Failed to parse actions for select option action on selector={selector}, value={value}"
)
except Exception:
LOG.exception(
f"Failed to adapt value for select option action on selector={selector}, value={value}"
)
else:
locator = self.page.locator(selector)
await locator.select_option(option_value, timeout=timeout)
return option_value
@action_wrap(ActionType.SELECT_OPTION)
async def select_option(
self,
@@ -995,7 +676,7 @@ class SkyvernPage:
except Exception as e:
error_to_raise = e
if intention:
return await self._ai_select_option(
return await self._ai.ai_select_option(
selector=selector,
value=value,
intention=intention,
@@ -1007,7 +688,7 @@ class SkyvernPage:
else:
return value
elif ai == "proactive" and intention:
return await self._ai_select_option(
return await self._ai.ai_select_option(
selector=selector,
value=value,
intention=intention,
@@ -1098,50 +779,7 @@ class SkyvernPage:
intention: str | None = None,
data: str | dict[str, Any] | None = None,
) -> dict[str, Any] | list | str | None:
scraped_page_refreshed = await self.scraped_page.refresh()
context = skyvern_context.current()
tz_info = datetime.now(tz=timezone.utc).tzinfo
if context and context.tz_info:
tz_info = context.tz_info
prompt = _render_template_with_label(prompt, label=self.current_label)
extract_information_prompt = load_prompt_with_elements(
element_tree_builder=scraped_page_refreshed,
prompt_engine=prompt_engine,
template_name="extract-information",
html_need_skyvern_attrs=False,
data_extraction_goal=prompt,
extracted_information_schema=schema,
current_url=scraped_page_refreshed.url,
extracted_text=scraped_page_refreshed.extracted_text,
error_code_mapping_str=(json.dumps(error_code_mapping) if error_code_mapping else None),
local_datetime=datetime.now(tz_info).isoformat(),
)
step = None
if context and context.organization_id and context.task_id and context.step_id:
step = await app.DATABASE.get_step(
step_id=context.step_id,
organization_id=context.organization_id,
)
result = await app.EXTRACTION_LLM_API_HANDLER(
prompt=extract_information_prompt,
step=step,
screenshots=scraped_page_refreshed.screenshots,
prompt_name="extract-information",
)
if context and context.script_mode:
print(f"\n✨ 📊 Extracted Information:\n{'-' * 50}")
try:
# Pretty print JSON if result is a dict/list
if isinstance(result, (dict, list)):
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(result)
except Exception:
print(result)
print(f"{'-' * 50}\n")
return result
return await self._ai.ai_extract(prompt, schema, error_code_mapping, intention, data)
@action_wrap(ActionType.VERIFICATION_CODE)
async def verification_code(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None:
@@ -1209,19 +847,6 @@ class RunContext:
self.trace: list[ActionCall] = []
async def _get_actual_value_of_parameter_if_secret(workflow_run_id: str, parameter: str) -> Any:
"""
Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is.
Just return the parameter value if the task isn't a workflow's task.
This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url).
"""
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)
return secret_value if secret_value is not None else parameter
class ScriptRunContextManager:
"""
Manages the run context for code runs.

View File

@@ -0,0 +1,439 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
import structlog
from jinja2.sandbox import SandboxedEnvironment
from playwright.async_api import Page
from skyvern.config import settings
from skyvern.constants import SPECIAL_FIELD_VERIFICATION_CODE
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.schemas.totp_codes import OTPType
from skyvern.services.otp_service import poll_otp_value
from skyvern.utils.prompt_engine import load_prompt_with_elements
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.actions.actions import (
ActionStatus,
InputTextAction,
)
from skyvern.webeye.actions.handler import (
handle_click_action,
handle_input_text_action,
handle_select_option_action,
)
from skyvern.webeye.actions.parse_actions import parse_actions
from skyvern.webeye.scraper.scraper import ScrapedPage
jinja_sandbox_env = SandboxedEnvironment()
LOG = structlog.get_logger()
SELECT_OPTION_GOAL = """- The intention to select an option: {intention}.
- The overall goal that the user wants to achieve: {prompt}."""
async def _get_element_id_by_selector(selector: str, page: Page) -> str | None:
locator = page.locator(selector)
element_id = await locator.get_attribute("unique_id")
return element_id
def _get_context_data(data: str | dict[str, Any] | None = None) -> dict[str, Any] | str | None:
context = skyvern_context.current()
global_context_data = context.script_run_parameters if context else None
if not data:
return global_context_data
result: dict[str, Any] | str | None
if isinstance(data, dict):
result = {k: v for k, v in data.items() if v}
if global_context_data:
result.update(global_context_data)
else:
global_context_data_str = json.dumps(global_context_data) if global_context_data else ""
result = f"{data}\n{global_context_data_str}"
return result
def _render_template_with_label(template: str, label: str | None = None) -> str:
template_data = {}
context = skyvern_context.current()
if context and context.workflow_run_id:
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(context.workflow_run_id)
block_reference_data: dict[str, Any] = workflow_run_context.get_block_metadata(label)
template_data = workflow_run_context.values.copy()
if label in template_data:
current_value = template_data[label]
if isinstance(current_value, dict):
block_reference_data.update(current_value)
else:
LOG.warning(
f"Script service: Parameter {label} has a registered reference value, going to overwrite it by block metadata"
)
if label:
template_data[label] = block_reference_data
# inject the forloop metadata as global variables
if "current_index" in block_reference_data:
template_data["current_index"] = block_reference_data["current_index"]
if "current_item" in block_reference_data:
template_data["current_item"] = block_reference_data["current_item"]
if "current_value" in block_reference_data:
template_data["current_value"] = block_reference_data["current_value"]
try:
return render_template(template, data=template_data)
except Exception:
LOG.exception("Failed to render template", template=template, data=template_data)
return template
def render_template(template: str, data: dict[str, Any] | None = None) -> str:
"""
Refer to Block.format_block_parameter_template_from_workflow_run_context
TODO: complete this function so that block code shares the same template rendering logic
"""
template_data = data.copy() if data else {}
jinja_template = jinja_sandbox_env.from_string(template)
context = skyvern_context.current()
if context and context.workflow_run_id:
workflow_run_id = context.workflow_run_id
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
template_data.update(workflow_run_context.values)
if template in template_data:
return template_data[template]
return jinja_template.render(template_data)
class SkyvernPageAi:
def __init__(
self,
scraped_page: ScrapedPage,
page: Page,
):
self.scraped_page = scraped_page
self.page = page
self.current_label: str | None = None
async def ai_click(
self,
selector: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
"""Click an element using AI to locate it based on intention."""
try:
# Build the element tree of the current page for the prompt
context = skyvern_context.ensure_context()
payload_str = _get_context_data(data)
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
element_tree = refreshed_page.build_element_tree()
single_click_prompt = prompt_engine.load_prompt(
template="single-click-action",
navigation_goal=intention,
navigation_payload_str=payload_str,
current_url=self.page.url,
elements=element_tree,
local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(),
# user_context=getattr(context, "prompt", None),
)
json_response = await app.SINGLE_CLICK_AGENT_LLM_API_HANDLER(
prompt=single_click_prompt,
prompt_name="single-click-action",
organization_id=context.organization_id,
)
actions_json = json_response.get("actions", [])
if actions_json:
organization_id = context.organization_id if context else None
task_id = context.task_id if context else None
step_id = context.step_id if context else None
task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None
step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None
if organization_id and task and step:
actions = parse_actions(
task, step.step_id, step.order, self.scraped_page, json_response.get("actions", [])
)
action = actions[0]
result = await handle_click_action(action, self.page, self.scraped_page, task, step)
if result and result[-1].success is False:
raise Exception(result[-1].exception_message)
xpath = action.get_xpath()
selector = f"xpath={xpath}" if xpath else selector
return selector
except Exception:
LOG.exception(
f"Failed to do ai click. Falling back to original selector={selector}, intention={intention}, data={data}"
)
locator = self.page.locator(selector)
await locator.click(timeout=timeout)
return selector
async def ai_input_text(
self,
selector: str,
value: str,
intention: str,
data: str | dict[str, Any] | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
"""Input text into an element using AI to determine the value."""
context = skyvern_context.current()
value = value or ""
transformed_value = value
element_id: str | None = None
organization_id = context.organization_id if context else None
task_id = context.task_id if context else None
step_id = context.step_id if context else None
workflow_run_id = context.workflow_run_id if context else None
task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None
step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None
if intention:
try:
prompt = context.prompt if context else None
data = data or {}
if (totp_identifier or totp_url) and context and organization_id and task_id:
if totp_identifier:
totp_identifier = _render_template_with_label(totp_identifier, label=self.current_label)
if totp_url:
totp_url = _render_template_with_label(totp_url, label=self.current_label)
otp_value = await poll_otp_value(
organization_id=organization_id,
task_id=task_id,
workflow_run_id=workflow_run_id,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
)
if otp_value and otp_value.get_otp_type() == OTPType.TOTP:
verification_code = otp_value.value
if isinstance(data, dict) and SPECIAL_FIELD_VERIFICATION_CODE not in data:
data[SPECIAL_FIELD_VERIFICATION_CODE] = verification_code
elif isinstance(data, str) and SPECIAL_FIELD_VERIFICATION_CODE not in data:
data = f"{data}\n" + str({SPECIAL_FIELD_VERIFICATION_CODE: verification_code})
elif isinstance(data, list):
data.append({SPECIAL_FIELD_VERIFICATION_CODE: verification_code})
else:
data = {SPECIAL_FIELD_VERIFICATION_CODE: verification_code}
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
self.scraped_page = refreshed_page
# get the element_id by the selector
element_id = await _get_element_id_by_selector(selector, self.page)
script_generation_input_text_prompt = prompt_engine.load_prompt(
template="script-generation-input-text-generatiion",
intention=intention,
goal=prompt,
data=data,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
prompt=script_generation_input_text_prompt,
prompt_name="script-generation-input-text-generatiion",
organization_id=organization_id,
)
value = json_response.get("answer", value)
except Exception:
LOG.exception(f"Failed to adapt value for input text action on selector={selector}, value={value}")
if context and context.workflow_run_id:
transformed_value = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, str(value))
if element_id and organization_id and task and step:
action = InputTextAction(
element_id=element_id,
text=value,
status=ActionStatus.pending,
organization_id=organization_id,
workflow_run_id=workflow_run_id,
task_id=task_id,
step_id=context.step_id if context else None,
reasoning=intention,
intention=intention,
response=value,
)
result = await handle_input_text_action(action, self.page, self.scraped_page, task, step)
if result and result[-1].success is False:
raise Exception(result[-1].exception_message)
else:
locator = self.page.locator(selector)
await handler_utils.input_sequentially(locator, transformed_value, timeout=timeout)
return value
async def ai_upload_file(
self,
selector: str,
files: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
"""Upload a file using AI to process the file URL."""
if intention:
try:
context = skyvern_context.current()
prompt = context.prompt if context else None
data = _get_context_data(data)
script_generation_file_url_prompt = prompt_engine.load_prompt(
template="script-generation-file-url-generation",
intention=intention,
data=data,
goal=prompt,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
prompt=script_generation_file_url_prompt,
prompt_name="script-generation-file-url-generation",
organization_id=context.organization_id if context else None,
)
files = json_response.get("answer", files)
except Exception:
LOG.exception(f"Failed to adapt value for input text action on selector={selector}, file={files}")
if not files:
raise ValueError("file url must be provided")
file_path = await download_file(files)
locator = self.page.locator(selector)
await locator.set_input_files(file_path, timeout=timeout)
return files
async def ai_select_option(
self,
selector: str,
value: str,
intention: str,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> str:
"""Select an option from a dropdown using AI."""
option_value = value or ""
context = skyvern_context.current()
if context and context.task_id and context.step_id and context.organization_id:
task = await app.DATABASE.get_task(context.task_id, organization_id=context.organization_id)
step = await app.DATABASE.get_step(context.step_id, organization_id=context.organization_id)
if intention and task and step:
try:
prompt = context.prompt if context else None
# data = _get_context_data(data)
data = data or {}
refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
self.scraped_page = refreshed_page
element_tree = refreshed_page.build_element_tree()
merged_goal = SELECT_OPTION_GOAL.format(intention=intention, prompt=prompt)
single_select_prompt = prompt_engine.load_prompt(
template="single-select-action",
navigation_payload_str=data,
navigation_goal=merged_goal,
current_url=self.page.url,
elements=element_tree,
local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(),
)
json_response = await app.SELECT_AGENT_LLM_API_HANDLER(
prompt=single_select_prompt,
prompt_name="single-select-action",
organization_id=context.organization_id if context else None,
)
actions = parse_actions(
task, step.step_id, step.order, self.scraped_page, json_response.get("actions", [])
)
if actions:
action = actions[0]
if not action.option:
raise ValueError("SelectOptionAction requires an 'option' field")
option_value = action.option.value or action.option.label or ""
await handle_select_option_action(
action=action,
page=self.page,
scraped_page=self.scraped_page,
task=task,
step=step,
)
else:
LOG.exception(
f"Failed to parse actions for select option action on selector={selector}, value={value}"
)
except Exception:
LOG.exception(
f"Failed to adapt value for select option action on selector={selector}, value={value}"
)
else:
locator = self.page.locator(selector)
await locator.select_option(option_value, timeout=timeout)
return option_value
async def ai_extract(
self,
prompt: str,
schema: dict[str, Any] | list | str | None = None,
error_code_mapping: dict[str, str] | None = None,
intention: str | None = None,
data: str | dict[str, Any] | None = None,
) -> dict[str, Any] | list | str | None:
"""Extract information from the page using AI."""
scraped_page_refreshed = await self.scraped_page.refresh()
context = skyvern_context.current()
tz_info = datetime.now(tz=timezone.utc).tzinfo
if context and context.tz_info:
tz_info = context.tz_info
prompt = _render_template_with_label(prompt, label=self.current_label)
extract_information_prompt = load_prompt_with_elements(
element_tree_builder=scraped_page_refreshed,
prompt_engine=prompt_engine,
template_name="extract-information",
html_need_skyvern_attrs=False,
data_extraction_goal=prompt,
extracted_information_schema=schema,
current_url=scraped_page_refreshed.url,
extracted_text=scraped_page_refreshed.extracted_text,
error_code_mapping_str=(json.dumps(error_code_mapping) if error_code_mapping else None),
local_datetime=datetime.now(tz_info).isoformat(),
)
step = None
if context and context.organization_id and context.task_id and context.step_id:
step = await app.DATABASE.get_step(
step_id=context.step_id,
organization_id=context.organization_id,
)
result = await app.EXTRACTION_LLM_API_HANDLER(
prompt=extract_information_prompt,
step=step,
screenshots=scraped_page_refreshed.screenshots,
prompt_name="extract-information",
)
if context and context.script_mode:
print(f"\n✨ 📊 Extracted Information:\n{'-' * 50}")
try:
# Pretty print JSON if result is a dict/list
if isinstance(result, (dict, list)):
print(json.dumps(result, indent=2, ensure_ascii=False))
else:
print(result)
except Exception:
print(result)
print(f"{'-' * 50}\n")
return result
async def _get_actual_value_of_parameter_if_secret(workflow_run_id: str, parameter: str) -> Any:
"""
Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is.
Just return the parameter value if the task isn't a workflow's task.
This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url).
"""
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)
return secret_value if secret_value is not None else parameter