diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 509084a1..80bef714 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -4455,3 +4455,38 @@ class AgentDB: except Exception: LOG.error("UnexpectedError", exc_info=True) raise + + async def delete_workflow_scripts_by_permanent_id( + self, + organization_id: str, + workflow_permanent_id: str, + ) -> int: + """ + Soft delete all published workflow scripts for a workflow permanent id by setting deleted_at timestamp. + + Returns True if any records were deleted, False otherwise. + """ + try: + async with self.Session() as session: + stmt = ( + update(WorkflowScriptModel) + .where( + and_( + WorkflowScriptModel.organization_id == organization_id, + WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id, + WorkflowScriptModel.deleted_at.is_(None), + ) + ) + .values(deleted_at=datetime.now(timezone.utc)) + ) + + result = await session.execute(stmt) + await session.commit() + + return result.rowcount + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index b5fc3350..8bb22b62 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1,6 +1,7 @@ import asyncio import json import uuid +from collections import deque from datetime import UTC, datetime from typing import Any @@ -114,6 +115,52 @@ DEFAULT_FIRST_BLOCK_LABEL = "block_1" DEFAULT_WORKFLOW_TITLE = "New Workflow" +def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefinition) -> dict[str, Any]: + """ + This function dumps the workflow definition and removes the created_at and modified_at fields inside: + - list of blocks + - list of parameters + And return the dumped workflow definition as a python dictionary. + """ + # Convert the workflow definition to a dictionary + workflow_dict = workflow_definition.model_dump() + fields_to_remove = [ + "created_at", + "modified_at", + "deleted_at", + "output_parameter_id", + "workflow_id", + "workflow_parameter_id", + ] + + # Use BFS to recursively remove fields from all nested objects + + # Queue to store objects to process + queue = deque([workflow_dict]) + + while queue: + current_obj = queue.popleft() + + if isinstance(current_obj, dict): + # Remove specified fields from current dictionary + for field in fields_to_remove: + if field: # Skip empty string + current_obj.pop(field, None) + + # Add all nested dictionaries and lists to queue for processing + for value in current_obj.values(): + if isinstance(value, (dict, list)): + queue.append(value) + + elif isinstance(current_obj, list): + # Add all items in the list to queue for processing + for item in current_obj: + if isinstance(item, (dict, list)): + queue.append(item) + + return workflow_dict + + class WorkflowService: @staticmethod def _collect_extracted_information(value: Any) -> list[Any]: @@ -877,13 +924,55 @@ class WorkflowService: if workflow_definition: workflow_definition.validate() - return await app.DATABASE.update_workflow( + # Update the workflow + updated_workflow = await app.DATABASE.update_workflow( workflow_id=workflow_id, title=title, organization_id=organization_id, description=description, workflow_definition=(workflow_definition.model_dump() if workflow_definition else None), ) + updated_version = updated_workflow.version + previous_workflow = None + if updated_version > 1: + previous_workflow = await app.DATABASE.get_workflow_by_permanent_id( + workflow_permanent_id=updated_workflow.workflow_permanent_id, + organization_id=organization_id, + version=updated_version - 1, + ) + + # Check if workflow definition changed and delete published workflow scripts if so + if ( + workflow_definition + and previous_workflow + and organization_id + and _get_workflow_definition_without_dates(previous_workflow.workflow_definition) + != _get_workflow_definition_without_dates(workflow_definition) + ): + try: + deleted_count = await app.DATABASE.delete_workflow_scripts_by_permanent_id( + organization_id=organization_id, + workflow_permanent_id=updated_workflow.workflow_permanent_id, + ) + if deleted_count > 0: + LOG.info( + "Deleted published workflow scripts due to workflow definition change", + workflow_id=workflow_id, + workflow_permanent_id=updated_workflow.workflow_permanent_id, + organization_id=organization_id, + deleted_count=deleted_count, + ) + except Exception as e: + LOG.error( + "Failed to delete published workflow scripts after workflow definition change", + workflow_id=workflow_id, + workflow_permanent_id=updated_workflow.workflow_permanent_id, + organization_id=organization_id, + error=str(e), + exc_info=True, + ) + + return updated_workflow async def delete_workflow_by_permanent_id( self,