diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index 4366f309..91c8f77f 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -1,33 +1,61 @@ -from skyvern.exceptions import SkyvernException +from starlette import status + +from skyvern.exceptions import SkyvernException, SkyvernHTTPException class BaseWorkflowException(SkyvernException): pass -class WorkflowDefinitionHasDuplicateBlockLabels(BaseWorkflowException): +class BaseWorkflowHTTPException(SkyvernHTTPException): + pass + + +class WorkflowDefinitionHasDuplicateBlockLabels(BaseWorkflowHTTPException): def __init__(self, duplicate_labels: set[str]) -> None: super().__init__( f"WorkflowDefinition has blocks with duplicate labels. Each block needs to have a unique " - f"label. Duplicate label(s): {','.join(duplicate_labels)}" + f"label. Duplicate label(s): {','.join(duplicate_labels)}", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, ) -class OutputParameterKeyCollisionError(BaseWorkflowException): +class OutputParameterKeyCollisionError(BaseWorkflowHTTPException): def __init__(self, key: str, retry_count: int | None = None) -> None: message = f"Output parameter key {key} already exists in the context manager." if retry_count is not None: message += f" Retrying {retry_count} more times." elif retry_count == 0: message += " Max duplicate retries reached, aborting." - super().__init__(message) + super().__init__( + message, + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ) -class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowException): +class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowHTTPException): def __init__(self, duplicate_keys: set[str]) -> None: super().__init__( f"WorkflowDefinition has parameters with duplicate keys. Each parameter needs to have a unique " - f"key. Duplicate key(s): {','.join(duplicate_keys)}" + f"key. Duplicate key(s): {','.join(duplicate_keys)}", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ) + + +class WorkflowDefinitionHasReservedParameterKeys(BaseWorkflowHTTPException): + def __init__(self, reserved_keys: list[str], parameter_keys: list[str]) -> None: + super().__init__( + f"WorkflowDefinition has parameters with reserved keys. User created parameters cannot have the following " + f"reserved keys: {','.join(reserved_keys)}. Parameter keys: {','.join(parameter_keys)}", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ) + + +class InvalidWorkflowDefinition(BaseWorkflowHTTPException): + def __init__(self, message: str) -> None: + super().__init__( + message, + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, ) @@ -36,8 +64,9 @@ class InvalidEmailClientConfiguration(BaseWorkflowException): super().__init__(f"Email client configuration is invalid. These parameters are missing or invalid: {problems}") -class ContextParameterSourceNotDefined(BaseWorkflowException): +class ContextParameterSourceNotDefined(BaseWorkflowHTTPException): def __init__(self, context_parameter_key: str, source_key: str) -> None: super().__init__( - f"Source parameter key {source_key} for context parameter {context_parameter_key} does not exist." + f"Source parameter key {source_key} for context parameter {context_parameter_key} does not exist.", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, ) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index d42fc529..a80558a6 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -54,7 +54,7 @@ class BlockType(StrEnum): @dataclass(frozen=True) class BlockResult: success: bool - output_parameter: OutputParameter | None = None + output_parameter: OutputParameter output_parameter_value: dict[str, Any] | list | str | None = None @@ -62,9 +62,37 @@ class Block(BaseModel, abc.ABC): # Must be unique within workflow definition label: str block_type: BlockType - output_parameter: OutputParameter | None = None + output_parameter: OutputParameter continue_on_failure: bool = False + async def record_output_parameter_value( + self, + workflow_run_context: WorkflowRunContext, + workflow_run_id: str, + value: dict[str, Any] | list | str | None = None, + ) -> None: + await workflow_run_context.register_output_parameter_value_post_execution( + parameter=self.output_parameter, + value=value, + ) + await app.DATABASE.create_workflow_run_output_parameter( + workflow_run_id=workflow_run_id, + output_parameter_id=self.output_parameter.output_parameter_id, + value=value, + ) + LOG.info( + f"Registered output parameter value", + output_parameter_id=self.output_parameter.output_parameter_id, + workflow_run_id=workflow_run_id, + ) + + def build_block_result( + self, success: bool, output_parameter_value: dict[str, Any] | list | str | None = None + ) -> BlockResult: + return BlockResult( + success=success, output_parameter=self.output_parameter, output_parameter_value=output_parameter_value + ) + @classmethod def get_subclasses(cls) -> tuple[type["Block"], ...]: return tuple(cls.__subclasses__()) @@ -91,7 +119,11 @@ class Block(BaseModel, abc.ABC): block_label=self.label, block_type=self.block_type, ) - return BlockResult(success=False) + # Record output parameter value if it hasn't been recorded yet + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + if not workflow_run_context.has_value(self.output_parameter.key): + await self.record_output_parameter_value(workflow_run_context, workflow_run_id) + return self.build_block_result(success=False) @abc.abstractmethod def get_all_parameters( @@ -238,6 +270,7 @@ class TaskBlock(Block): raise TaskNotFound(task.task_id) if not updated_task.status.is_final(): raise UnexpectedTaskStatus(task_id=updated_task.task_id, status=updated_task.status) + if updated_task.status == TaskStatus.completed or updated_task.status == TaskStatus.terminated: LOG.info( f"Task completed", @@ -249,30 +282,9 @@ class TaskBlock(Block): ) success = updated_task.status == TaskStatus.completed task_output = TaskOutput.from_task(updated_task) - if self.output_parameter: - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=task_output.model_dump(), - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=task_output.model_dump(), - ) - LOG.info( - f"Registered output parameter value", - output_parameter_id=self.output_parameter.output_parameter_id, - value=updated_task.extracted_information, - workflow_run_id=workflow_run_id, - workflow_id=workflow.workflow_id, - task_id=updated_task.task_id, - ) - return BlockResult( - success=success, - output_parameter=self.output_parameter, - output_parameter_value=task_output.model_dump(), - ) - return BlockResult(success=success) + output_parameter_value = task_output.model_dump() + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value) + return self.build_block_result(success=success, output_parameter_value=output_parameter_value) else: current_retry += 1 will_retry = current_retry <= self.max_retries @@ -289,14 +301,20 @@ class TaskBlock(Block): max_retries=self.max_retries, task_output=task_output.model_dump_json(), ) + if not will_retry: + output_parameter_value = task_output.model_dump() + await self.record_output_parameter_value( + workflow_run_context, workflow_run_id, output_parameter_value + ) + return self.build_block_result(success=False, output_parameter_value=output_parameter_value) - return BlockResult(success=False) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id) + return self.build_block_result(success=False) class ForLoopBlock(Block): block_type: Literal[BlockType.FOR_LOOP] = BlockType.FOR_LOOP - # TODO (kerem): Add support for ContextParameter loop_over: PARAMETER_TYPE loop_blocks: list["BlockTypeVar"] @@ -370,6 +388,7 @@ class ForLoopBlock(Block): return [parameter_value] async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult: + outputs_with_loop_values = [] success = False workflow_run_context = self.get_workflow_run_context(workflow_run_id) loop_over_values = self.get_loop_over_parameter_values(workflow_run_context) @@ -386,30 +405,27 @@ class ForLoopBlock(Block): workflow_run_id=workflow_run_id, num_loop_over_values=len(loop_over_values), ) - return BlockResult(success=success) - outputs_with_loop_values = [] + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, []) + return self.build_block_result(success=False) for loop_idx, loop_over_value in enumerate(loop_over_values): context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value) for context_parameter in context_parameters_with_value: workflow_run_context.set_value(context_parameter.key, context_parameter.value) - try: - block_outputs = [] - for block_idx, loop_block in enumerate(self.loop_blocks): - block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id) - block_outputs.append(block_output) - if not block_output.success and not loop_block.continue_on_failure: - LOG.info( - f"ForLoopBlock: Encountered an failure processing block {block_idx} during loop {loop_idx}, terminating early", - block_outputs=block_outputs, - loop_idx=loop_idx, - block_idx=block_idx, - loop_over_value=loop_over_value, - loop_block_continue_on_failure=loop_block.continue_on_failure, - ) - break - except Exception as e: - LOG.error("ForLoopBlock: Failed to execute loop block", exc_info=True) - raise e + block_outputs = [] + for block_idx, loop_block in enumerate(self.loop_blocks): + block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id) + block_outputs.append(block_output) + if not block_output.success and not loop_block.continue_on_failure: + LOG.info( + f"ForLoopBlock: Encountered an failure processing block {block_idx} during loop {loop_idx}, terminating early", + block_outputs=block_outputs, + loop_idx=loop_idx, + block_idx=block_idx, + loop_over_value=loop_over_value, + loop_block_continue_on_failure=loop_block.continue_on_failure, + ) + break + outputs_with_loop_values.append( [ { @@ -427,27 +443,13 @@ class ForLoopBlock(Block): success = all([block_output.success for block_output in block_outputs]) if not success and not self.continue_on_failure: LOG.info( - "ForLoopBlock: Encountered an failure processing block, terminating early", - block_outputs=block_outputs, - continue_on_failure=self.continue_on_failure, + f"ForLoopBlock: Encountered an failure processing loop {loop_idx}, won't continue to the next loop. Total number of loops: {len(loop_over_values)}", + for_loop_continue_on_failure=self.continue_on_failure, ) break - if self.output_parameter: - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=outputs_with_loop_values, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=outputs_with_loop_values, - ) - return BlockResult( - success=success, output_parameter=self.output_parameter, output_parameter_value=outputs_with_loop_values - ) - - return BlockResult(success=success) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, outputs_with_loop_values) + return self.build_block_result(success=success, output_parameter_value=outputs_with_loop_values) class CodeBlock(Block): @@ -478,19 +480,8 @@ class CodeBlock(Block): local_variables: dict[str, Any] = {} exec(self.code, parameter_values, local_variables) result = {"result": local_variables.get("result")} - if self.output_parameter: - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=result, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=result, - ) - return BlockResult(success=True, output_parameter=self.output_parameter, output_parameter_value=result) - - return BlockResult(success=True) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result) + return self.build_block_result(success=True, output_parameter_value=result) class TextPromptBlock(Block): @@ -547,19 +538,8 @@ class TextPromptBlock(Block): parameter_values[parameter.key] = value response = await self.send_prompt(self.prompt, parameter_values) - if self.output_parameter: - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=response, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=response, - ) - return BlockResult(success=True, output_parameter=self.output_parameter, output_parameter_value=response) - - return BlockResult(success=True) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, response) + return self.build_block_result(success=True, output_parameter_value=response) class DownloadToS3Block(Block): @@ -615,21 +595,8 @@ class DownloadToS3Block(Block): raise e LOG.info("DownloadToS3Block: File downloaded and uploaded to S3", uri=uri) - if self.output_parameter: - LOG.info("DownloadToS3Block: Output parameter defined, registering output parameter value") - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=uri, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=uri, - ) - return BlockResult(success=True, output_parameter=self.output_parameter, output_parameter_value=uri) - - LOG.info("DownloadToS3Block: No output parameter defined, returning None") - return BlockResult(success=True) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, uri) + return self.build_block_result(success=True, output_parameter_value=uri) class UploadToS3Block(Block): @@ -675,6 +642,7 @@ class UploadToS3Block(Block): if not self.path or not os.path.exists(self.path): raise FileNotFoundError(f"UploadToS3Block: File not found at path: {self.path}") + s3_uris = [] try: client = self.get_async_aws_client() # is the file path a file or a directory? @@ -689,19 +657,20 @@ class UploadToS3Block(Block): LOG.warning("UploadToS3Block: Skipping directory", file=file) continue file_path = os.path.join(self.path, file) - await client.upload_file_from_path( - uri=self._get_s3_uri(workflow_run_id, file_path), file_path=file_path - ) + s3_uri = self._get_s3_uri(workflow_run_id, file_path) + s3_uris.append(s3_uri) + await client.upload_file_from_path(uri=s3_uri, file_path=file_path) else: - await client.upload_file_from_path( - uri=self._get_s3_uri(workflow_run_id, self.path), file_path=self.path - ) + s3_uri = self._get_s3_uri(workflow_run_id, self.path) + s3_uris.append(s3_uri) + await client.upload_file_from_path(uri=s3_uri, file_path=self.path) except Exception as e: LOG.exception("UploadToS3Block: Failed to upload file to S3", file_path=self.path) raise e LOG.info("UploadToS3Block: File(s) uploaded to S3", file_path=self.path) - return BlockResult(success=True) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, s3_uris) + return self.build_block_result(success=True, output_parameter_value=s3_uris) class SendEmailBlock(Block): @@ -902,39 +871,16 @@ class SendEmailBlock(Block): LOG.info("SendEmailBlock: Email sent") except Exception as e: LOG.error("SendEmailBlock: Failed to send email", exc_info=True) - if self.output_parameter: - result_dict = {"success": False, "error": str(e)} - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=result_dict, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=result_dict, - ) - return BlockResult( - success=False, output_parameter=self.output_parameter, output_parameter_value=result_dict - ) - raise e + result_dict = {"success": False, "error": str(e)} + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) + return self.build_block_result(success=False, output_parameter_value=result_dict) finally: if smtp_host: smtp_host.quit() result_dict = {"success": True} - if self.output_parameter: - await workflow_run_context.register_output_parameter_value_post_execution( - parameter=self.output_parameter, - value=result_dict, - ) - await app.DATABASE.create_workflow_run_output_parameter( - workflow_run_id=workflow_run_id, - output_parameter_id=self.output_parameter.output_parameter_id, - value=result_dict, - ) - return BlockResult(success=True, output_parameter=self.output_parameter, output_parameter_value=result_dict) - - return BlockResult(success=True) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) + return self.build_block_result(success=True, output_parameter_value=result_dict) BlockSubclasses = Union[ diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index ad5d0b52..3c078b4d 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -22,7 +22,9 @@ from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.tasks import ProxyLocation, Task, TaskStatus from skyvern.forge.sdk.workflow.exceptions import ( ContextParameterSourceNotDefined, + InvalidWorkflowDefinition, WorkflowDefinitionHasDuplicateParameterKeys, + WorkflowDefinitionHasReservedParameterKeys, ) from skyvern.forge.sdk.workflow.models.block import ( BlockResult, @@ -56,7 +58,7 @@ from skyvern.forge.sdk.workflow.models.workflow import ( WorkflowRunStatus, WorkflowRunStatusResponse, ) -from skyvern.forge.sdk.workflow.models.yaml import BLOCK_YAML_TYPES, WorkflowCreateYAMLRequest +from skyvern.forge.sdk.workflow.models.yaml import BLOCK_YAML_TYPES, ForLoopBlockYAML, WorkflowCreateYAMLRequest from skyvern.webeye.browser_factory import BrowserState LOG = structlog.get_logger() @@ -760,6 +762,29 @@ class WorkflowService: parameters: dict[str, PARAMETER_TYPE] = {} duplicate_parameter_keys = set() + # Check if user's trying to manually create an output parameter + if any( + parameter.parameter_type == ParameterType.OUTPUT for parameter in request.workflow_definition.parameters + ): + raise InvalidWorkflowDefinition(message="Cannot manually create output parameters") + + # Check if any parameter keys collide with automatically created output parameter keys + block_labels = [block.label for block in request.workflow_definition.blocks] + # TODO (kerem): Check if block labels are unique + output_parameter_keys = [f"{block_label}_output" for block_label in block_labels] + parameter_keys = [parameter.key for parameter in request.workflow_definition.parameters] + if any(key in output_parameter_keys for key in parameter_keys): + raise WorkflowDefinitionHasReservedParameterKeys( + reserved_keys=output_parameter_keys, parameter_keys=parameter_keys + ) + + # Create output parameters for all blocks + block_output_parameters = await WorkflowService._create_all_output_parameters_for_workflow( + workflow_id=workflow.workflow_id, block_yamls=request.workflow_definition.blocks + ) + for block_output_parameter in block_output_parameters.values(): + parameters[block_output_parameter.key] = block_output_parameter + # We're going to process context parameters after other parameters since they depend on the other parameters context_parameter_yamls = [] @@ -833,7 +858,7 @@ class WorkflowService: block_label_mapping = {} blocks = [] for block_yaml in request.workflow_definition.blocks: - block = await self.block_yaml_to_block(block_yaml, parameters) + block = await self.block_yaml_to_block(workflow, block_yaml, parameters) blocks.append(block) block_label_mapping[block.label] = block @@ -858,8 +883,38 @@ class WorkflowService: raise e @staticmethod - async def block_yaml_to_block(block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter]) -> BlockTypeVar: - output_parameter = parameters.get(block_yaml.output_parameter_key) if block_yaml.output_parameter_key else None + async def _create_output_parameter_for_block(workflow_id: str, block_yaml: BLOCK_YAML_TYPES) -> OutputParameter: + output_parameter_key = f"{block_yaml.label}_output" + return await app.DATABASE.create_output_parameter( + workflow_id=workflow_id, + key=output_parameter_key, + description=f"Output parameter for block {block_yaml.label}", + ) + + @staticmethod + async def _create_all_output_parameters_for_workflow( + workflow_id: str, block_yamls: list[BLOCK_YAML_TYPES] + ) -> dict[str, OutputParameter]: + output_parameters = {} + for block_yaml in block_yamls: + output_parameter = await WorkflowService._create_output_parameter_for_block( + workflow_id=workflow_id, block_yaml=block_yaml + ) + output_parameters[block_yaml.label] = output_parameter + # Recursively create output parameters for for loop blocks + if isinstance(block_yaml, ForLoopBlockYAML): + output_parameters.update( + await WorkflowService._create_all_output_parameters_for_workflow( + workflow_id=workflow_id, block_yamls=block_yaml.loop_blocks + ) + ) + return output_parameters + + @staticmethod + async def block_yaml_to_block( + workflow: Workflow, block_yaml: BLOCK_YAML_TYPES, parameters: dict[str, Parameter] + ) -> BlockTypeVar: + output_parameter = parameters[f"{block_yaml.label}_output"] if block_yaml.block_type == BlockType.TASK: task_block_parameters = ( [parameters[parameter_key] for parameter_key in block_yaml.parameter_keys] @@ -880,7 +935,7 @@ class WorkflowService: ) elif block_yaml.block_type == BlockType.FOR_LOOP: loop_blocks = [ - await WorkflowService.block_yaml_to_block(loop_block, parameters) + await WorkflowService.block_yaml_to_block(workflow, loop_block, parameters) for loop_block in block_yaml.loop_blocks ] loop_over_parameter = parameters[block_yaml.loop_over_parameter_key]