[Backend] Saving Workflows as Templates (#4278)

This commit is contained in:
Marc Kelechava
2025-12-11 18:39:21 -08:00
committed by GitHub
parent 42bdb23118
commit 526287e7ca
8 changed files with 350 additions and 8 deletions

View File

@@ -51,6 +51,7 @@ from skyvern.forge.sdk.db.models import (
WorkflowRunOutputParameterModel,
WorkflowRunParameterModel,
WorkflowScriptModel,
WorkflowTemplateModel,
)
from skyvern.forge.sdk.db.utils import (
_custom_json_serializer,
@@ -1525,7 +1526,19 @@ class AgentDB:
if organization_id:
get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id)
if workflow := (await session.scalars(get_workflow_query)).first():
return convert_to_workflow(workflow, self.debug_enabled)
is_template = (
await self.is_workflow_template(
workflow_permanent_id=workflow.workflow_permanent_id,
organization_id=workflow.organization_id,
)
if organization_id
else False
)
return convert_to_workflow(
workflow,
self.debug_enabled,
is_template=is_template,
)
return None
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
@@ -1552,7 +1565,19 @@ class AgentDB:
get_workflow_query = get_workflow_query.order_by(WorkflowModel.version.desc())
async with self.Session() as session:
if workflow := (await session.scalars(get_workflow_query)).first():
return convert_to_workflow(workflow, self.debug_enabled)
is_template = (
await self.is_workflow_template(
workflow_permanent_id=workflow.workflow_permanent_id,
organization_id=workflow.organization_id,
)
if organization_id
else False
)
return convert_to_workflow(
workflow,
self.debug_enabled,
is_template=is_template,
)
return None
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
@@ -1581,7 +1606,19 @@ class AgentDB:
get_workflow_query = get_workflow_query.filter(WorkflowRunModel.workflow_run_id == workflow_run_id)
async with self.Session() as session:
if workflow := (await session.scalars(get_workflow_query)).first():
return convert_to_workflow(workflow, self.debug_enabled)
is_template = (
await self.is_workflow_template(
workflow_permanent_id=workflow.workflow_permanent_id,
organization_id=workflow.organization_id,
)
if organization_id
else False
)
return convert_to_workflow(
workflow,
self.debug_enabled,
is_template=is_template,
)
return None
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
@@ -1606,7 +1643,18 @@ class AgentDB:
async with self.Session() as session:
workflows = (await session.scalars(get_workflows_query)).all()
return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows]
template_permanent_ids: set[str] = set()
if workflows and organization_id:
template_permanent_ids = await self.get_org_template_permanent_ids(organization_id)
return [
convert_to_workflow(
workflow,
self.debug_enabled,
is_template=workflow.workflow_permanent_id in template_permanent_ids,
)
for workflow in workflows
]
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
@@ -1655,7 +1703,20 @@ class AgentDB:
main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
)
workflows = (await session.scalars(main_query)).all()
return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows]
# Map template status by permanent_id so API responses surface is_template
template_permanent_ids: set[str] = set()
if workflows and organization_id:
template_permanent_ids = await self.get_org_template_permanent_ids(organization_id)
return [
convert_to_workflow(
workflow,
self.debug_enabled,
is_template=workflow.workflow_permanent_id in template_permanent_ids,
)
for workflow in workflows
]
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
@@ -1667,6 +1728,7 @@ class AgentDB:
page_size: int = 10,
only_saved_tasks: bool = False,
only_workflows: bool = False,
only_templates: bool = False,
search_key: str | None = None,
folder_id: str | None = None,
statuses: list[WorkflowStatus] | None = None,
@@ -1717,6 +1779,13 @@ class AgentDB:
main_query = main_query.where(WorkflowModel.is_saved_task.is_(True))
elif only_workflows:
main_query = main_query.where(WorkflowModel.is_saved_task.is_(False))
if only_templates:
# Filter by workflow_templates table (templates at permanent_id level)
template_subquery = select(WorkflowTemplateModel.workflow_permanent_id).where(
WorkflowTemplateModel.organization_id == organization_id,
WorkflowTemplateModel.deleted_at.is_(None),
)
main_query = main_query.where(WorkflowModel.workflow_permanent_id.in_(template_subquery))
if statuses:
main_query = main_query.where(WorkflowModel.status.in_(statuses))
if folder_id:
@@ -1851,7 +1920,18 @@ class AgentDB:
main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
)
workflows = (await session.scalars(main_query)).all()
return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows]
template_permanent_ids: set[str] = set()
if workflows and organization_id:
template_permanent_ids = await self.get_org_template_permanent_ids(organization_id)
return [
convert_to_workflow(
workflow,
self.debug_enabled,
is_template=workflow.workflow_permanent_id in template_permanent_ids,
)
for workflow in workflows
]
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
@@ -1895,7 +1975,19 @@ class AgentDB:
workflow.import_error = import_error
await session.commit()
await session.refresh(workflow)
return convert_to_workflow(workflow, self.debug_enabled)
is_template = (
await self.is_workflow_template(
workflow_permanent_id=workflow.workflow_permanent_id,
organization_id=workflow.organization_id,
)
if organization_id
else False
)
return convert_to_workflow(
workflow,
self.debug_enabled,
is_template=is_template,
)
else:
raise NotFoundError("Workflow not found")
except SQLAlchemyError:
@@ -1927,6 +2019,95 @@ class AgentDB:
await session.execute(update_deleted_at_query)
await session.commit()
async def add_workflow_template(
self,
workflow_permanent_id: str,
organization_id: str,
) -> None:
"""Add a workflow to the templates table."""
try:
async with self.Session() as session:
existing = (
await session.scalars(
select(WorkflowTemplateModel)
.where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id)
.where(WorkflowTemplateModel.organization_id == organization_id)
)
).first()
if existing:
if existing.deleted_at is not None:
existing.deleted_at = None
await session.commit()
return
template = WorkflowTemplateModel(
workflow_permanent_id=workflow_permanent_id,
organization_id=organization_id,
)
session.add(template)
await session.commit()
except SQLAlchemyError:
LOG.error("SQLAlchemyError in add_workflow_template", exc_info=True)
raise
async def remove_workflow_template(
self,
workflow_permanent_id: str,
organization_id: str,
) -> None:
"""Soft delete a workflow from the templates table."""
try:
async with self.Session() as session:
update_deleted_at_query = (
update(WorkflowTemplateModel)
.where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id)
.where(WorkflowTemplateModel.organization_id == organization_id)
.where(WorkflowTemplateModel.deleted_at.is_(None))
.values(deleted_at=datetime.utcnow())
)
await session.execute(update_deleted_at_query)
await session.commit()
except SQLAlchemyError:
LOG.error("SQLAlchemyError in remove_workflow_template", exc_info=True)
raise
async def get_org_template_permanent_ids(
self,
organization_id: str,
) -> set[str]:
"""Get all workflow_permanent_ids that are templates for an organization."""
try:
async with self.Session() as session:
result = await session.scalars(
select(WorkflowTemplateModel.workflow_permanent_id)
.where(WorkflowTemplateModel.organization_id == organization_id)
.where(WorkflowTemplateModel.deleted_at.is_(None))
)
return set(result.all())
except SQLAlchemyError:
LOG.error("SQLAlchemyError in get_org_template_permanent_ids", exc_info=True)
raise
async def is_workflow_template(
self,
workflow_permanent_id: str,
organization_id: str,
) -> bool:
"""Check if a workflow is marked as a template."""
try:
async with self.Session() as session:
result = (
await session.scalars(
select(WorkflowTemplateModel)
.where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id)
.where(WorkflowTemplateModel.organization_id == organization_id)
.where(WorkflowTemplateModel.deleted_at.is_(None))
)
).first()
return result is not None
except SQLAlchemyError:
LOG.error("SQLAlchemyError in is_workflow_template", exc_info=True)
raise
async def create_folder(
self,
organization_id: str,

View File

@@ -64,6 +64,7 @@ WORKFLOW_PREFIX = "w"
WORKFLOW_RUN_BLOCK_PREFIX = "wrb"
WORKFLOW_RUN_PREFIX = "wr"
WORKFLOW_SCRIPT_PREFIX = "ws"
WORKFLOW_TEMPLATE_PREFIX = "wt"
def generate_workflow_id() -> str:
@@ -91,6 +92,11 @@ def generate_workflow_script_id() -> str:
return f"{WORKFLOW_SCRIPT_PREFIX}_{int_id}"
def generate_workflow_template_id() -> str:
int_id = generate_id()
return f"{WORKFLOW_TEMPLATE_PREFIX}_{int_id}"
def generate_aws_secret_parameter_id() -> str:
int_id = generate_id()
return f"{AWS_SECRET_PARAMETER_PREFIX}_{int_id}"

View File

@@ -57,6 +57,7 @@ from skyvern.forge.sdk.db.id import (
generate_workflow_run_block_id,
generate_workflow_run_id,
generate_workflow_script_id,
generate_workflow_template_id,
)
from skyvern.forge.sdk.schemas.task_v2 import ThoughtType
@@ -295,6 +296,28 @@ class WorkflowModel(Base):
is_saved_task = Column(Boolean, default=False, nullable=False)
class WorkflowTemplateModel(Base):
"""
Tracks which workflows are marked as templates.
Keyed by workflow_permanent_id (not versioned workflow_id) because
template status is a property of the workflow identity, not a version.
"""
__tablename__ = "workflow_templates"
workflow_template_id = Column(String, primary_key=True, default=generate_workflow_template_id)
workflow_permanent_id = Column(String, nullable=False, index=True)
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(
DateTime,
default=datetime.datetime.utcnow,
onupdate=datetime.datetime.utcnow,
nullable=False,
)
deleted_at = Column(DateTime, nullable=True)
class WorkflowRunModel(Base):
__tablename__ = "workflow_runs"
__table_args__ = (Index("idx_workflow_runs_org_created", "organization_id", "created_at"),)

View File

@@ -308,7 +308,22 @@ def convert_to_artifact(artifact_model: ArtifactModel, debug_enabled: bool = Fal
)
def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = False) -> Workflow:
def convert_to_workflow(
workflow_model: WorkflowModel,
debug_enabled: bool = False,
is_template: bool = False,
) -> Workflow:
"""
Convert a WorkflowModel to a Workflow pydantic model.
Args:
workflow_model: The database model to convert.
debug_enabled: Whether to log debug messages.
is_template: Whether this workflow is marked as a template.
This is computed separately from the workflow_templates table
since template status is at the workflow_permanent_id level,
not the versioned workflow level.
"""
if debug_enabled:
LOG.debug(
"Converting WorkflowModel to Workflow",
@@ -329,6 +344,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal
max_screenshot_scrolls=workflow_model.max_screenshot_scrolling_times,
version=workflow_model.version,
is_saved_task=workflow_model.is_saved_task,
is_template=is_template,
description=workflow_model.description,
workflow_definition=WorkflowDefinition.model_validate(workflow_model.workflow_definition),
created_at=workflow_model.created_at,