From 284fba0abaa038b3765bd417495f374c6c2eefc2 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Mon, 25 Nov 2024 10:42:34 +0800 Subject: [PATCH] add wati block (#1254) --- skyvern/config.py | 1 + skyvern/forge/sdk/workflow/exceptions.py | 5 ++++ skyvern/forge/sdk/workflow/models/block.py | 31 ++++++++++++++++++++++ skyvern/forge/sdk/workflow/models/yaml.py | 6 +++++ skyvern/forge/sdk/workflow/service.py | 17 ++++++++++++ 5 files changed, 60 insertions(+) diff --git a/skyvern/config.py b/skyvern/config.py index 0646a5b6..72ade145 100644 --- a/skyvern/config.py +++ b/skyvern/config.py @@ -72,6 +72,7 @@ class Settings(BaseSettings): # Workflow constant parameters WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY" + WORKFLOW_WAIT_BLOCK_MAX_SEC: int = 30 * 60 # streaming settings STREAMING_FILE_BASE_PATH: str = "/tmp" diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index a49d6fd1..99eafd44 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -114,3 +114,8 @@ class FailedToParseActionInstruction(SkyvernException): super().__init__( f"Failed to parse the action instruction as '{reason}({error_type})'", ) + + +class InvalidWaitBlockTime(SkyvernException): + def __init__(self, max_sec: int): + super().__init__(f"Invalid wait time for wait block, it should be a number between 0 and {max_sec}.") diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 6027aec9..d7841161 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -1,4 +1,5 @@ import abc +import asyncio import csv import json import os @@ -77,6 +78,7 @@ class BlockType(StrEnum): NAVIGATION = "navigation" EXTRACTION = "extraction" LOGIN = "login" + WAIT = "wait" class BlockStatus(StrEnum): @@ -1277,6 +1279,34 @@ class FileParserBlock(Block): ) +class WaitBlock(Block): + block_type: Literal[BlockType.WAIT] = BlockType.WAIT + + wait_sec: int + parameters: list[PARAMETER_TYPE] = [] + + def get_all_parameters( + self, + workflow_run_id: str, + ) -> list[PARAMETER_TYPE]: + return self.parameters + + async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult: + # TODO: we need to support to interrupt the sleep when the workflow run failed/cancelled/terminated + LOG.info( + "Going to pause the workflow for a while", + second=self.wait_sec, + workflow_run_id=workflow_run_id, + ) + await asyncio.sleep(self.wait_sec) + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + result_dict = {"success": True} + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=result_dict, status=BlockStatus.completed + ) + + class ValidationBlock(BaseTaskBlock): block_type: Literal[BlockType.VALIDATION] = BlockType.VALIDATION @@ -1365,5 +1395,6 @@ BlockSubclasses = Union[ NavigationBlock, ExtractionBlock, LoginBlock, + WaitBlock, ] BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index dc77b0a9..ac3b13c4 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -278,6 +278,11 @@ class LoginBlockYAML(BlockYAML): cache_actions: bool = False +class WaitBlockYAML(BlockYAML): + block_type: Literal[BlockType.WAIT] = BlockType.WAIT # type: ignore + wait_sec: int = 0 + + PARAMETER_YAML_SUBCLASSES = ( AWSSecretParameterYAML | BitwardenLoginCredentialParameterYAML @@ -303,6 +308,7 @@ BLOCK_YAML_SUBCLASSES = ( | NavigationBlockYAML | ExtractionBlockYAML | LoginBlockYAML + | WaitBlockYAML ) BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index cfb32b02..b7317876 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -21,8 +21,10 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.enums import TaskPromptTemplate from skyvern.forge.sdk.models import Organization, Step from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task +from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.exceptions import ( ContextParameterSourceNotDefined, + InvalidWaitBlockTime, InvalidWorkflowDefinition, WorkflowDefinitionHasDuplicateParameterKeys, WorkflowDefinitionHasReservedParameterKeys, @@ -45,6 +47,7 @@ from skyvern.forge.sdk.workflow.models.block import ( TextPromptBlock, UploadToS3Block, ValidationBlock, + WaitBlock, ) from skyvern.forge.sdk.workflow.models.parameter import ( PARAMETER_TYPE, @@ -1456,4 +1459,18 @@ class WorkflowService: cache_actions=block_yaml.cache_actions, ) + elif block_yaml.block_type == BlockType.WAIT: + if ( + block_yaml.wait_sec <= 0 + or block_yaml.wait_sec > SettingsManager.get_settings().WORKFLOW_WAIT_BLOCK_MAX_SEC + ): + raise InvalidWaitBlockTime(SettingsManager.get_settings().WORKFLOW_WAIT_BLOCK_MAX_SEC) + + return WaitBlock( + label=block_yaml.label, + wait_sec=block_yaml.wait_sec, + continue_on_failure=block_yaml.continue_on_failure, + output_parameter=output_parameter, + ) + raise ValueError(f"Invalid block type {block_yaml.block_type}")