diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index a37721fe..29010002 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -4510,6 +4510,8 @@ class AgentDB: self, organization_id: str, workflow_permanent_id: str, + statuses: list[ScriptStatus] | None = None, + script_ids: list[str] | None = None, ) -> int: """ Soft delete all published workflow scripts for a workflow permanent id by setting deleted_at timestamp. @@ -4530,6 +4532,12 @@ class AgentDB: .values(deleted_at=datetime.now(timezone.utc)) ) + if statuses: + stmt = stmt.where(WorkflowScriptModel.status.in_([s.value for s in statuses])) + + if script_ids: + stmt = stmt.where(WorkflowScriptModel.script_id.in_(script_ids)) + result = await session.execute(stmt) await session.commit() @@ -4545,6 +4553,7 @@ class AgentDB: self, organization_id: str, workflow_permanent_id: str, + statuses: list[ScriptStatus] | None = None, ) -> list[WorkflowScriptModel]: try: async with self.Session() as session: @@ -4555,6 +4564,9 @@ class AgentDB: .filter_by(deleted_at=None) ) + if statuses: + query = query.filter(WorkflowScriptModel.status.in_([s.value for s in statuses])) + return (await session.scalars(query)).all() except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7a5a3b78..35411009 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -99,6 +99,7 @@ from skyvern.forge.sdk.workflow.models.workflow import ( WorkflowRunStatus, ) from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse +from skyvern.schemas.scripts import ScriptStatus from skyvern.schemas.workflows import ( BLOCK_YAML_TYPES, BlockStatus, @@ -1043,31 +1044,63 @@ class WorkflowService: has_changes = False if previous_valid_workflow and has_changes and delete_script: - to_delete = await app.DATABASE.get_workflow_scripts_by_permanent_id( + candidates = await app.DATABASE.get_workflow_scripts_by_permanent_id( organization_id=organization_id, workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, ) - if len(to_delete) > 0: + to_delete_published = [script for script in candidates if script.status == ScriptStatus.published] + to_delete = [script for script in candidates if script.status != ScriptStatus.published] + + if len(to_delete_published) > 0: if not delete_code_cache_is_ok: + LOG.info( + "Workflow definition changed, asking user if deleting published code is ok", + workflow_id=workflow.workflow_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + organization_id=organization_id, + previous_version=previous_valid_workflow.version, + new_version=workflow.version, + to_delete_non_published_cnt=len(to_delete), + to_delete_published_cnt=len(to_delete_published), + ) + raise CannotUpdateWorkflowDueToCodeCache( workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, ) else: - try: - await app.DATABASE.delete_workflow_scripts_by_permanent_id( - organization_id=organization_id, - workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, - ) - except Exception as e: - LOG.error( - "Failed to delete published workflow scripts after workflow definition change", - workflow_id=workflow.workflow_id, - workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, - organization_id=organization_id, - error=str(e), - exc_info=True, - ) + LOG.info( + "Workflow definition changed, user answered yes to deleting published code", + workflow_id=workflow.workflow_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + organization_id=organization_id, + previous_version=previous_valid_workflow.version, + new_version=workflow.version, + to_delete_non_published_cnt=len(to_delete), + to_delete_published_cnt=len(to_delete_published), + ) + + to_delete.extend(to_delete_published) + + if len(to_delete) > 0: + try: + await app.DATABASE.delete_workflow_scripts_by_permanent_id( + organization_id=organization_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + script_ids=[s.script_id for s in to_delete], + ) + except Exception as e: + LOG.error( + "Failed to delete workflow scripts after workflow definition change", + workflow_id=workflow.workflow_id, + workflow_permanent_id=previous_valid_workflow.workflow_permanent_id, + organization_id=organization_id, + previous_version=previous_valid_workflow.version, + new_version=workflow.version, + error=str(e), + to_delete_ids=[script.script_id for script in to_delete], + to_delete_cnt=len(to_delete), + ) async def delete_workflow_by_permanent_id( self,