SDK: support actions skeleton (#3817)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
Stanislav Novosad
2025-10-29 11:54:57 -06:00
committed by GitHub
parent d2d7b8e4b0
commit 33ad4cfcd1
26 changed files with 2274 additions and 426 deletions

View File

@@ -5,6 +5,7 @@ from skyvern.forge.sdk.routes import debug_sessions # noqa: F401
from skyvern.forge.sdk.routes import pylon # noqa: F401
from skyvern.forge.sdk.routes import run_blocks # noqa: F401
from skyvern.forge.sdk.routes import scripts # noqa: F401
from skyvern.forge.sdk.routes import sdk # noqa: F401
from skyvern.forge.sdk.routes import streaming # noqa: F401
from skyvern.forge.sdk.routes import streaming_messages # noqa: F401
from skyvern.forge.sdk.routes import streaming_vnc # noqa: F401

View File

@@ -0,0 +1,180 @@
import json
import structlog
from fastapi import Depends, HTTPException, status
from skyvern import SkyvernPage
from skyvern.core.script_generations.real_skyvern_page_ai import RealSkyvernPageAi
from skyvern.forge import app
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.routes.routers import base_router
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.sdk_actions import (
RunSdkActionRequest,
RunSdkActionResponse,
)
from skyvern.forge.sdk.services import org_auth_service
from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRequestBody,
WorkflowRunStatus,
)
from skyvern.schemas.workflows import BlockType, WorkflowStatus
# Module-level structlog logger; run_sdk_action logs each incoming request through it.
LOG = structlog.get_logger()
@base_router.post(
    "/sdk/run_action",
    response_model=RunSdkActionResponse,
    summary="Run an SDK action",
    description="Execute a single SDK action with the specified parameters",
    tags=["SDK"],
    openapi_extra={
        "x-fern-sdk-method-name": "run_sdk_action",
    },
)
@base_router.post("/sdk/run_action/", include_in_schema=False)
async def run_sdk_action(
    action_request: RunSdkActionRequest,
    organization: Organization = Depends(org_auth_service.get_current_org),
) -> RunSdkActionResponse:
    """Execute a single SDK action with the specified parameters.

    The handler resolves (or creates) the workflow run the action belongs to,
    records a task, a first step and an ACTION workflow-run block for it, and
    then dispatches on ``action.type`` to the matching ``RealSkyvernPageAi``
    method.

    Args:
        action_request: The action to run plus the URL, optional browser
            session/address and optional existing ``workflow_run_id``.
        organization: The authenticated organization, injected by
            ``org_auth_service.get_current_org``.

    Returns:
        A ``RunSdkActionResponse`` carrying the workflow run ID that was used
        and the action result. Extract results are JSON-encoded with
        ``json.dumps``; ``result`` stays ``None`` if no branch matched.

    Raises:
        HTTPException: 404 when ``workflow_run_id`` is supplied but the run,
            or the workflow it points at, cannot be found for this organization.
    """
    LOG.info(
        "Running SDK action",
        organization_id=organization.organization_id,
        action_type=action_request.action.type,
    )

    action = action_request.action
    org_id = organization.organization_id
    session_id = action_request.browser_session_id
    address = action_request.browser_address

    # Either attach to the caller-supplied run or bootstrap a fresh workflow + run.
    if action_request.workflow_run_id:
        workflow_run = await app.DATABASE.get_workflow_run(
            workflow_run_id=action_request.workflow_run_id,
            organization_id=org_id,
        )
        if not workflow_run:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow run {action_request.workflow_run_id} not found",
            )

        workflow = await app.DATABASE.get_workflow(
            workflow_id=workflow_run.workflow_id,
            organization_id=org_id,
        )
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow {workflow_run.workflow_id} not found",
            )
    else:
        workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(
            organization,
            title="SDK Workflow",
            status=WorkflowStatus.auto_generated,
        )
        workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
            request_id=None,
            workflow_request=WorkflowRequestBody(
                browser_session_id=session_id,
                browser_address=address,
            ),
            workflow_permanent_id=workflow.workflow_permanent_id,
            organization=organization,
            version=None,
        )
        # The freshly created run is marked completed right away, before the action runs.
        # NOTE(review): indentation was lost in the view this was recovered from; this
        # update is assumed to apply only to newly created runs -- confirm.
        workflow_run = await app.DATABASE.update_workflow_run(
            workflow_run_id=workflow_run.workflow_run_id,
            status=WorkflowRunStatus.completed,
        )

    # Bookkeeping records for this single action: task -> first step -> ACTION block.
    task = await app.DATABASE.create_task(
        organization_id=org_id,
        url=action_request.url,
        navigation_goal=None,
        navigation_payload=None,
        data_extraction_goal=None,
        title=f"SDK Action Task: {action.type}",
        workflow_run_id=workflow_run.workflow_run_id,
        browser_session_id=session_id,
        browser_address=address,
    )
    step = await app.DATABASE.create_step(
        task.task_id,
        order=0,
        retry_index=0,
        organization_id=org_id,
    )
    await app.DATABASE.create_workflow_run_block(
        workflow_run_id=workflow_run.workflow_run_id,
        organization_id=org_id,
        block_type=BlockType.ACTION,
        task_id=task.task_id,
    )

    # Install a context scoped to this action; only request_id is carried over
    # from the context that was active when the request arrived.
    incoming_context = skyvern_context.ensure_context()
    action_context = SkyvernContext(
        request_id=incoming_context.request_id,
        organization_id=task.organization_id,
        task_id=task.task_id,
        step_id=step.step_id,
        browser_session_id=session_id,
        max_screenshot_scrolls=task.max_screenshot_scrolls,
        workflow_id=workflow.workflow_id,
        workflow_run_id=workflow_run.workflow_run_id,
    )
    skyvern_context.set(action_context)

    result = None
    try:
        scraped_page = await SkyvernPage.create_scraped_page(browser_session_id=session_id)
        page = await scraped_page._browser_state.must_get_working_page()
        page_ai = RealSkyvernPageAi(scraped_page, page)

        # Compare against action.type directly so type checkers can narrow the
        # discriminated union to the concrete action model in each branch.
        if action.type == "ai_click":
            result = await page_ai.ai_click(
                selector=action.selector,
                intention=action.intention,
                data=action.data,
                timeout=action.timeout,
            )
        elif action.type == "ai_input_text":
            result = await page_ai.ai_input_text(
                selector=action.selector,
                value=action.value,
                intention=action.intention,
                data=action.data,
                totp_identifier=action.totp_identifier,
                totp_url=action.totp_url,
                timeout=action.timeout,
            )
        elif action.type == "ai_select_option":
            result = await page_ai.ai_select_option(
                selector=action.selector,
                value=action.value,
                intention=action.intention,
                data=action.data,
                timeout=action.timeout,
            )
        elif action.type == "extract":
            # Extraction output is returned to the caller as a JSON string.
            result = json.dumps(
                await page_ai.ai_extract(
                    prompt=action.prompt,
                    schema=action.extract_schema,
                    error_code_mapping=action.error_code_mapping,
                    intention=action.intention,
                    data=action.data,
                )
            )
    finally:
        # Always drop the per-action context, even when the action raised.
        skyvern_context.reset()

    return RunSdkActionResponse(
        workflow_run_id=workflow_run.workflow_run_id,
        result=result,
    )

View File

@@ -0,0 +1,100 @@
from enum import Enum
from typing import Annotated, Any, Literal, Union
from pydantic import BaseModel, Field
from skyvern.config import settings
class SdkActionType(str, Enum):
    """Enum for SDK action types that can be executed."""

    # Each value equals the `type` Literal declared on the corresponding
    # action model (ClickAction, InputTextAction, SelectOptionAction, ExtractAction).
    # Mixing in `str` makes members compare equal to their plain string values.
    AI_CLICK = "ai_click"
    AI_INPUT_TEXT = "ai_input_text"
    AI_SELECT_OPTION = "ai_select_option"
    EXTRACT = "extract"
# Base action class
class SdkActionBase(BaseModel):
    """Base class for SDK actions."""

    # Required here as a plain string; each concrete action subclass narrows it
    # to a single Literal value, which is what the SdkAction union discriminates on.
    type: str = Field(..., description="The type of action")
# Specific action types
class ClickAction(SdkActionBase):
    """Click action parameters."""

    # Discriminator value that selects this model within the SdkAction union.
    type: Literal["ai_click"] = "ai_click"
    # Both selector and intention default to the empty string, so neither is required.
    selector: str = Field(default="", description="CSS selector for the element")
    intention: str = Field(default="", description="The intention or goal of the click")
    data: str | dict[str, Any] | None = Field(default=None, description="Additional context data")
    # The default is read from settings once, when this class body is evaluated.
    timeout: float = Field(default=settings.BROWSER_ACTION_TIMEOUT_MS, description="Timeout in milliseconds")
class InputTextAction(SdkActionBase):
    """Input text action parameters."""

    # Discriminator value that selects this model within the SdkAction union.
    type: Literal["ai_input_text"] = "ai_input_text"
    # selector, value and intention default to the empty string, so none is required.
    selector: str = Field(default="", description="CSS selector for the element")
    value: str = Field(default="", description="Value to input")
    intention: str = Field(default="", description="The intention or goal of the input")
    data: str | dict[str, Any] | None = Field(default=None, description="Additional context data")
    # Optional TOTP parameters; both are forwarded unchanged to ai_input_text by the route.
    totp_identifier: str | None = Field(default=None, description="TOTP identifier for input_text actions")
    totp_url: str | None = Field(default=None, description="TOTP URL for input_text actions")
    # The default is read from settings once, when this class body is evaluated.
    timeout: float = Field(default=settings.BROWSER_ACTION_TIMEOUT_MS, description="Timeout in milliseconds")
class SelectOptionAction(SdkActionBase):
    """Select option action parameters."""

    # Discriminator value that selects this model within the SdkAction union.
    type: Literal["ai_select_option"] = "ai_select_option"
    # selector, value and intention default to the empty string, so none is required.
    selector: str = Field(default="", description="CSS selector for the element")
    value: str = Field(default="", description="Value to select")
    intention: str = Field(default="", description="The intention or goal of the selection")
    data: str | dict[str, Any] | None = Field(default=None, description="Additional context data")
    # The default is read from settings once, when this class body is evaluated.
    timeout: float = Field(default=settings.BROWSER_ACTION_TIMEOUT_MS, description="Timeout in milliseconds")
class ExtractAction(SdkActionBase):
    """Extract data action parameters."""

    # Discriminator value that selects this model within the SdkAction union.
    type: Literal["extract"] = "extract"
    prompt: str = Field(default="", description="Extraction prompt")
    # Named extract_schema here; the route passes it to ai_extract as `schema`.
    extract_schema: dict[str, Any] | list | str | None = Field(default=None, description="Schema for extraction")
    error_code_mapping: dict[str, str] | None = Field(default=None, description="Error code mapping for extraction")
    # Unlike the other action models, intention is optional and defaults to None rather than "".
    intention: str | None = Field(default=None, description="The intention or goal of the extraction")
    data: str | dict[str, Any] | None = Field(default=None, description="Additional context data")
# Discriminated union of all action types. The `type` field is the discriminator:
# each member model pins it to a distinct Literal value, so validation selects
# the concrete action model from that value alone.
SdkAction = Annotated[
    Union[ClickAction, InputTextAction, SelectOptionAction, ExtractAction],
    Field(discriminator="type"),
]
class RunActionResponse(BaseModel):
    """Response from running an action."""

    # Declares the same workflow_run_id field as RunSdkActionResponse, without `result`.
    # NOTE(review): no reference to this model is visible in the route code -- confirm it is still needed.
    workflow_run_id: str = Field(..., description="The workflow run ID used for this action")
class RunSdkActionRequest(BaseModel):
    """Request to run a single SDK action."""

    # The only required fields are `url` and `action`; everything else defaults to None.
    url: str = Field(..., description="The URL where the action should be executed")
    browser_session_id: str | None = Field(default=None, description="The browser session ID")
    browser_address: str | None = Field(default=None, description="The browser address")
    # When omitted, the route creates a new workflow and workflow run for the action.
    workflow_run_id: str | None = Field(
        default=None,
        description="Optional workflow run ID to continue an existing workflow run",
    )
    # Validated as the SdkAction discriminated union, keyed on the payload's `type`.
    action: SdkAction = Field(..., description="The action to execute with its specific parameters")
class RunSdkActionResponse(BaseModel):
    """Response from running an SDK action."""

    # ID of the run the action was attached to (either supplied by the caller or newly created).
    workflow_run_id: str = Field(..., description="The workflow run ID used for this action")
    # Untyped on purpose: the route returns whatever the page action produced,
    # or a JSON string for extract actions; None when nothing was produced.
    result: Any | None = Field(
        default=None,
        description="The result from the action (e.g., selector, value, extracted data)",
    )