From 6929a1d24d299dce2ec3189b7dc007e4b1081a77 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Fri, 5 Jul 2024 17:08:20 -0700 Subject: [PATCH] Implement FileURLParserBlock and FILE_URL WorkflowParameterType (#559) --- skyvern/forge/sdk/api/files.py | 8 ++ skyvern/forge/sdk/workflow/context_manager.py | 15 ++- skyvern/forge/sdk/workflow/exceptions.py | 8 ++ skyvern/forge/sdk/workflow/models/block.py | 101 +++++++++++++++--- .../forge/sdk/workflow/models/parameter.py | 3 + skyvern/forge/sdk/workflow/models/yaml.py | 10 +- skyvern/forge/sdk/workflow/service.py | 9 ++ 7 files changed, 135 insertions(+), 19 deletions(-) diff --git a/skyvern/forge/sdk/api/files.py b/skyvern/forge/sdk/api/files.py index 403ef49e..bbf3e6a3 100644 --- a/skyvern/forge/sdk/api/files.py +++ b/skyvern/forge/sdk/api/files.py @@ -9,10 +9,18 @@ import structlog from skyvern.constants import REPO_ROOT_DIR from skyvern.exceptions import DownloadFileMaxSizeExceeded +from skyvern.forge.sdk.api.aws import AsyncAWSClient LOG = structlog.get_logger() +async def download_from_s3(client: AsyncAWSClient, s3_uri: str) -> str: + downloaded_bytes = await client.download_file(uri=s3_uri) + file_path = tempfile.NamedTemporaryFile(delete=False) + file_path.write(downloaded_bytes) + return file_path.name + + async def download_file(url: str, max_size_mb: int | None = None) -> str: try: async with aiohttp.ClientSession(raise_for_status=True) as session: diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index e5ba6ce4..64dec72f 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -255,16 +255,21 @@ class WorkflowRunContext: old_value=parameter.value, new_value=value, ) - if not isinstance(value, dict): + if not isinstance(value, dict) and not isinstance(value, list): raise ValueError( - f"ContextParameter can't depend on an OutputParameter with a non-dict value. " + f"ContextParameter can only depend on an OutputParameter with a dict or list value. " f"ContextParameter key: {parameter.key}, " f"OutputParameter key: {output_parameter.key}, " f"OutputParameter value: {value}" ) - parameter.value = value.get(parameter.key) - self.parameters[parameter.key] = parameter - self.values[parameter.key] = parameter.value + if isinstance(value, dict): + parameter.value = value.get(parameter.key) + self.parameters[parameter.key] = parameter + self.values[parameter.key] = parameter.value + else: + parameter.value = value + self.parameters[parameter.key] = parameter + self.values[parameter.key] = parameter.value async def register_block_parameters( self, diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py index 37eec528..f234dab6 100644 --- a/skyvern/forge/sdk/workflow/exceptions.py +++ b/skyvern/forge/sdk/workflow/exceptions.py @@ -75,3 +75,11 @@ class ContextParameterSourceNotDefined(BaseWorkflowHTTPException): f"Source parameter key {source_key} for context parameter {context_parameter_key} does not exist.", status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, ) + + +class InvalidFileType(BaseWorkflowHTTPException): + def __init__(self, file_url: str, file_type: str, error: str) -> None: + super().__init__( + f"File URL {file_url} is not a valid {file_type} file. Error: {error}", + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + ) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e00a03f2..3836cca5 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -1,4 +1,5 @@ import abc +import csv import json import os import smtplib @@ -25,12 +26,16 @@ from skyvern.exceptions import ( from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import AsyncAWSClient -from skyvern.forge.sdk.api.files import download_file, get_path_for_workflow_download_directory +from skyvern.forge.sdk.api.files import download_file, download_from_s3, get_path_for_workflow_download_directory from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext -from skyvern.forge.sdk.workflow.exceptions import InvalidEmailClientConfiguration, NoValidEmailRecipient +from skyvern.forge.sdk.workflow.exceptions import ( + InvalidEmailClientConfiguration, + InvalidFileType, + NoValidEmailRecipient, +) from skyvern.forge.sdk.workflow.models.parameter import ( PARAMETER_TYPE, AWSSecretParameter, @@ -50,6 +55,7 @@ class BlockType(StrEnum): DOWNLOAD_TO_S3 = "download_to_s3" UPLOAD_TO_S3 = "upload_to_s3" SEND_EMAIL = "send_email" + FILE_URL_PARSER = "file_url_parser" @dataclass(frozen=True) @@ -353,9 +359,8 @@ class ForLoopBlock(Block): return list(parameters) def get_loop_block_context_parameters(self, workflow_run_id: str, loop_data: Any) -> list[ContextParameter]: - if not isinstance(loop_data, dict): - # TODO (kerem): Should we add support for other types? - raise ValueError("loop_data should be a dict") + if not isinstance(loop_data, dict) and not isinstance(loop_data, list): + raise ValueError("loop_data should be a dict or a list.") context_parameters = [] for loop_block in self.loop_blocks: @@ -369,13 +374,19 @@ class ForLoopBlock(Block): for context_parameter in context_parameters: if context_parameter.source.key != self.loop_over.key: continue - if context_parameter.key not in loop_data: - raise ContextParameterValueNotFound( - parameter_key=context_parameter.key, - existing_keys=list(loop_data.keys()), - workflow_run_id=workflow_run_id, - ) - context_parameter.value = loop_data[context_parameter.key] + # If the loop_data is a dict, we need to check if the key exists in the loop_data + if isinstance(loop_data, dict): + if context_parameter.key in loop_data: + context_parameter.value = loop_data[context_parameter.key] + else: + raise ContextParameterValueNotFound( + parameter_key=context_parameter.key, + existing_keys=list(loop_data.keys()), + workflow_run_id=workflow_run_id, + ) + else: + # If the loop_data is a list, we can directly assign the loop_data to the context_parameter value + context_parameter.value = loop_data return context_parameters @@ -859,7 +870,7 @@ class SendEmailBlock(Block): path = None try: if filename.startswith("s3://"): - path = await self._download_from_s3(filename) + path = await download_from_s3(self.get_async_aws_client(), filename) elif filename.startswith("http://") or filename.startswith("https://"): path = await download_file(filename) else: @@ -947,6 +958,69 @@ class SendEmailBlock(Block): return self.build_block_result(success=True, output_parameter_value=result_dict) +class FileType(StrEnum): + CSV = "csv" + + +class FileParserBlock(Block): + block_type: Literal[BlockType.FILE_URL_PARSER] = BlockType.FILE_URL_PARSER + + file_url: str + file_type: FileType + + def get_all_parameters( + self, + workflow_run_id: str, + ) -> list[PARAMETER_TYPE]: + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + if self.file_url and workflow_run_context.has_parameter(self.file_url): + return [workflow_run_context.get_parameter(self.file_url)] + return [] + + def validate_file_type(self, file_url_used: str, file_path: str) -> None: + if self.file_type == FileType.CSV: + try: + with open(file_path, "r") as file: + csv.Sniffer().sniff(file.read(1024)) + except csv.Error as e: + raise InvalidFileType(file_url=file_url_used, file_type=self.file_type, error=str(e)) + + async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult: + workflow_run_context = self.get_workflow_run_context(workflow_run_id) + file_url_to_use = self.file_url + if ( + self.file_url + and workflow_run_context.has_parameter(self.file_url) + and workflow_run_context.has_value(self.file_url) + ): + file_url_parameter_value = workflow_run_context.get_value(self.file_url) + if file_url_parameter_value: + LOG.info( + "FileParserBlock: File URL is parameterized, using parameter value", + file_url_parameter_value=file_url_parameter_value, + file_url_parameter_key=self.file_url, + ) + file_url_to_use = file_url_parameter_value + + # Download the file + if file_url_to_use.startswith("s3://"): + file_path = await download_from_s3(self.get_async_aws_client(), file_url_to_use) + else: + file_path = await download_file(file_url_to_use) + # Validate the file type + self.validate_file_type(file_url_to_use, file_path) + # Parse the file into a list of dictionaries where each dictionary represents a row in the file + parsed_data = [] + with open(file_path, "r") as file: + if self.file_type == FileType.CSV: + reader = csv.DictReader(file) + for row in reader: + parsed_data.append(row) + # Record the parsed data + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data) + return self.build_block_result(success=True, output_parameter_value=parsed_data) + + BlockSubclasses = Union[ ForLoopBlock, TaskBlock, @@ -955,5 +1029,6 @@ BlockSubclasses = Union[ DownloadToS3Block, UploadToS3Block, SendEmailBlock, + FileParserBlock, ] BlockTypeVar = Annotated[BlockSubclasses, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/models/parameter.py b/skyvern/forge/sdk/workflow/models/parameter.py index b0f3fe07..5bd75da7 100644 --- a/skyvern/forge/sdk/workflow/models/parameter.py +++ b/skyvern/forge/sdk/workflow/models/parameter.py @@ -67,6 +67,7 @@ class WorkflowParameterType(StrEnum): FLOAT = "float" BOOLEAN = "boolean" JSON = "json" + FILE_URL = "file_url" def convert_value(self, value: str | None) -> str | int | float | bool | dict | list | None: if value is None: @@ -81,6 +82,8 @@ class WorkflowParameterType(StrEnum): return value.lower() in ["true", "1"] elif self == WorkflowParameterType.JSON: return json.loads(value) + elif self == WorkflowParameterType.FILE_URL: + return value class WorkflowParameter(Parameter): diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index b1769485..c5c83ef2 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -4,7 +4,7 @@ from typing import Annotated, Any, Literal from pydantic import BaseModel, Field from skyvern.forge.sdk.schemas.tasks import ProxyLocation -from skyvern.forge.sdk.workflow.models.block import BlockType +from skyvern.forge.sdk.workflow.models.block import BlockType, FileType from skyvern.forge.sdk.workflow.models.parameter import ParameterType, WorkflowParameterType @@ -162,6 +162,13 @@ class SendEmailBlockYAML(BlockYAML): file_attachments: list[str] | None = None +class FileParserBlockYAML(BlockYAML): + block_type: Literal[BlockType.FILE_URL_PARSER] = BlockType.FILE_URL_PARSER # type: ignore + + file_url: str + file_type: FileType + + PARAMETER_YAML_SUBCLASSES = ( AWSSecretParameterYAML | BitwardenLoginCredentialParameterYAML @@ -179,6 +186,7 @@ BLOCK_YAML_SUBCLASSES = ( | DownloadToS3BlockYAML | UploadToS3BlockYAML | SendEmailBlockYAML + | FileParserBlockYAML ) BLOCK_YAML_TYPES = Annotated[BLOCK_YAML_SUBCLASSES, Field(discriminator="block_type")] diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index e6f0a01c..1f267858 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -25,6 +25,7 @@ from skyvern.forge.sdk.workflow.models.block import ( BlockTypeVar, CodeBlock, DownloadToS3Block, + FileParserBlock, ForLoopBlock, SendEmailBlock, TaskBlock, @@ -1052,4 +1053,12 @@ class WorkflowService: file_attachments=block_yaml.file_attachments or [], continue_on_failure=block_yaml.continue_on_failure, ) + elif block_yaml.block_type == BlockType.FILE_URL_PARSER: + return FileParserBlock( + label=block_yaml.label, + output_parameter=output_parameter, + file_url=block_yaml.file_url, + file_type=block_yaml.file_type, + continue_on_failure=block_yaml.continue_on_failure, + ) raise ValueError(f"Invalid block type {block_yaml.block_type}")