Delete workflow scripts (bust cache) when workflow definition is updated (#3580)
This commit is contained in:
@@ -4455,3 +4455,38 @@ class AgentDB:
|
||||
except Exception:
|
||||
LOG.error("UnexpectedError", exc_info=True)
|
||||
raise
|
||||
|
||||
async def delete_workflow_scripts_by_permanent_id(
|
||||
self,
|
||||
organization_id: str,
|
||||
workflow_permanent_id: str,
|
||||
) -> int:
|
||||
"""
|
||||
Soft delete all published workflow scripts for a workflow permanent id by setting deleted_at timestamp.
|
||||
|
||||
Returns True if any records were deleted, False otherwise.
|
||||
"""
|
||||
try:
|
||||
async with self.Session() as session:
|
||||
stmt = (
|
||||
update(WorkflowScriptModel)
|
||||
.where(
|
||||
and_(
|
||||
WorkflowScriptModel.organization_id == organization_id,
|
||||
WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id,
|
||||
WorkflowScriptModel.deleted_at.is_(None),
|
||||
)
|
||||
)
|
||||
.values(deleted_at=datetime.now(timezone.utc))
|
||||
)
|
||||
|
||||
result = await session.execute(stmt)
|
||||
await session.commit()
|
||||
|
||||
return result.rowcount
|
||||
except SQLAlchemyError:
|
||||
LOG.error("SQLAlchemyError", exc_info=True)
|
||||
raise
|
||||
except Exception:
|
||||
LOG.error("UnexpectedError", exc_info=True)
|
||||
raise
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
import uuid
|
||||
from collections import deque
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
@@ -114,6 +115,52 @@ DEFAULT_FIRST_BLOCK_LABEL = "block_1"
|
||||
DEFAULT_WORKFLOW_TITLE = "New Workflow"
|
||||
|
||||
|
||||
def _get_workflow_definition_without_dates(workflow_definition: WorkflowDefinition) -> dict[str, Any]:
|
||||
"""
|
||||
This function dumps the workflow definition and removes the created_at and modified_at fields inside:
|
||||
- list of blocks
|
||||
- list of parameters
|
||||
And return the dumped workflow definition as a python dictionary.
|
||||
"""
|
||||
# Convert the workflow definition to a dictionary
|
||||
workflow_dict = workflow_definition.model_dump()
|
||||
fields_to_remove = [
|
||||
"created_at",
|
||||
"modified_at",
|
||||
"deleted_at",
|
||||
"output_parameter_id",
|
||||
"workflow_id",
|
||||
"workflow_parameter_id",
|
||||
]
|
||||
|
||||
# Use BFS to recursively remove fields from all nested objects
|
||||
|
||||
# Queue to store objects to process
|
||||
queue = deque([workflow_dict])
|
||||
|
||||
while queue:
|
||||
current_obj = queue.popleft()
|
||||
|
||||
if isinstance(current_obj, dict):
|
||||
# Remove specified fields from current dictionary
|
||||
for field in fields_to_remove:
|
||||
if field: # Skip empty string
|
||||
current_obj.pop(field, None)
|
||||
|
||||
# Add all nested dictionaries and lists to queue for processing
|
||||
for value in current_obj.values():
|
||||
if isinstance(value, (dict, list)):
|
||||
queue.append(value)
|
||||
|
||||
elif isinstance(current_obj, list):
|
||||
# Add all items in the list to queue for processing
|
||||
for item in current_obj:
|
||||
if isinstance(item, (dict, list)):
|
||||
queue.append(item)
|
||||
|
||||
return workflow_dict
|
||||
|
||||
|
||||
class WorkflowService:
|
||||
@staticmethod
|
||||
def _collect_extracted_information(value: Any) -> list[Any]:
|
||||
@@ -877,13 +924,55 @@ class WorkflowService:
|
||||
if workflow_definition:
|
||||
workflow_definition.validate()
|
||||
|
||||
return await app.DATABASE.update_workflow(
|
||||
# Update the workflow
|
||||
updated_workflow = await app.DATABASE.update_workflow(
|
||||
workflow_id=workflow_id,
|
||||
title=title,
|
||||
organization_id=organization_id,
|
||||
description=description,
|
||||
workflow_definition=(workflow_definition.model_dump() if workflow_definition else None),
|
||||
)
|
||||
updated_version = updated_workflow.version
|
||||
previous_workflow = None
|
||||
if updated_version > 1:
|
||||
previous_workflow = await app.DATABASE.get_workflow_by_permanent_id(
|
||||
workflow_permanent_id=updated_workflow.workflow_permanent_id,
|
||||
organization_id=organization_id,
|
||||
version=updated_version - 1,
|
||||
)
|
||||
|
||||
# Check if workflow definition changed and delete published workflow scripts if so
|
||||
if (
|
||||
workflow_definition
|
||||
and previous_workflow
|
||||
and organization_id
|
||||
and _get_workflow_definition_without_dates(previous_workflow.workflow_definition)
|
||||
!= _get_workflow_definition_without_dates(workflow_definition)
|
||||
):
|
||||
try:
|
||||
deleted_count = await app.DATABASE.delete_workflow_scripts_by_permanent_id(
|
||||
organization_id=organization_id,
|
||||
workflow_permanent_id=updated_workflow.workflow_permanent_id,
|
||||
)
|
||||
if deleted_count > 0:
|
||||
LOG.info(
|
||||
"Deleted published workflow scripts due to workflow definition change",
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=updated_workflow.workflow_permanent_id,
|
||||
organization_id=organization_id,
|
||||
deleted_count=deleted_count,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"Failed to delete published workflow scripts after workflow definition change",
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=updated_workflow.workflow_permanent_id,
|
||||
organization_id=organization_id,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
return updated_workflow
|
||||
|
||||
async def delete_workflow_by_permanent_id(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user