92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
from typing import Dict, List
|
|
|
|
from playwright.async_api import Page
|
|
|
|
from skyvern.exceptions import StepUnableToExecuteError
|
|
from skyvern.forge import app
|
|
from skyvern.forge.async_operations import AsyncOperation
|
|
from skyvern.forge.sdk.models import Organization, Step, StepStatus
|
|
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
|
|
from skyvern.webeye.browser_factory import BrowserState
|
|
|
|
|
|
def _remove_rect(element: dict) -> None:
|
|
if "rect" in element:
|
|
del element["rect"]
|
|
|
|
|
|
class AgentFunction:
|
|
async def validate_step_execution(
|
|
self,
|
|
task: Task,
|
|
step: Step,
|
|
) -> None:
|
|
"""
|
|
Checks if the step can be executed. It is called before the step is executed.
|
|
:return: A tuple of whether the step can be executed and a list of reasons why it can't be executed.
|
|
"""
|
|
reasons = []
|
|
# can't execute if task status is not running
|
|
has_valid_task_status = task.status == TaskStatus.running
|
|
if not has_valid_task_status:
|
|
reasons.append(f"invalid_task_status:{task.status}")
|
|
# can't execute if the step is already running or completed
|
|
has_valid_step_status = step.status in [StepStatus.created, StepStatus.failed]
|
|
if not has_valid_step_status:
|
|
reasons.append(f"invalid_step_status:{step.status}")
|
|
# can't execute if the task has another step that is running
|
|
steps = await app.DATABASE.get_task_steps(task_id=task.task_id, organization_id=task.organization_id)
|
|
has_no_running_steps = not any(step.status == StepStatus.running for step in steps)
|
|
if not has_no_running_steps:
|
|
reasons.append(f"another_step_is_running_for_task:{task.task_id}")
|
|
|
|
can_execute = has_valid_task_status and has_valid_step_status and has_no_running_steps
|
|
if not can_execute:
|
|
raise StepUnableToExecuteError(step_id=step.step_id, reason=f"Cannot execute step. Reasons: {reasons}")
|
|
|
|
async def prepare_step_execution(
|
|
self,
|
|
organization: Organization | None,
|
|
task: Task,
|
|
step: Step,
|
|
browser_state: BrowserState,
|
|
) -> None:
|
|
"""
|
|
Get prepared for the step execution. It's called at the first beginning when step running.
|
|
"""
|
|
return
|
|
|
|
def generate_async_operations(
|
|
self,
|
|
organization: Organization,
|
|
task: Task,
|
|
page: Page,
|
|
) -> list[AsyncOperation]:
|
|
return []
|
|
|
|
async def cleanup_element_tree(
|
|
self,
|
|
url: str,
|
|
element_tree: List[Dict],
|
|
) -> List[Dict]:
|
|
"""
|
|
Remove rect and attribute.unique_id from the elements.
|
|
The reason we're doing it is to
|
|
1. reduce unnecessary data so that llm get less distrction
|
|
TODO later: 2. reduce tokens sent to llm to save money
|
|
:param elements: List of elements to remove xpaths from.
|
|
:return: List of elements without xpaths.
|
|
"""
|
|
queue = []
|
|
for element in element_tree:
|
|
queue.append(element)
|
|
while queue:
|
|
queue_ele = queue.pop(0)
|
|
_remove_rect(queue_ele)
|
|
# TODO: we can come back to test removing the unique_id
|
|
# from element attributes to make sure this won't increase hallucination
|
|
# _remove_unique_id(queue_ele)
|
|
if "children" in queue_ele:
|
|
queue.extend(queue_ele["children"])
|
|
return element_tree
|