diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 98c44ca9..62c53000 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1264,7 +1264,7 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise - async def create_workflow_run_output_parameter( + async def create_or_update_workflow_run_output_parameter( self, workflow_run_id: str, output_parameter_id: str, @@ -1272,6 +1272,24 @@ class AgentDB: ) -> WorkflowRunOutputParameter: try: async with self.Session() as session: + # check if the workflow run output parameter already exists + # if it does, update the value + if workflow_run_output_parameter := ( + await session.scalars( + select(WorkflowRunOutputParameterModel) + .filter_by(workflow_run_id=workflow_run_id) + .filter_by(output_parameter_id=output_parameter_id) + ) + ).first(): + LOG.info( + f"Updating existing workflow run output parameter with {workflow_run_output_parameter.workflow_run_id} - {workflow_run_output_parameter.output_parameter_id}" + ) + workflow_run_output_parameter.value = value + await session.commit() + await session.refresh(workflow_run_output_parameter) + return convert_to_workflow_run_output_parameter(workflow_run_output_parameter, self.debug_enabled) + + # if it does not exist, create a new one workflow_run_output_parameter = WorkflowRunOutputParameterModel( workflow_run_id=workflow_run_id, output_parameter_id=output_parameter_id, @@ -1286,6 +1304,33 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise + async def update_workflow_run_output_parameter( + self, + workflow_run_id: str, + output_parameter_id: str, + value: dict[str, Any] | list | str | None, + ) -> WorkflowRunOutputParameter: + try: + async with self.Session() as session: + workflow_run_output_parameter = ( + await session.scalars( + select(WorkflowRunOutputParameterModel) + .filter_by(workflow_run_id=workflow_run_id) + .filter_by(output_parameter_id=output_parameter_id) + ) + ).first() + if not workflow_run_output_parameter: + raise NotFoundError( + f"WorkflowRunOutputParameter not found for {workflow_run_id} and {output_parameter_id}" + ) + workflow_run_output_parameter.value = value + await session.commit() + await session.refresh(workflow_run_output_parameter) + return convert_to_workflow_run_output_parameter(workflow_run_output_parameter, self.debug_enabled) + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + async def get_workflow_parameters(self, workflow_id: str) -> list[WorkflowParameter]: try: async with self.Session() as session: diff --git a/skyvern/forge/sdk/workflow/context_manager.py b/skyvern/forge/sdk/workflow/context_manager.py index 3be65991..b555350e 100644 --- a/skyvern/forge/sdk/workflow/context_manager.py +++ b/skyvern/forge/sdk/workflow/context_manager.py @@ -276,8 +276,7 @@ class WorkflowRunContext: self, parameter: OutputParameter, value: dict[str, Any] | list | str | None ) -> None: if parameter.key in self.values: - LOG.warning(f"Output parameter {parameter.output_parameter_id} already has a registered value") - return + LOG.warning(f"Output parameter {parameter.output_parameter_id} already has a registered value, overwriting") self.values[parameter.key] = value await self.set_parameter_values_for_output_parameter_dependent_blocks(parameter, value) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 648917e6..86c6f9b9 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -87,19 +87,11 @@ class Block(BaseModel, abc.ABC): workflow_run_id: str, value: dict[str, Any] | list | str | None = None, ) -> None: - if workflow_run_context.has_value(self.output_parameter.key): - LOG.warning( - "Output parameter value already recorded", - output_parameter_id=self.output_parameter.output_parameter_id, - workflow_run_id=workflow_run_id, - ) - return - await workflow_run_context.register_output_parameter_value_post_execution( parameter=self.output_parameter, value=value, ) - await app.DATABASE.create_workflow_run_output_parameter( + await app.DATABASE.create_or_update_workflow_run_output_parameter( workflow_run_id=workflow_run_id, output_parameter_id=self.output_parameter.output_parameter_id, value=value,