add wati block (#1254)

This commit is contained in:
LawyZheng
2024-11-25 10:42:34 +08:00
committed by GitHub
parent 303153b305
commit 284fba0aba
5 changed files with 60 additions and 0 deletions

View File

@@ -72,6 +72,7 @@ class Settings(BaseSettings):
# Workflow constant parameters # Workflow constant parameters
WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY" WORKFLOW_DOWNLOAD_DIRECTORY_PARAMETER_KEY: str = "SKYVERN_DOWNLOAD_DIRECTORY"
WORKFLOW_WAIT_BLOCK_MAX_SEC: int = 30 * 60
# streaming settings # streaming settings
STREAMING_FILE_BASE_PATH: str = "/tmp" STREAMING_FILE_BASE_PATH: str = "/tmp"

View File

@@ -114,3 +114,8 @@ class FailedToParseActionInstruction(SkyvernException):
super().__init__( super().__init__(
f"Failed to parse the action instruction as '{reason}({error_type})'", f"Failed to parse the action instruction as '{reason}({error_type})'",
) )
class InvalidWaitBlockTime(SkyvernException):
def __init__(self, max_sec: int):
super().__init__(f"Invalid wait time for wait block, it should be a number between 0 and {max_sec}.")

View File

@@ -1,4 +1,5 @@
import abc import abc
import asyncio
import csv import csv
import json import json
import os import os
@@ -77,6 +78,7 @@ class BlockType(StrEnum):
NAVIGATION = "navigation" NAVIGATION = "navigation"
EXTRACTION = "extraction" EXTRACTION = "extraction"
LOGIN = "login" LOGIN = "login"
WAIT = "wait"
class BlockStatus(StrEnum): class BlockStatus(StrEnum):
@@ -1277,6 +1279,34 @@ class FileParserBlock(Block):
) )
class WaitBlock(Block):
block_type: Literal[BlockType.WAIT] = BlockType.WAIT
wait_sec: int
parameters: list[PARAMETER_TYPE] = []
def get_all_parameters(
self,
workflow_run_id: str,
) -> list[PARAMETER_TYPE]:
return self.parameters
async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
# TODO: we need to support to interrupt the sleep when the workflow run failed/cancelled/terminated
LOG.info(
"Going to pause the workflow for a while",
second=self.wait_sec,
workflow_run_id=workflow_run_id,
)
await asyncio.sleep(self.wait_sec)
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
result_dict = {"success": True}
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
return self.build_block_result(
success=True, failure_reason=None, output_parameter_value=result_dict, status=BlockStatus.completed
)
class ValidationBlock(BaseTaskBlock): class ValidationBlock(BaseTaskBlock):
block_type: Literal[BlockType.VALIDATION] = BlockType.VALIDATION block_type: Literal[BlockType.VALIDATION] = BlockType.VALIDATION
@@ -1365,5 +1395,6 @@ BlockSubclasses = Union[
NavigationBlock, NavigationBlock,
ExtractionBlock, ExtractionBlock,
LoginBlock, LoginBlock,
WaitBlock,
] ]
BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")] BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")]

View File

@@ -278,6 +278,11 @@ class LoginBlockYAML(BlockYAML):
cache_actions: bool = False cache_actions: bool = False
class WaitBlockYAML(BlockYAML):
block_type: Literal[BlockType.WAIT] = BlockType.WAIT # type: ignore
wait_sec: int = 0
PARAMETER_YAML_SUBCLASSES = ( PARAMETER_YAML_SUBCLASSES = (
AWSSecretParameterYAML AWSSecretParameterYAML
| BitwardenLoginCredentialParameterYAML | BitwardenLoginCredentialParameterYAML
@@ -303,6 +308,7 @@ BLOCK_YAML_SUBCLASSES = (
| NavigationBlockYAML | NavigationBlockYAML
| ExtractionBlockYAML | ExtractionBlockYAML
| LoginBlockYAML | LoginBlockYAML
| WaitBlockYAML
) )
BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")] BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")]

View File

@@ -21,8 +21,10 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.db.enums import TaskPromptTemplate from skyvern.forge.sdk.db.enums import TaskPromptTemplate
from skyvern.forge.sdk.models import Organization, Step from skyvern.forge.sdk.models import Organization, Step
from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.exceptions import ( from skyvern.forge.sdk.workflow.exceptions import (
ContextParameterSourceNotDefined, ContextParameterSourceNotDefined,
InvalidWaitBlockTime,
InvalidWorkflowDefinition, InvalidWorkflowDefinition,
WorkflowDefinitionHasDuplicateParameterKeys, WorkflowDefinitionHasDuplicateParameterKeys,
WorkflowDefinitionHasReservedParameterKeys, WorkflowDefinitionHasReservedParameterKeys,
@@ -45,6 +47,7 @@ from skyvern.forge.sdk.workflow.models.block import (
TextPromptBlock, TextPromptBlock,
UploadToS3Block, UploadToS3Block,
ValidationBlock, ValidationBlock,
WaitBlock,
) )
from skyvern.forge.sdk.workflow.models.parameter import ( from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE, PARAMETER_TYPE,
@@ -1456,4 +1459,18 @@ class WorkflowService:
cache_actions=block_yaml.cache_actions, cache_actions=block_yaml.cache_actions,
) )
elif block_yaml.block_type == BlockType.WAIT:
if (
block_yaml.wait_sec <= 0
or block_yaml.wait_sec > SettingsManager.get_settings().WORKFLOW_WAIT_BLOCK_MAX_SEC
):
raise InvalidWaitBlockTime(SettingsManager.get_settings().WORKFLOW_WAIT_BLOCK_MAX_SEC)
return WaitBlock(
label=block_yaml.label,
wait_sec=block_yaml.wait_sec,
continue_on_failure=block_yaml.continue_on_failure,
output_parameter=output_parameter,
)
raise ValueError(f"Invalid block type {block_yaml.block_type}") raise ValueError(f"Invalid block type {block_yaml.block_type}")