new API to the workflow run page (#1400)

This commit is contained in:
Shuchang Zheng
2024-12-17 17:17:18 -08:00
committed by GitHub
parent b8e2527ea0
commit 58413db172
8 changed files with 403 additions and 178 deletions

View File

@@ -0,0 +1,31 @@
"""add index to ObserverThoughtModel db table
Revision ID: cf45479f484c
Revises: 411dd89f3df9
Create Date: 2024-12-17 06:51:04.086890+00:00
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "cf45479f484c"
down_revision: Union[str, None] = "411dd89f3df9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
"observer_cruise_index", "observer_thoughts", ["organization_id", "observer_cruise_id"], unique=False
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("observer_cruise_index", table_name="observer_thoughts")
# ### end Alembic commands ###

View File

@@ -65,11 +65,11 @@ from skyvern.webeye.actions.actions import (
DecisiveAction,
UserDefinedError,
WebAction,
parse_actions,
)
from skyvern.webeye.actions.caching import retrieve_action_plan
from skyvern.webeye.actions.handler import ActionHandler, poll_verification_code
from skyvern.webeye.actions.models import AgentStepOutput, DetailedAgentStepOutput
from skyvern.webeye.actions.parse_actions import parse_actions
from skyvern.webeye.actions.responses import ActionResult
from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website

View File

@@ -326,6 +326,25 @@ class AgentDB:
LOG.error("UnexpectedError", exc_info=True)
raise
async def get_tasks_actions(self, task_ids: list[str], organization_id: str | None = None) -> list[Action]:
try:
async with self.Session() as session:
query = (
select(ActionModel)
.filter(ActionModel.organization_id == organization_id)
.filter(ActionModel.task_id.in_(task_ids))
.order_by(ActionModel.created_at)
)
actions = (await session.scalars(query)).all()
return [Action.model_validate(action) for action in actions]
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def get_first_step(self, task_id: str, organization_id: str | None = None) -> Step | None:
try:
async with self.Session() as session:
@@ -1858,6 +1877,22 @@ class AgentDB:
return ObserverThought.model_validate(observer_thought)
return None
async def get_observer_cruise_thoughts(
self,
observer_cruise_id: str,
organization_id: str | None = None,
) -> list[ObserverThought]:
async with self.Session() as session:
observer_thoughts = (
await session.scalars(
select(ObserverThoughtModel)
.filter_by(observer_cruise_id=observer_cruise_id)
.filter_by(organization_id=organization_id)
.order_by(ObserverThoughtModel.created_at)
)
).all()
return [ObserverThought.model_validate(thought) for thought in observer_thoughts]
async def create_observer_cruise(
self,
workflow_run_id: str | None = None,

View File

@@ -526,6 +526,7 @@ class ObserverCruiseModel(Base):
class ObserverThoughtModel(Base):
__tablename__ = "observer_thoughts"
__table_args__ = (Index("observer_cruise_index", "organization_id", "observer_cruise_id"),)
observer_thought_id = Column(String, primary_key=True, default=generate_observer_thought_id)
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True)

View File

@@ -33,7 +33,7 @@ from skyvern.forge.sdk.artifact.models import Artifact
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.permissions.permission_checker_factory import PermissionCheckerFactory
from skyvern.forge.sdk.core.security import generate_skyvern_signature
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType, TaskType
from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.organizations import (
@@ -52,12 +52,14 @@ from skyvern.forge.sdk.schemas.tasks import (
TaskResponse,
TaskStatus,
)
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunEvent, WorkflowRunEventType
from skyvern.forge.sdk.services import org_auth_service
from skyvern.forge.sdk.workflow.exceptions import (
FailedToCreateWorkflow,
FailedToUpdateWorkflow,
WorkflowParameterMissingRequiredValue,
)
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.forge.sdk.workflow.models.workflow import (
RunWorkflowResponse,
Workflow,
@@ -720,6 +722,91 @@ async def get_workflow_run(
)
@base_router.get(
"/workflows/{workflow_id}/runs/{workflow_run_id}/events",
)
@base_router.get(
"/workflows/{workflow_id}/runs/{workflow_run_id}/events/",
)
async def get_workflow_run_events(
workflow_id: str,
workflow_run_id: str,
observer_cruise_id: str | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1),
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRunEvent]:
# get all the tasks for the workflow run
tasks = await app.DATABASE.get_tasks(
page,
page_size,
workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id,
)
workflow_run_events: list[WorkflowRunEvent] = []
for task in tasks:
block_type = BlockType.TASK
if task.task_type == TaskType.general:
if not task.navigation_goal and task.data_extraction_goal:
block_type = BlockType.EXTRACTION
elif task.navigation_goal and not task.data_extraction_goal:
block_type = BlockType.NAVIGATION
elif task.task_type == TaskType.validation:
block_type = BlockType.VALIDATION
elif task.task_type == TaskType.action:
block_type = BlockType.ACTION
event = WorkflowRunEvent(
type=WorkflowRunEventType.block,
block=WorkflowRunBlock(
workflow_run_id=workflow_run_id,
block_type=block_type,
label=task.title,
title=task.title,
url=task.url,
status=task.status,
navigation_goal=task.navigation_goal,
data_extraction_goal=task.data_extraction_goal,
data_schema=task.extracted_information_schema,
terminate_criterion=task.terminate_criterion,
complete_criterion=task.complete_criterion,
created_at=task.created_at,
modified_at=task.modified_at,
),
created_at=task.created_at,
modified_at=task.modified_at,
)
workflow_run_events.append(event)
# get all the actions for all the tasks
actions = await app.DATABASE.get_tasks_actions(
[task.task_id for task in tasks], organization_id=current_org.organization_id
)
for action in actions:
workflow_run_events.append(
WorkflowRunEvent(
type=WorkflowRunEventType.action,
action=action,
created_at=action.created_at or datetime.datetime.utcnow(),
modified_at=action.modified_at or datetime.datetime.utcnow(),
)
)
# get all the thoughts for the cruise
if observer_cruise_id:
thoughts = await app.DATABASE.get_observer_cruise_thoughts(
observer_cruise_id, organization_id=current_org.organization_id
)
for thought in thoughts:
workflow_run_events.append(
WorkflowRunEvent(
type=WorkflowRunEventType.thought,
thought=thought,
created_at=thought.created_at,
modified_at=thought.modified_at,
)
)
workflow_run_events.sort(key=lambda x: x.created_at)
return workflow_run_events
@base_router.get(
"/workflows/runs/{workflow_run_id}",
response_model=WorkflowRunStatusResponse,

View File

@@ -0,0 +1,45 @@
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel
from skyvern.forge.sdk.schemas.observers import ObserverThought
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.webeye.actions.actions import Action
class WorkflowRunBlock(BaseModel):
workflow_run_block_id: str = "placeholder"
workflow_run_id: str
parent_workflow_run_block_id: str | None = None
block_type: BlockType
label: str | None = None
title: str | None = None
status: str | None = None
output: dict | list | str | None = None
continue_on_failure: bool = False
task_id: str | None = None
url: str | None = None
navigation_goal: str | None = None
data_extraction_goal: str | None = None
data_schema: dict[str, Any] | list | str | None = None
terminate_criterion: str | None = None
complete_criterion: str | None = None
created_at: datetime
modified_at: datetime
class WorkflowRunEventType(StrEnum):
action = "action"
thought = "thought"
block = "block"
class WorkflowRunEvent(BaseModel):
type: WorkflowRunEventType
action: Action | None = None
thought: ObserverThought | None = None
block: WorkflowRunBlock | None = None
created_at: datetime
modified_at: datetime

View File

@@ -1,13 +1,10 @@
from datetime import datetime
from enum import StrEnum
from typing import Annotated, Any, Dict, Type, TypeVar
from typing import Annotated, Any, Type, TypeVar
import structlog
from litellm import ConfigDict
from pydantic import BaseModel, Field, ValidationError
from skyvern.exceptions import UnsupportedActionType
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.webeye.scraper.scraper import ScrapedPage
from pydantic import BaseModel, Field
LOG = structlog.get_logger()
T = TypeVar("T", bound="Action")
@@ -119,6 +116,9 @@ class Action(BaseModel):
option: SelectOption | None = None
is_checked: bool | None = None
created_at: datetime | None = None
modified_at: datetime | None = None
@classmethod
def validate(cls: Type[T], value: Any) -> T:
if isinstance(value, dict):
@@ -239,176 +239,6 @@ class CompleteAction(DecisiveAction):
data_extraction_goal: str | None = None
def parse_action(action: Dict[str, Any], scraped_page: ScrapedPage, data_extraction_goal: str | None = None) -> Action:
if "id" in action:
element_id = action["id"]
elif "element_id" in action:
element_id = action["element_id"]
else:
element_id = None
skyvern_element_hash = scraped_page.id_to_element_hash.get(element_id) if element_id else None
skyvern_element_data = scraped_page.id_to_element_dict.get(element_id) if element_id else None
reasoning = action["reasoning"] if "reasoning" in action else None
confidence_float = action["confidence_float"] if "confidence_float" in action else None
# TODO: currently action intention and response are only used for Q&A actions, like input_text
# When we start supporting click action, intention will be the reasoning for the click action (why take the action)
intention = action["user_detail_query"] if "user_detail_query" in action else None
response = action["user_detail_answer"] if "user_detail_answer" in action else None
base_action_dict = {
"element_id": element_id,
"skyvern_element_hash": skyvern_element_hash,
"skyvern_element_data": skyvern_element_data,
"reasoning": reasoning,
"confidence_float": confidence_float,
"intention": intention,
"response": response,
}
if "action_type" not in action or action["action_type"] is None:
return NullAction(**base_action_dict)
# `.upper()` handles the case where the LLM returns a lowercase action type (e.g. "click" instead of "CLICK")
action_type = ActionType[action["action_type"].upper()]
if not action_type.is_web_action():
# LLM sometimes hallucinates and returns element id for non-web actions such as WAIT, TERMINATE, COMPLETE etc.
# That can sometimes cause cached action plan to be invalidated. This way we're making sure the element id is not
# set for non-web actions.
base_action_dict["element_id"] = None
if action_type == ActionType.TERMINATE:
return TerminateAction(**base_action_dict, errors=action["errors"] if "errors" in action else [])
if action_type == ActionType.CLICK:
file_url = action["file_url"] if "file_url" in action else None
return ClickAction(**base_action_dict, file_url=file_url, download=action.get("download", False))
if action_type == ActionType.INPUT_TEXT:
return InputTextAction(**base_action_dict, text=action["text"])
if action_type == ActionType.UPLOAD_FILE:
# TODO: see if the element is a file input element. if it's not, convert this action into a click action
return UploadFileAction(
**base_action_dict,
file_url=action["file_url"],
)
# This action is not used in the current implementation. Click actions are used instead.
if action_type == ActionType.DOWNLOAD_FILE:
return DownloadFileAction(**base_action_dict, file_name=action["file_name"])
if action_type == ActionType.SELECT_OPTION:
option = action["option"]
if option is None:
raise ValueError("SelectOptionAction requires an 'option' field")
label = option.get("label")
value = option.get("value")
index = option.get("index")
if label is None and value is None and index is None:
raise ValueError("At least one of 'label', 'value', or 'index' must be provided for a SelectOption")
return SelectOptionAction(
**base_action_dict,
option=SelectOption(
label=label,
value=value,
index=index,
),
)
if action_type == ActionType.CHECKBOX:
return CheckboxAction(
**base_action_dict,
is_checked=action["is_checked"],
)
if action_type == ActionType.WAIT:
return WaitAction(**base_action_dict)
if action_type == ActionType.COMPLETE:
return CompleteAction(
**base_action_dict,
data_extraction_goal=data_extraction_goal,
errors=action["errors"] if "errors" in action else [],
)
if action_type == "null":
return NullAction(**base_action_dict)
if action_type == ActionType.SOLVE_CAPTCHA:
return SolveCaptchaAction(**base_action_dict)
raise UnsupportedActionType(action_type=action_type)
def parse_actions(
task: Task, step_id: str, step_order: int, scraped_page: ScrapedPage, json_response: list[Dict[str, Any]]
) -> list[Action]:
actions: list[Action] = []
for idx, action in enumerate(json_response):
try:
action_instance = parse_action(
action=action, scraped_page=scraped_page, data_extraction_goal=task.data_extraction_goal
)
action_instance.organization_id = task.organization_id
action_instance.workflow_run_id = task.workflow_run_id
action_instance.task_id = task.task_id
action_instance.step_id = step_id
action_instance.step_order = step_order
action_instance.action_order = idx
if isinstance(action_instance, TerminateAction):
LOG.warning(
"Agent decided to terminate",
task_id=task.task_id,
llm_response=json_response,
reasoning=action_instance.reasoning,
actions=actions,
)
actions.append(action_instance)
except UnsupportedActionType:
LOG.error(
"Unsupported action type when parsing actions",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
except (ValidationError, ValueError):
LOG.warning(
"Invalid action",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
except Exception:
LOG.error(
"Failed to marshal action",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
############################ This part of code might not be needed ############################
# Reason #1. validation can be done in action handler but not in parser
# Reason #2. no need to validate whether the element_id has a hash.
# If there's no hash, we can fall back to normal operation
all_element_ids = [action.element_id for action in actions if action.element_id]
missing_element_ids = [
element_id for element_id in all_element_ids if element_id not in scraped_page.id_to_element_hash
]
if missing_element_ids:
LOG.warning(
"Missing elements in scraped page",
task_id=task.task_id,
missing_element_ids=missing_element_ids,
all_element_ids=all_element_ids,
)
############################ This part of code might not be needed ############################
return actions
class ScrapeResult(BaseModel):
"""
Scraped response from a webpage, including:

View File

@@ -0,0 +1,196 @@
from typing import Any, Dict
import structlog
from pydantic import ValidationError
from skyvern.exceptions import UnsupportedActionType
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.webeye.actions.actions import (
Action,
ActionType,
CheckboxAction,
ClickAction,
CompleteAction,
DownloadFileAction,
InputTextAction,
NullAction,
SelectOption,
SelectOptionAction,
SolveCaptchaAction,
TerminateAction,
UploadFileAction,
WaitAction,
)
from skyvern.webeye.scraper.scraper import ScrapedPage
LOG = structlog.get_logger()
def parse_action(action: Dict[str, Any], scraped_page: ScrapedPage, data_extraction_goal: str | None = None) -> Action:
if "id" in action:
element_id = action["id"]
elif "element_id" in action:
element_id = action["element_id"]
else:
element_id = None
skyvern_element_hash = scraped_page.id_to_element_hash.get(element_id) if element_id else None
skyvern_element_data = scraped_page.id_to_element_dict.get(element_id) if element_id else None
reasoning = action["reasoning"] if "reasoning" in action else None
confidence_float = action["confidence_float"] if "confidence_float" in action else None
# TODO: currently action intention and response are only used for Q&A actions, like input_text
# When we start supporting click action, intention will be the reasoning for the click action (why take the action)
intention = action["user_detail_query"] if "user_detail_query" in action else None
response = action["user_detail_answer"] if "user_detail_answer" in action else None
base_action_dict = {
"element_id": element_id,
"skyvern_element_hash": skyvern_element_hash,
"skyvern_element_data": skyvern_element_data,
"reasoning": reasoning,
"confidence_float": confidence_float,
"intention": intention,
"response": response,
}
if "action_type" not in action or action["action_type"] is None:
return NullAction(**base_action_dict)
# `.upper()` handles the case where the LLM returns a lowercase action type (e.g. "click" instead of "CLICK")
action_type = ActionType[action["action_type"].upper()]
if not action_type.is_web_action():
# LLM sometimes hallucinates and returns element id for non-web actions such as WAIT, TERMINATE, COMPLETE etc.
# That can sometimes cause cached action plan to be invalidated. This way we're making sure the element id is not
# set for non-web actions.
base_action_dict["element_id"] = None
if action_type == ActionType.TERMINATE:
return TerminateAction(**base_action_dict, errors=action["errors"] if "errors" in action else [])
if action_type == ActionType.CLICK:
file_url = action["file_url"] if "file_url" in action else None
return ClickAction(**base_action_dict, file_url=file_url, download=action.get("download", False))
if action_type == ActionType.INPUT_TEXT:
return InputTextAction(**base_action_dict, text=action["text"])
if action_type == ActionType.UPLOAD_FILE:
# TODO: see if the element is a file input element. if it's not, convert this action into a click action
return UploadFileAction(
**base_action_dict,
file_url=action["file_url"],
)
# This action is not used in the current implementation. Click actions are used instead.
if action_type == ActionType.DOWNLOAD_FILE:
return DownloadFileAction(**base_action_dict, file_name=action["file_name"])
if action_type == ActionType.SELECT_OPTION:
option = action["option"]
if option is None:
raise ValueError("SelectOptionAction requires an 'option' field")
label = option.get("label")
value = option.get("value")
index = option.get("index")
if label is None and value is None and index is None:
raise ValueError("At least one of 'label', 'value', or 'index' must be provided for a SelectOption")
return SelectOptionAction(
**base_action_dict,
option=SelectOption(
label=label,
value=value,
index=index,
),
)
if action_type == ActionType.CHECKBOX:
return CheckboxAction(
**base_action_dict,
is_checked=action["is_checked"],
)
if action_type == ActionType.WAIT:
return WaitAction(**base_action_dict)
if action_type == ActionType.COMPLETE:
return CompleteAction(
**base_action_dict,
data_extraction_goal=data_extraction_goal,
errors=action["errors"] if "errors" in action else [],
)
if action_type == "null":
return NullAction(**base_action_dict)
if action_type == ActionType.SOLVE_CAPTCHA:
return SolveCaptchaAction(**base_action_dict)
raise UnsupportedActionType(action_type=action_type)
def parse_actions(
task: Task, step_id: str, step_order: int, scraped_page: ScrapedPage, json_response: list[Dict[str, Any]]
) -> list[Action]:
actions: list[Action] = []
for idx, action in enumerate(json_response):
try:
action_instance = parse_action(
action=action, scraped_page=scraped_page, data_extraction_goal=task.data_extraction_goal
)
action_instance.organization_id = task.organization_id
action_instance.workflow_run_id = task.workflow_run_id
action_instance.task_id = task.task_id
action_instance.step_id = step_id
action_instance.step_order = step_order
action_instance.action_order = idx
if isinstance(action_instance, TerminateAction):
LOG.warning(
"Agent decided to terminate",
task_id=task.task_id,
llm_response=json_response,
reasoning=action_instance.reasoning,
actions=actions,
)
actions.append(action_instance)
except UnsupportedActionType:
LOG.error(
"Unsupported action type when parsing actions",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
except (ValidationError, ValueError):
LOG.warning(
"Invalid action",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
except Exception:
LOG.error(
"Failed to marshal action",
task_id=task.task_id,
raw_action=action,
exc_info=True,
)
############################ This part of code might not be needed ############################
# Reason #1. validation can be done in action handler but not in parser
# Reason #2. no need to validate whether the element_id has a hash.
# If there's no hash, we can fall back to normal operation
all_element_ids = [action.element_id for action in actions if action.element_id]
missing_element_ids = [
element_id for element_id in all_element_ids if element_id not in scraped_page.id_to_element_hash
]
if missing_element_ids:
LOG.warning(
"Missing elements in scraped page",
task_id=task.task_id,
missing_element_ids=missing_element_ids,
all_element_ids=all_element_ids,
)
############################ This part of code might not be needed ############################
return actions