diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py
index 01b65d3a..32eb498e 100644
--- a/skyvern/forge/sdk/workflow/context_manager.py
+++ b/skyvern/forge/sdk/workflow/context_manager.py
@@ -9,6 +9,8 @@ from skyvern.forge.sdk.services.bitwarden import BitwardenConstants, BitwardenSe
 from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError
 from skyvern.forge.sdk.workflow.models.parameter import (
     PARAMETER_TYPE,
+    BitwardenLoginCredentialParameter,
+    ContextParameter,
     OutputParameter,
     Parameter,
     ParameterType,
@@ -30,6 +32,7 @@ class WorkflowRunContext:
         self,
         workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]],
         workflow_output_parameters: list[OutputParameter],
+        context_parameters: list[ContextParameter],
     ) -> None:
         self.parameters = {}
         self.values = {}
@@ -50,6 +53,12 @@ class WorkflowRunContext:
                 raise OutputParameterKeyCollisionError(output_parameter.key)
             self.parameters[output_parameter.key] = output_parameter
 
+        for context_parameter in context_parameters:
+            # All context parameters will be registered with the context manager during initialization but the values
+            # will be calculated and set before and after each block execution
+            # values sometimes will be overwritten by the block execution itself
+            self.parameters[context_parameter.key] = context_parameter
+
     def get_parameter(self, key: str) -> Parameter:
         return self.parameters[key]
 
@@ -175,9 +184,32 @@ class WorkflowRunContext:
                 BitwardenService.logout()
                 LOG.error(f"Failed to get secret from Bitwarden. Error: {e}")
                 raise e
-        elif parameter.parameter_type == ParameterType.CONTEXT:
-            # ContextParameter values will be set within the blocks
-            return
+        elif isinstance(parameter, ContextParameter):
+            if isinstance(parameter.source, WorkflowParameter):
+                # TODO (kerem): set this while initializing the context manager
+                workflow_parameter_value = self.get_value(parameter.source.key)
+                if not isinstance(workflow_parameter_value, dict):
+                    raise ValueError(f"ContextParameter source value is not a dict. Parameter key: {parameter.key}")
+                parameter.value = workflow_parameter_value.get(parameter.source.key)
+                self.parameters[parameter.key] = parameter
+                self.values[parameter.key] = parameter.value
+            elif isinstance(parameter.source, ContextParameter):
+                # TODO (kerem): update this anytime the source parameter value changes in values dict
+                context_parameter_value = self.get_value(parameter.source.key)
+                if not isinstance(context_parameter_value, dict):
+                    raise ValueError(f"ContextParameter source value is not a dict. Parameter key: {parameter.key}")
+                parameter.value = context_parameter_value.get(parameter.source.key)
+                self.parameters[parameter.key] = parameter
+                self.values[parameter.key] = parameter.value
+            elif isinstance(parameter.source, OutputParameter):
+                # We won't set the value of the ContextParameter if the source is an OutputParameter it'll be set in
+                # `register_output_parameter_value_post_execution` method
+                pass
+            else:
+                raise NotImplementedError(
+                    f"ContextParameter source has to be a WorkflowParameter, ContextParameter, or OutputParameter. "
+                    f"{parameter.source.parameter_type} is not supported."
+                )
         else:
             raise ValueError(f"Unknown parameter type: {parameter.parameter_type}")
 
@@ -189,28 +221,66 @@ class WorkflowRunContext:
             return
         self.values[parameter.key] = value
+        await self.set_parameter_values_for_output_parameter_dependent_blocks(parameter, value)
+
+    async def set_parameter_values_for_output_parameter_dependent_blocks(
+        self, output_parameter: OutputParameter, value: dict[str, Any] | list | str | None
+    ) -> None:
+        for key, parameter in self.parameters.items():
+            if (
+                isinstance(parameter, ContextParameter)
+                and isinstance(parameter.source, OutputParameter)
+                and parameter.source.key == output_parameter.key
+            ):
+                if parameter.value:
+                    LOG.warning(
+                        f"Context parameter {parameter.key} already has a value, overwriting",
+                        old_value=parameter.value,
+                        new_value=value,
+                    )
+                if not isinstance(value, dict):
+                    raise ValueError(
+                        f"ContextParameter can't depend on an OutputParameter with a non-dict value. "
+                        f"ContextParameter key: {parameter.key}, "
+                        f"OutputParameter key: {output_parameter.key}, "
+                        f"OutputParameter value: {value}"
+                    )
+                parameter.value = value.get(parameter.key)
+                self.parameters[parameter.key] = parameter
+                self.values[parameter.key] = parameter.value
 
     async def register_block_parameters(
         self,
         aws_client: AsyncAWSClient,
         parameters: list[PARAMETER_TYPE],
     ) -> None:
+        # Sort the parameters so that ContextParameter and BitwardenLoginCredentialParameter are processed last
+        # ContextParameter should be processed at the end since it requires the source parameter to be set
         # BitwardenLoginCredentialParameter should be processed last since it requires the URL parameter to be set
-        parameters.sort(key=lambda x: x.parameter_type != ParameterType.BITWARDEN_LOGIN_CREDENTIAL)
+        # Python's tuple comparison works lexicographically, so we can sort the parameters by their type in a tuple
+        parameters.sort(
+            key=lambda x: (
+                isinstance(x, ContextParameter),
+                # This makes sure that ContextParameters with a ContextParameter source are processed after all other
+                # ContextParameters
+                isinstance(x.source, ContextParameter) if isinstance(x, ContextParameter) else False,
+                isinstance(x, BitwardenLoginCredentialParameter),
+            )
+        )
 
         for parameter in parameters:
             if parameter.key in self.parameters:
                 LOG.debug(f"Parameter {parameter.key} already registered, skipping")
                 continue
 
-            if parameter.parameter_type == ParameterType.WORKFLOW:
+            if isinstance(parameter, WorkflowParameter):
                 LOG.error(
                     f"Workflow parameter {parameter.key} should have already been set through workflow run parameters"
                 )
                 raise ValueError(
                     f"Workflow parameter {parameter.key} should have already been set through workflow run parameters"
                 )
-            elif parameter.parameter_type == ParameterType.OUTPUT:
+            elif isinstance(parameter, OutputParameter):
                 LOG.error(
                     f"Output parameter {parameter.key} should have already been set through workflow run context init"
                 )
@@ -244,8 +314,11 @@ class WorkflowContextManager:
         workflow_run_id: str,
         workflow_parameter_tuples: list[tuple[WorkflowParameter, "WorkflowRunParameter"]],
         workflow_output_parameters: list[OutputParameter],
+        context_parameters: list[ContextParameter],
     ) -> WorkflowRunContext:
-        workflow_run_context = WorkflowRunContext(workflow_parameter_tuples, workflow_output_parameters)
+        workflow_run_context = WorkflowRunContext(
+            workflow_parameter_tuples, workflow_output_parameters, context_parameters
+        )
         self.workflow_run_contexts[workflow_run_id] = workflow_run_context
         return workflow_run_context
diff --git a/skyvern/forge/sdk/workflow/exceptions.py b/skyvern/forge/sdk/workflow/exceptions.py
index 796f5c84..4366f309 100644
--- a/skyvern/forge/sdk/workflow/exceptions.py
+++ b/skyvern/forge/sdk/workflow/exceptions.py
@@ -34,3 +34,10 @@ class WorkflowDefinitionHasDuplicateParameterKeys(BaseWorkflowException):
 class InvalidEmailClientConfiguration(BaseWorkflowException):
     def __init__(self, problems: list[str]) -> None:
         super().__init__(f"Email client configuration is invalid. These parameters are missing or invalid: {problems}")
+
+
+class ContextParameterSourceNotDefined(BaseWorkflowException):
+    def __init__(self, context_parameter_key: str, source_key: str) -> None:
+        super().__init__(
+            f"Source parameter key {source_key} for context parameter {context_parameter_key} does not exist."
+        )
diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py
index e17e6950..f25220e3 100644
--- a/skyvern/forge/sdk/workflow/models/block.py
+++ b/skyvern/forge/sdk/workflow/models/block.py
@@ -122,7 +122,17 @@ class TaskBlock(Block):
         workflow_run_context = self.get_workflow_run_context(workflow_run_id)
 
         if self.url and workflow_run_context.has_parameter(self.url):
-            parameters.append(workflow_run_context.get_parameter(self.url))
+            if workflow_run_context.has_value(self.url):
+                LOG.info(
+                    "Task URL is parameterized, using parameter value",
+                    task_url_parameter_value=workflow_run_context.get_value(self.url),
+                    task_url_parameter_key=self.url,
+                )
+                self.url = workflow_run_context.get_value(self.url)
+            else:
+                # if the parameter is not resolved yet, we'll add it to the list of parameters to resolve
+                # parameterization of the url would happen when the task is executed
+                parameters.append(workflow_run_context.get_parameter(self.url))
 
         return parameters
 
@@ -300,11 +310,18 @@ class ForLoopBlock(Block):
                 # TODO (kerem): Should we add support for other types?
                 raise ValueError("loop_data should be a dict")
 
-        loop_block_parameters = self.get_all_parameters(workflow_run_id)
-        context_parameters = [
-            parameter for parameter in loop_block_parameters if isinstance(parameter, ContextParameter)
-        ]
+        context_parameters = []
+        for loop_block in self.loop_blocks:
+            # todo: handle the case where the loop_block is a ForLoopBlock
+
+            all_parameters = loop_block.get_all_parameters(workflow_run_id)
+            for parameter in all_parameters:
+                if isinstance(parameter, ContextParameter):
+                    context_parameters.append(parameter)
+
         for context_parameter in context_parameters:
+            if context_parameter.source.key != self.loop_over.key:
+                continue
             if context_parameter.key not in loop_data:
                 raise ContextParameterValueNotFound(
                     parameter_key=context_parameter.key,
@@ -318,15 +335,23 @@ class ForLoopBlock(Block):
     def get_loop_over_parameter_values(self, workflow_run_context: WorkflowRunContext) -> list[Any]:
         if isinstance(self.loop_over, WorkflowParameter) or isinstance(self.loop_over, OutputParameter):
             parameter_value = workflow_run_context.get_value(self.loop_over.key)
-            if isinstance(parameter_value, list):
-                return parameter_value
-            else:
-                # TODO (kerem): Should we raise an error here?
-                return [parameter_value]
+        elif isinstance(self.loop_over, ContextParameter):
+            parameter_value = self.loop_over.value
+            if not parameter_value:
+                source_parameter_value = workflow_run_context.get_value(self.loop_over.source.key)
+                if isinstance(source_parameter_value, dict):
+                    parameter_value = source_parameter_value.get(self.loop_over.key)
+                else:
+                    raise ValueError("ContextParameter source value should be a dict")
         else:
-            # TODO (kerem): Implement this for context parameters
             raise NotImplementedError
 
+        if isinstance(parameter_value, list):
+            return parameter_value
+        else:
+            # TODO (kerem): Should we raise an error here?
+            return [parameter_value]
+
     async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
         workflow_run_context = self.get_workflow_run_context(workflow_run_id)
         loop_over_values = self.get_loop_over_parameter_values(workflow_run_context)
diff --git a/skyvern/forge/sdk/workflow/models/parameter.py b/skyvern/forge/sdk/workflow/models/parameter.py
index 0d4ce73d..adc71e2b 100644
--- a/skyvern/forge/sdk/workflow/models/parameter.py
+++ b/skyvern/forge/sdk/workflow/models/parameter.py
@@ -97,7 +97,7 @@ class WorkflowParameter(Parameter):
 
 class ContextParameter(Parameter):
     parameter_type: Literal[ParameterType.CONTEXT] = ParameterType.CONTEXT
-    source: WorkflowParameter
+    source: "ParameterSubclasses"
     # value will be populated by the context manager
     value: str | int | float | bool | dict | list | None = None
 
diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py
index b9b36625..bba4c09f 100644
--- a/skyvern/forge/sdk/workflow/models/workflow.py
+++ b/skyvern/forge/sdk/workflow/models/workflow.py
@@ -7,6 +7,7 @@ from pydantic import BaseModel
 from skyvern.forge.sdk.schemas.tasks import ProxyLocation
 from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateBlockLabels
 from skyvern.forge.sdk.workflow.models.block import BlockResult, BlockTypeVar
+from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE
 
 
 class WorkflowRequestBody(BaseModel):
@@ -21,6 +22,7 @@ class RunWorkflowResponse(BaseModel):
 
 
 class WorkflowDefinition(BaseModel):
+    parameters: list[PARAMETER_TYPE]
     blocks: List[BlockTypeVar]
 
     def validate(self) -> None:
diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py
index 778361ab..c65e6dac 100644
--- a/skyvern/forge/sdk/workflow/models/yaml.py
+++ b/skyvern/forge/sdk/workflow/models/yaml.py
@@ -53,7 +53,7 @@ class ContextParameterYAML(ParameterYAML):
     # This pattern already works in block.py but since the ParameterType is not defined in this file, mypy is not able
     # to infer the type of the parameter_type attribute.
     parameter_type: Literal[ParameterType.CONTEXT] = ParameterType.CONTEXT  # type: ignore
-    source_workflow_parameter_key: str
+    source_parameter_key: str
 
 
 class OutputParameterYAML(ParameterYAML):
diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py
index 49b1d709..746e0b09 100644
--- a/skyvern/forge/sdk/workflow/service.py
+++ b/skyvern/forge/sdk/workflow/service.py
@@ -20,7 +20,10 @@ from skyvern.forge.sdk.core.security import generate_skyvern_signature
 from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
 from skyvern.forge.sdk.models import Step
 from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
-from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateParameterKeys
+from skyvern.forge.sdk.workflow.exceptions import (
+    ContextParameterSourceNotDefined,
+    WorkflowDefinitionHasDuplicateParameterKeys,
+)
 from skyvern.forge.sdk.workflow.models.block import (
     BlockResult,
     BlockType,
@@ -34,6 +37,7 @@ from skyvern.forge.sdk.workflow.models.block import (
     UploadToS3Block,
 )
 from skyvern.forge.sdk.workflow.models.parameter import (
+    PARAMETER_TYPE,
     AWSSecretParameter,
     ContextParameter,
     OutputParameter,
@@ -145,11 +149,17 @@ class WorkflowService:
 
         # Set workflow run status to running, create workflow run parameters
         await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
+        # Get all context parameters from the workflow definition
+        context_parameters = [
+            parameter
+            for parameter in workflow.workflow_definition.parameters
+            if isinstance(parameter, ContextParameter)
+        ]
         # Get all tuples
         wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run.workflow_run_id)
         workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id)
         app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context(
-            workflow_run_id, wp_wps_tuples, workflow_output_parameters
+            workflow_run_id, wp_wps_tuples, workflow_output_parameters, context_parameters
         )
         # Execute workflow blocks
         blocks = workflow.workflow_definition.blocks
@@ -649,10 +659,10 @@ class WorkflowService:
             organization_id=organization_id,
             title=request.title,
             description=request.description,
-            workflow_definition=WorkflowDefinition(blocks=[]),
+            workflow_definition=WorkflowDefinition(parameters=[], blocks=[]),
         )
         # Create parameters from the request
-        parameters = {}
+        parameters: dict[str, PARAMETER_TYPE] = {}
         duplicate_parameter_keys = set()
 
         # We're going to process context parameters after other parameters since they depend on the other parameters
@@ -701,10 +711,23 @@ class WorkflowService:
 
         # Now we can process the context parameters since all other parameters have been created
         for context_parameter in context_parameter_yamls:
+            if context_parameter.source_parameter_key not in parameters:
+                raise ContextParameterSourceNotDefined(
+                    context_parameter_key=context_parameter.key, source_key=context_parameter.source_parameter_key
+                )
+
+            if context_parameter.key in parameters:
+                LOG.error(f"Duplicate parameter key {context_parameter.key}")
+                duplicate_parameter_keys.add(context_parameter.key)
+                continue
+
+            # We're only adding the context parameter to the parameters dict, we're not creating it in the database
+            # It'll only be stored in the `workflow.workflow_definition`
+            # todo (kerem): should we have a database table for context parameters?
             parameters[context_parameter.key] = ContextParameter(
                 key=context_parameter.key,
                 description=context_parameter.description,
-                source=parameters[context_parameter.source_workflow_parameter_key],
+                source=parameters[context_parameter.source_parameter_key],
                 # Context parameters don't have a default value, the value always depends on the source parameter
                 value=None,
             )
@@ -720,7 +743,7 @@ class WorkflowService:
             block_label_mapping[block.label] = block
 
         # Set the blocks for the workflow definition
-        workflow_definition = WorkflowDefinition(blocks=blocks)
+        workflow_definition = WorkflowDefinition(parameters=parameters.values(), blocks=blocks)
         workflow = await self.update_workflow(
             workflow_id=workflow.workflow_id,
             workflow_definition=workflow_definition,
diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py
index c4ccacdb..f5048c67 100644
--- a/skyvern/webeye/actions/handler.py
+++ b/skyvern/webeye/actions/handler.py
@@ -207,7 +207,9 @@ async def handle_download_file_action(
     # Start waiting for the download
     async with page.expect_download() as download_info:
         await asyncio.sleep(0.3)
-        await page.click(f"xpath={xpath}", timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS)
+        await page.click(
+            f"xpath={xpath}", timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, modifiers=["Alt"]
+        )
 
     download = await download_info.value