integration with CUA (#2126)

This commit is contained in:
Shuchang Zheng
2025-04-11 11:18:53 -07:00
committed by GitHub
parent 2ac65c4a9b
commit f883b91180
13 changed files with 420 additions and 53 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import base64
import json
import os
import random
@@ -10,6 +11,7 @@ from typing import Any, Tuple
import httpx
import structlog
from openai.types.responses.response import Response as OpenAIResponse
from playwright._impl._errors import TargetClosedError
from playwright.async_api import Page
@@ -68,6 +70,7 @@ from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, Tas
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
from skyvern.schemas.runs import RunEngine, RunType
from skyvern.utils.prompt_engine import load_prompt_with_elements
from skyvern.webeye.actions.actions import (
Action,
@@ -85,7 +88,7 @@ from skyvern.webeye.actions.actions import (
from skyvern.webeye.actions.caching import retrieve_action_plan
from skyvern.webeye.actions.handler import ActionHandler, poll_verification_code
from skyvern.webeye.actions.models import AgentStepOutput, DetailedAgentStepOutput
from skyvern.webeye.actions.parse_actions import parse_actions
from skyvern.webeye.actions.parse_actions import parse_actions, parse_cua_actions
from skyvern.webeye.actions.responses import ActionResult, ActionSuccess
from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website
@@ -248,6 +251,8 @@ class ForgeAgent:
task_block: BaseTaskBlock | None = None,
browser_session_id: str | None = None,
complete_verification: bool = True,
engine: RunEngine = RunEngine.skyvern_v1,
cua_response: OpenAIResponse | None = None,
) -> Tuple[Step, DetailedAgentStepOutput | None, Step | None]:
workflow_run: WorkflowRun | None = None
if task.workflow_run_id:
@@ -380,6 +385,8 @@ class ForgeAgent:
organization=organization,
task_block=task_block,
complete_verification=complete_verification,
engine=engine,
cua_response=cua_response,
)
await app.AGENT_FUNCTION.post_step_execution(task, step)
task = await self.update_task_errors_from_detailed_output(task, detailed_output)
@@ -506,6 +513,10 @@ class ForgeAgent:
step_status=step.status,
)
cua_response_param = detailed_output.cua_response if detailed_output else None
if not cua_response_param and cua_response:
cua_response_param = cua_response
if retry and next_step:
return await self.execute_step(
organization,
@@ -516,6 +527,8 @@ class ForgeAgent:
browser_session_id=browser_session_id,
task_block=task_block,
complete_verification=complete_verification,
engine=engine,
cua_response=cua_response_param,
)
elif settings.execute_all_steps() and next_step:
return await self.execute_step(
@@ -527,6 +540,8 @@ class ForgeAgent:
browser_session_id=browser_session_id,
task_block=task_block,
complete_verification=complete_verification,
engine=engine,
cua_response=cua_response_param,
)
else:
LOG.info(
@@ -757,9 +772,11 @@ class ForgeAgent:
task: Task,
step: Step,
browser_state: BrowserState,
engine: RunEngine = RunEngine.skyvern_v1,
organization: Organization | None = None,
task_block: BaseTaskBlock | None = None,
complete_verification: bool = True,
cua_response: OpenAIResponse | None = None,
) -> tuple[Step, DetailedAgentStepOutput]:
detailed_agent_step_output = DetailedAgentStepOutput(
scraped_page=None,
@@ -768,6 +785,7 @@ class ForgeAgent:
actions=None,
action_results=None,
actions_and_results=None,
cua_response=None,
)
try:
LOG.info(
@@ -789,52 +807,62 @@ class ForgeAgent:
task,
step,
browser_state,
engine,
)
detailed_agent_step_output.scraped_page = scraped_page
detailed_agent_step_output.extract_action_prompt = extract_action_prompt
json_response = None
actions: list[Action]
using_cached_action_plan = False
if not task.navigation_goal and not isinstance(task_block, ValidationBlock):
actions = [await self.create_extract_action(task, step, scraped_page)]
elif (
task_block
and task_block.cache_actions
and (actions := await retrieve_action_plan(task, step, scraped_page))
):
using_cached_action_plan = True
else:
self.async_operation_pool.run_operation(task.task_id, AgentPhase.llm)
json_response = await app.LLM_API_HANDLER(
prompt=extract_action_prompt,
prompt_name="extract-actions",
if engine == RunEngine.openai_cua:
actions, new_cua_response = await self._generate_cua_actions(
task=task,
step=step,
screenshots=scraped_page.screenshots,
scraped_page=scraped_page,
previous_response=cua_response,
)
try:
json_response = await self.handle_potential_verification_code(
task,
step,
scraped_page,
browser_state,
json_response,
detailed_agent_step_output.cua_response = new_cua_response
else:
using_cached_action_plan = False
if not task.navigation_goal and not isinstance(task_block, ValidationBlock):
actions = [await self.create_extract_action(task, step, scraped_page)]
elif (
task_block
and task_block.cache_actions
and (actions := await retrieve_action_plan(task, step, scraped_page))
):
using_cached_action_plan = True
else:
self.async_operation_pool.run_operation(task.task_id, AgentPhase.llm)
json_response = await app.LLM_API_HANDLER(
prompt=extract_action_prompt,
prompt_name="extract-actions",
step=step,
screenshots=scraped_page.screenshots,
)
detailed_agent_step_output.llm_response = json_response
actions = parse_actions(task, step.step_id, step.order, scraped_page, json_response["actions"])
except NoTOTPVerificationCodeFound:
actions = [
TerminateAction(
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=0,
reasoning="No TOTP verification code found. Going to terminate.",
intention="No TOTP verification code found. Going to terminate.",
try:
json_response = await self.handle_potential_verification_code(
task,
step,
scraped_page,
browser_state,
json_response,
)
]
detailed_agent_step_output.llm_response = json_response
actions = parse_actions(task, step.step_id, step.order, scraped_page, json_response["actions"])
except NoTOTPVerificationCodeFound:
actions = [
TerminateAction(
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=0,
reasoning="No TOTP verification code found. Going to terminate.",
intention="No TOTP verification code found. Going to terminate.",
)
]
detailed_agent_step_output.actions = actions
if len(actions) == 0:
@@ -1187,6 +1215,77 @@ class ForgeAgent:
)
return failed_step, detailed_agent_step_output.get_clean_detailed_output()
async def _generate_cua_actions(
    self,
    task: Task,
    step: Step,
    scraped_page: ScrapedPage,
    previous_response: OpenAIResponse | None = None,
) -> tuple[list[Action], OpenAIResponse]:
    """Run one turn of the OpenAI computer-use (CUA) conversation for this step.

    On the first step (no ``previous_response``) it opens the conversation by
    sending the task's navigation goal to the ``computer-use-preview`` model.
    On every turn it answers the model's most recent ``computer_call`` with the
    current page screenshot, then parses the model's new output into Skyvern
    actions.

    Args:
        task: The task being executed; ``task.navigation_goal`` seeds the
            conversation on the first turn.
        step: The current step, forwarded to ``parse_cua_actions``.
        scraped_page: Source of screenshots; only ``screenshots[0]`` is sent
            (CUA scraping is configured for a single screenshot elsewhere).
        previous_response: The prior turn's OpenAI response, or ``None`` on
            the first step.

    Returns:
        A ``(actions, response)`` pair. ``actions`` is empty (with the prior
        response passed through) when the model issued no ``computer_call`` or
        no screenshot is available; otherwise the parsed actions plus the new
        OpenAI response to thread into the next turn.
    """
    if not previous_response:
        # this is the first step: open the CUA conversation with the
        # navigation goal as the user message.
        first_response: OpenAIResponse = await app.OPENAI_CLIENT.responses.create(
            model="computer-use-preview",
            tools=[
                {
                    "type": "computer_use_preview",
                    # Advertise the real browser viewport so the model emits
                    # coordinates in the right space.
                    "display_width": settings.BROWSER_WIDTH,
                    "display_height": settings.BROWSER_HEIGHT,
                    "environment": "browser",
                }
            ],
            input=[
                {
                    "role": "user",
                    "content": task.navigation_goal,
                }
            ],
            reasoning={
                "generate_summary": "concise",
            },
            truncation="auto",
        )
        previous_response = first_response
    # The model drives the loop via computer_call items; without one there is
    # nothing to answer, so pass the response through with no actions.
    computer_calls = [item for item in previous_response.output if item.type == "computer_call"]
    if not computer_calls:
        return [], previous_response
    if not scraped_page.screenshots:
        # Can't answer a computer_call without a screenshot to show the model.
        return [], previous_response
    # Answer the most recent call only; earlier calls are assumed already
    # handled in prior turns — TODO confirm.
    last_call_id = computer_calls[-1].call_id
    # First screenshot is used as the current view; assumes it is PNG-encoded
    # bytes of the viewport — verify against the scraper's CUA settings.
    screenshot_base64 = base64.b64encode(scraped_page.screenshots[0]).decode("utf-8")
    current_response = await app.OPENAI_CLIENT.responses.create(
        model="computer-use-preview",
        # Thread the conversation so the model keeps its prior context.
        previous_response_id=previous_response.id,
        tools=[
            {
                "type": "computer_use_preview",
                "display_width": settings.BROWSER_WIDTH,
                "display_height": settings.BROWSER_HEIGHT,
                "environment": "browser",
            }
        ],
        input=[
            {
                # Reply to the pending computer_call with the screenshot as
                # the observed result.
                "call_id": last_call_id,
                "type": "computer_call_output",
                "output": {
                    "type": "input_image",
                    "image_url": f"data:image/png;base64,{screenshot_base64}",
                },
            }
        ],
        reasoning={
            "generate_summary": "concise",
        },
        truncation="auto",
    )
    # Convert the model's new computer calls into Skyvern actions and return
    # the response so the caller can thread it into the next step.
    return parse_cua_actions(task, step, current_response), current_response
@staticmethod
async def complete_verify(page: Page, scraped_page: ScrapedPage, task: Task, step: Step) -> CompleteVerifyResult:
LOG.info(
@@ -1195,7 +1294,12 @@ class ForgeAgent:
step_id=step.step_id,
workflow_run_id=task.workflow_run_id,
)
scraped_page_refreshed = await scraped_page.refresh(draw_boxes=False)
run_obj = await app.DATABASE.get_run(run_id=task.task_id, organization_id=task.organization_id)
scroll = True
if run_obj and run_obj.task_run_type == RunType.openai_cua:
scroll = False
scraped_page_refreshed = await scraped_page.refresh(draw_boxes=False, scroll=scroll)
verification_prompt = load_prompt_with_elements(
scraped_page=scraped_page_refreshed,
@@ -1351,6 +1455,7 @@ class ForgeAgent:
step: Step,
browser_state: BrowserState,
scrape_type: ScrapeType,
engine: RunEngine,
) -> ScrapedPage:
if scrape_type == ScrapeType.NORMAL:
pass
@@ -1370,11 +1475,21 @@ class ForgeAgent:
)
await browser_state.reload_page()
max_screenshot_number = settings.MAX_NUM_SCREENSHOTS
draw_boxes = True
scroll = True
if engine == RunEngine.openai_cua:
max_screenshot_number = 1
draw_boxes = False
scroll = False
return await scrape_website(
browser_state,
task.url,
app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step),
scrape_exclude=app.scrape_exclude,
max_screenshot_number=max_screenshot_number,
draw_boxes=draw_boxes,
scroll=scroll,
)
async def build_and_record_step_prompt(
@@ -1382,6 +1497,7 @@ class ForgeAgent:
task: Task,
step: Step,
browser_state: BrowserState,
engine: RunEngine,
) -> tuple[ScrapedPage, str]:
# start the async tasks while running scrape_website
self.async_operation_pool.run_operation(task.task_id, AgentPhase.scrape)
@@ -1399,6 +1515,7 @@ class ForgeAgent:
step=step,
browser_state=browser_state,
scrape_type=scrape_type,
engine=engine,
)
break
except (FailedToTakeScreenshot, ScrapingFailed) as e:
@@ -1431,14 +1548,16 @@ class ForgeAgent:
# TODO: we only use HTML element for now, introduce a way to switch in the future
element_tree_format = ElementTreeFormat.HTML
element_tree_in_prompt: str = scraped_page.build_element_tree(element_tree_format)
extract_action_prompt = await self._build_extract_action_prompt(
task,
step,
browser_state,
scraped_page,
verification_code_check=bool(task.totp_verification_url or task.totp_identifier),
expire_verification_code=True,
)
extract_action_prompt = ""
if engine != RunEngine.openai_cua:
extract_action_prompt = await self._build_extract_action_prompt(
task,
step,
browser_state,
scraped_page,
verification_code_check=bool(task.totp_verification_url or task.totp_identifier),
expire_verification_code=True,
)
await app.ARTIFACT_MANAGER.create_artifact(
step=step,
@@ -2146,9 +2265,14 @@ class ForgeAgent:
step_result["actions_result"] = action_result_summary
steps_results.append(step_result)
run_obj = await app.DATABASE.get_run(run_id=task.task_id, organization_id=task.organization_id)
scroll = True
if run_obj and run_obj.task_run_type == RunType.openai_cua:
scroll = False
screenshots: list[bytes] = []
if page is not None:
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=page.url)
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=page.url, scroll=scroll)
prompt = prompt_engine.load_prompt(
"summarize-max-steps-reason",