diff --git a/skyvern/__init__.py b/skyvern/__init__.py index 7ba2c57d..d4ac57e4 100644 --- a/skyvern/__init__.py +++ b/skyvern/__init__.py @@ -46,6 +46,7 @@ from skyvern.services.script_service import ( # noqa: E402 run_task, # noqa: E402 send_email, # noqa: E402 upload_file, # noqa: E402 + validate, # noqa: E402 wait, # noqa: E402 ) # noqa: E402 @@ -72,6 +73,7 @@ __all__ = [ "send_email", "setup", "upload_file", + "validate", "wait", "workflow", ] diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 7c37752f..89712151 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -805,18 +805,62 @@ def _build_send_email_statement(block: dict[str, Any]) -> cst.SimpleStatementLin return cst.SimpleStatementLine([cst.Expr(cst.Await(call))]) -def _build_validate_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: +def _build_validate_statement( + block_title: str, block: dict[str, Any], data_variable_name: str | None = None +) -> cst.SimpleStatementLine: """Build a skyvern.validate statement.""" - args = [ - cst.Arg( - keyword=cst.Name("prompt"), - value=_value(block.get("navigation_goal", "")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ), - ] + args = [] + + # Add complete_criterion if it exists + if block.get("complete_criterion") is not None: + args.append( + cst.Arg( + keyword=cst.Name("complete_criterion"), + value=_value(block.get("complete_criterion")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + + # Add terminate_criterion if it exists + if block.get("terminate_criterion") is not None: + args.append( + cst.Arg( + keyword=cst.Name("terminate_criterion"), + value=_value(block.get("terminate_criterion")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + + # Add error_code_mapping if it exists + if block.get("error_code_mapping") is not None: + args.append( + cst.Arg( + keyword=cst.Name("error_code_mapping"), + value=_value(block.get("error_code_mapping")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + + # Add label if it exists + if block.get("label") is not None: + args.append( + cst.Arg( + keyword=cst.Name("label"), + value=_value(block.get("label")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + ), + ) + ) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("validate")), @@ -1465,6 +1509,8 @@ def _build_block_statement(block: dict[str, Any], data_variable_name: str | None stmt = _build_extract_statement(block_title, block, data_variable_name) elif block_type == "navigation": stmt = _build_navigate_statement(block_title, block, data_variable_name) + elif block_type == "validation": + stmt = _build_validate_statement(block_title, block, data_variable_name) elif block_type == "task_v2": stmt = _build_run_task_statement(block_title, block, data_variable_name) elif block_type == "send_email": diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index d80bc782..d232c433 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -794,3 +794,11 @@ class AzureBaseError(SkyvernException): class AzureConfigurationError(AzureBaseError): def __init__(self, message: str) -> None: super().__init__(f"Error in Azure configuration: {message}") + + +###### Script Exceptions ###### + + +class ScriptTerminationException(SkyvernException): + def __init__(self, reason: str | None = None) -> None: + super().__init__(reason) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index b13acffc..e1b6736f 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -17,6 +17,7 @@ from skyvern.exceptions import ( FailedToSendWebhook, InvalidCredentialId, MissingValueForParameter, + ScriptTerminationException, SkyvernException, WorkflowNotFound, WorkflowRunNotFound, @@ -2522,14 +2523,26 @@ class WorkflowService: parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} # Execute the script using script_service - await script_service.execute_script( - script_id=script_id, - organization_id=organization.organization_id, - parameters=parameters, - workflow_run_id=workflow_run.workflow_run_id, - browser_session_id=browser_session_id, - background_tasks=None, # Execute synchronously - ) + try: + await script_service.execute_script( + script_id=script_id, + organization_id=organization.organization_id, + parameters=parameters, + workflow_run_id=workflow_run.workflow_run_id, + browser_session_id=browser_session_id, + background_tasks=None, # Execute synchronously + ) + except ScriptTerminationException as e: + LOG.info( + "Script terminated, marking workflow run as terminated", + failure_reason=e.message, + workflow_run_id=workflow_run.workflow_run_id, + ) + workflow_run = await self.mark_workflow_run_as_terminated( + workflow_run_id=workflow_run.workflow_run_id, + failure_reason=e.message, + ) + return workflow_run # Mark workflow run as completed workflow_run = await self.mark_workflow_run_as_completed( diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 8634ba5d..13d11c5a 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -18,10 +18,11 @@ from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS from skyvern.core.script_generations.generate_script import _build_block_fn, create_or_update_script_block from skyvern.core.script_generations.skyvern_page import script_run_context_manager -from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound +from skyvern.exceptions import ScriptNotFound, ScriptTerminationException, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.schemas.files import FileInfo from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus @@ -40,6 +41,7 @@ from skyvern.forge.sdk.workflow.models.block import ( TaskBlock, TextPromptBlock, UrlBlock, + ValidationBlock, ) from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter, ParameterType from skyvern.forge.sdk.workflow.models.workflow import Workflow @@ -1323,6 +1325,7 @@ async def action( action_block = ActionBlock( label=block_validation_output.label, output_parameter=block_validation_output.output_parameter, + task_type=TaskType.action, url=url, navigation_goal=prompt, max_steps_per_run=max_steps, @@ -1485,6 +1488,36 @@ async def extract( return block_result.output_parameter_value +async def validate( + complete_criterion: str | None = None, + terminate_criterion: str | None = None, + error_code_mapping: dict[str, str] | None = None, + label: str | None = None, +) -> None: + """Validate function that behaves like a ValidationBlock""" + if not complete_criterion and not terminate_criterion: + raise Exception("Both complete criterion and terminate criterion are empty") + + block_validation_output = await _validate_and_get_output_parameter(label) + validation_block = ValidationBlock( + label=block_validation_output.label, + output_parameter=block_validation_output.output_parameter, + task_type=TaskType.validation, + complete_criterion=complete_criterion, + terminate_criterion=terminate_criterion, + error_code_mapping=error_code_mapping, + max_steps_per_run=2, + ) + result = await validation_block.execute_safe( + workflow_run_id=block_validation_output.workflow_run_id, + parent_workflow_run_block_id=block_validation_output.context.parent_workflow_run_block_id, + organization_id=block_validation_output.organization_id, + browser_session_id=block_validation_output.browser_session_id, + ) + if result.status == BlockStatus.terminated: + raise ScriptTerminationException(result.failure_reason) + + async def wait(seconds: int, label: str | None = None) -> None: # Auto-create workflow block run if workflow_run_id is available (wait block doesn't create tasks) workflow_run_block_id, _, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT, label=label)