support more anthropic actions (#2257)

This commit is contained in:
Shuchang Zheng
2025-04-30 18:42:44 +08:00
committed by GitHub
parent f932d7d704
commit 1a33810f09
4 changed files with 369 additions and 130 deletions

View File

@@ -20,6 +20,7 @@ from skyvern.webeye.actions.actions import (
DragAction,
InputTextAction,
KeypressAction,
LeftMouseAction,
MoveAction,
NullAction,
ScrollAction,
@@ -287,7 +288,8 @@ async def parse_cua_actions(
intention=reasoning,
)
case "move":
reasoning = reasoning or f"Move mouse to: ({cua_action.x}, {cua_action.y})"
response = f"Move mouse to: ({cua_action.x}, {cua_action.y})"
reasoning = reasoning or response
action = MoveAction(
x=cua_action.x,
y=cua_action.y,
@@ -350,7 +352,7 @@ async def parse_cua_actions(
step_id=step.step_id,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
response=response.dict(),
response=response.model_dump(),
)
reasoning = reasonings[0].summary[0].text if reasonings and reasonings[0].summary else None
assistant_message = assistant_messages[0].content[0].text if assistant_messages else None
@@ -366,135 +368,336 @@ async def parse_anthropic_actions(
screenshot_resize_target_dimension: Resolution,
) -> list[Action]:
tool_calls = [block for block in assistant_content if block["type"] == "tool_use" and block["name"] == "computer"]
reasonings = [block for block in assistant_content if block["type"] == "thinking"]
LOG.info("Anthropic tool calls", tool_calls=tool_calls, reasonings=reasonings, assistant_content=assistant_content)
if len(reasonings) > 1:
LOG.warning(
"Anthropic CUA: multiple reasonings in assistant content",
task_id=task.task_id,
step_id=step.step_id,
assistant_content=assistant_content,
)
reasoning = reasonings[0]["thinking"] if reasonings else None
idx = 0
actions: list[Action] = []
LOG.info("Anthropic tool calls", tool_calls=tool_calls, assistant_content=assistant_content)
while idx < len(tool_calls):
tool_call = tool_calls[idx]
tool_call_id = tool_call["id"]
tool_call_input = tool_call.get("input")
if not tool_call_input:
idx += 1
continue
action = tool_call_input["action"]
if action == "mouse_move":
original_x, original_y = tool_call_input["coordinate"]
# (x, y) is the coordinate in resized screenshots. We need to scale it to the browser window dimension.
x, y = scale_coordinates(
(original_x, original_y), screenshot_resize_target_dimension, browser_window_dimension
)
actions.append(
MoveAction(
x=x,
y=y,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
try:
tool_call_id = tool_call["id"]
tool_call_input = tool_call.get("input")
if not tool_call_input:
idx += 1
continue
action = tool_call_input["action"]
if action == "mouse_move":
coordinate = tool_call_input.get("coordinate")
if not coordinate:
LOG.warning(
"Anthropic CUA error: mouse move action has no coordinate",
tool_call=tool_call,
)
idx += 1
continue
# (x, y) is the coordinate in resized screenshots. We need to scale it to the browser window dimension.
x, y = validate_and_get_coordinates(
coordinate, screenshot_resize_target_dimension, browser_window_dimension
)
)
elif action == "left_click":
coordinate = tool_call_input.get("coordinate")
if not coordinate and idx - 1 >= 0:
prev_tool_call = tool_calls[idx - 1]
prev_tool_call_input = prev_tool_call.get("input")
if prev_tool_call_input and prev_tool_call_input["action"] == "mouse_move":
coordinate = prev_tool_call_input.get("coordinate")
response = f"Move mouse to: ({x}, {y})"
reasoning = reasoning or response
actions.append(
# TODO: add response by adding specifying the element to move to
MoveAction(
x=x,
y=y,
reasoning=reasoning,
intention=reasoning,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "left_click":
coordinate = tool_call_input.get("coordinate")
if not coordinate and idx - 1 >= 0:
prev_tool_call = tool_calls[idx - 1]
prev_tool_call_input = prev_tool_call.get("input")
if prev_tool_call_input and prev_tool_call_input["action"] == "mouse_move":
coordinate = prev_tool_call_input.get("coordinate")
if not coordinate:
LOG.warning(
"Left click action has no coordinate and it doesn't have mouse_move before it",
if not coordinate:
LOG.warning(
"Anthropic CUA error: left click action has no coordinate and it doesn't have mouse_move before it",
tool_call=tool_call,
)
idx += 1
continue
x, y = validate_and_get_coordinates(
coordinate, screenshot_resize_target_dimension, browser_window_dimension
)
response = f"Click at: ({x}, {y})"
reasoning = reasoning or response
actions.append(
ClickAction(
element_id="",
x=x,
y=y,
button="left",
reasoning=reasoning,
intention=reasoning,
response=response,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "type":
text = tool_call_input.get("text")
if not text:
LOG.warning(
"Anthropic CUA error: type action has no text",
tool_call=tool_call,
)
idx += 1
continue
actions.append(
InputTextAction(
element_id="",
text=text,
reasoning=reasoning,
intention=reasoning,
response=text,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action in ["key", "hold_key"]:
text = tool_call_input.get("text")
if not text:
LOG.warning(
"Anthropic CUA error: key action has no text",
tool_call=tool_call,
)
idx += 1
continue
response = f"Press keys: {text}"
hold = action == "hold_key"
duration = tool_call_input.get("duration", 0)
if hold:
response = f"Hold keys for {duration} seconds: {text}"
reasoning = reasoning or response
actions.append(
KeypressAction(
element_id="",
keys=[text],
hold=hold,
duration=duration,
reasoning=reasoning,
intention=reasoning,
response=response,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "screenshot":
actions.append(
NullAction(
reasoning=reasoning,
intention=reasoning,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "scroll":
x, y = None, None
coordinate = tool_call_input.get("coordinate")
if coordinate:
x, y = validate_and_get_coordinates(
coordinate, browser_window_dimension, screenshot_resize_target_dimension
)
scroll_direction = tool_call_input.get("scroll_direction")
scroll_amount = tool_call_input.get("scroll_amount")
if scroll_direction == "up":
scroll_x = 0
scroll_y = -scroll_amount
elif scroll_direction == "down":
scroll_x = 0
scroll_y = scroll_amount
elif scroll_direction == "left":
scroll_x = -scroll_amount
scroll_y = 0
elif scroll_direction == "right":
scroll_x = scroll_amount
scroll_y = 0
else:
LOG.warning(
"Anthropic CUA error: unsupported scroll direction",
tool_call=tool_call,
)
idx += 1
continue
response = f"Scroll by: ({scroll_x}, {scroll_y})"
reasoning = reasoning or response
actions.append(
ScrollAction(
element_id="",
x=x,
y=y,
scroll_x=scroll_x,
scroll_y=scroll_y,
reasoning=reasoning,
intention=reasoning,
response=response,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action in ["left_mouse_down", "left_mouse_up"]:
coordinate = tool_call_input.get("coordinate")
x, y = None, None
if coordinate:
x, y = validate_and_get_coordinates(
coordinate, browser_window_dimension, screenshot_resize_target_dimension
)
direction = "down" if action == "left_mouse_down" else "up"
response = f"Left mouse {direction} at: ({x}, {y})"
reasoning = reasoning or response
actions.append(
LeftMouseAction(
x=x,
y=y,
direction=direction,
reasoning=reasoning,
intention=reasoning,
response=response,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "left_click_drag":
coordinate = tool_call_input.get("coordinate")
start_coordinate = tool_call_input.get("start_coordinate")
LOG.info(
"Anthropic CUA left click drag action", coordinate=coordinate, start_coordinate=start_coordinate
)
if not coordinate or not start_coordinate:
LOG.warning(
"Anthropic CUA error: left click drag action has no coordinate or start coordinate",
tool_call=tool_call,
)
idx += 1
continue
x, y = validate_and_get_coordinates(
coordinate, browser_window_dimension, screenshot_resize_target_dimension
)
start_x, start_y = validate_and_get_coordinates(
start_coordinate, browser_window_dimension, screenshot_resize_target_dimension
)
response = f"Drag from ({start_x}, {start_y}) to ({x}, {y})"
reasoning = reasoning or response
actions.append(
DragAction(
start_x=start_x,
start_y=start_y,
path=[(x, y)],
reasoning=reasoning,
intention=reasoning,
response=response,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "wait":
duration = tool_call_input.get("duration", 5)
actions.append(
WaitAction(
seconds=duration,
reasoning=reasoning,
intention=reasoning,
response=f"Wait for {duration} seconds",
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
else:
LOG.error(
"Anthropic CUA error: unsupported action",
tool_call=tool_call,
)
idx += 1
continue
original_x, original_y = coordinate
x, y = scale_coordinates(
(original_x, original_y), screenshot_resize_target_dimension, browser_window_dimension
)
actions.append(
ClickAction(
element_id="",
x=x,
y=y,
button="left",
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "type":
text = tool_call_input.get("text")
if not text:
LOG.warning(
"Anthropic type action has no text",
tool_call=tool_call,
)
idx += 1
continue
actions.append(
InputTextAction(
element_id="",
text=text,
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "key":
text = tool_call_input.get("text")
if not text:
LOG.warning(
"Key action has no text",
tool_call=tool_call,
)
idx += 1
continue
actions.append(
KeypressAction(
element_id="",
keys=[text],
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
elif action == "screenshot":
actions.append(
NullAction(
organization_id=task.organization_id,
workflow_run_id=task.workflow_run_id,
task_id=task.task_id,
step_id=step.step_id,
step_order=step.order,
action_order=idx,
tool_call_id=tool_call_id,
)
)
else:
LOG.error(
"Unsupported action",
idx += 1
except Exception:
LOG.exception(
"Anthropic CUA error: failed to parse action",
task_id=task.task_id,
step_id=step.step_id,
tool_call=tool_call,
)
idx += 1
break
if not actions:
reasoning = reasonings[0]["thinking"] if reasonings else None
assistant_messages = [block for block in assistant_content if block["type"] == "text"]
assistant_message = assistant_messages[0]["text"] if assistant_messages else None
actions = await generate_cua_fallback_actions(task, step, assistant_message, reasoning)
return actions
# function from anthropic's quickstart guide
# https://github.com/anthropics/anthropic-quickstarts/blob/81c4085944abb1734db411f05290b538fdc46dcd/computer-use-demo/computer_use_demo/tools/computer.py#L214C1-L221C1
def validate_and_get_coordinates(
coordinate: tuple[int, int] | list[int],
current_dimension: Resolution,
target_dimension: Resolution,
) -> tuple[int, int]:
if len(coordinate) != 2:
raise ValueError(f"{coordinate} must be a tuple of length 2")
if not all(isinstance(i, int) and i >= 0 for i in coordinate):
raise ValueError(f"{coordinate} must be a tuple of non-negative ints")
return scale_coordinates((coordinate[0], coordinate[1]), current_dimension, target_dimension)
async def generate_cua_fallback_actions(
task: Task,
step: Step,