diff --git a/alembic/versions/2025_12_12_0232-b4738bd17198_workflow_templates_table_updates_oss.py b/alembic/versions/2025_12_12_0232-b4738bd17198_workflow_templates_table_updates_oss.py new file mode 100644 index 00000000..9f7d43ed --- /dev/null +++ b/alembic/versions/2025_12_12_0232-b4738bd17198_workflow_templates_table_updates_oss.py @@ -0,0 +1,55 @@ +"""workflow_templates table updates oss + +Revision ID: b4738bd17198 +Revises: 7ab8e817802a +Create Date: 2025-12-12 02:32:15.078365+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "b4738bd17198" +down_revision: Union[str, None] = "7ab8e817802a" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "workflow_templates", + sa.Column("workflow_template_id", sa.String(), nullable=False), + sa.Column("workflow_permanent_id", sa.String(), nullable=False), + sa.Column("organization_id", sa.String(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("modified_at", sa.DateTime(), nullable=False), + sa.Column("deleted_at", sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint( + ["organization_id"], + ["organizations.organization_id"], + ), + sa.PrimaryKeyConstraint("workflow_template_id"), + ) + op.create_index( + op.f("ix_workflow_templates_organization_id"), "workflow_templates", ["organization_id"], unique=False + ) + op.create_index( + op.f("ix_workflow_templates_workflow_permanent_id"), + "workflow_templates", + ["workflow_permanent_id"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f("ix_workflow_templates_workflow_permanent_id"), table_name="workflow_templates") + op.drop_index(op.f("ix_workflow_templates_organization_id"), table_name="workflow_templates") + op.drop_table("workflow_templates") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 0b1239d7..dcd90f75 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -51,6 +51,7 @@ from skyvern.forge.sdk.db.models import ( WorkflowRunOutputParameterModel, WorkflowRunParameterModel, WorkflowScriptModel, + WorkflowTemplateModel, ) from skyvern.forge.sdk.db.utils import ( _custom_json_serializer, @@ -1525,7 +1526,19 @@ class AgentDB: if organization_id: get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id) if workflow := (await session.scalars(get_workflow_query)).first(): - return convert_to_workflow(workflow, self.debug_enabled) + is_template = ( + await self.is_workflow_template( + workflow_permanent_id=workflow.workflow_permanent_id, + organization_id=workflow.organization_id, + ) + if organization_id + else False + ) + return convert_to_workflow( + workflow, + self.debug_enabled, + is_template=is_template, + ) return None except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) @@ -1552,7 +1565,19 @@ class AgentDB: get_workflow_query = get_workflow_query.order_by(WorkflowModel.version.desc()) async with self.Session() as session: if workflow := (await session.scalars(get_workflow_query)).first(): - return convert_to_workflow(workflow, self.debug_enabled) + is_template = ( + await self.is_workflow_template( + workflow_permanent_id=workflow.workflow_permanent_id, + organization_id=workflow.organization_id, + ) + if organization_id + else False + ) + return convert_to_workflow( + workflow, + self.debug_enabled, + is_template=is_template, + ) return None except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) @@ -1581,7 +1606,19 @@ class AgentDB: get_workflow_query = get_workflow_query.filter(WorkflowRunModel.workflow_run_id == workflow_run_id) async with self.Session() as session: if workflow := (await session.scalars(get_workflow_query)).first(): - return convert_to_workflow(workflow, self.debug_enabled) + is_template = ( + await self.is_workflow_template( + workflow_permanent_id=workflow.workflow_permanent_id, + organization_id=workflow.organization_id, + ) + if organization_id + else False + ) + return convert_to_workflow( + workflow, + self.debug_enabled, + is_template=is_template, + ) return None except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) @@ -1606,7 +1643,18 @@ class AgentDB: async with self.Session() as session: workflows = (await session.scalars(get_workflows_query)).all() - return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows] + template_permanent_ids: set[str] = set() + if workflows and organization_id: + template_permanent_ids = await self.get_org_template_permanent_ids(organization_id) + + return [ + convert_to_workflow( + workflow, + self.debug_enabled, + is_template=workflow.workflow_permanent_id in template_permanent_ids, + ) + for workflow in workflows + ] except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) raise @@ -1655,7 +1703,20 @@ class AgentDB: main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size) ) workflows = (await session.scalars(main_query)).all() - return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows] + + # Map template status by permanent_id so API responses surface is_template + template_permanent_ids: set[str] = set() + if workflows and organization_id: + template_permanent_ids = await self.get_org_template_permanent_ids(organization_id) + + return [ + convert_to_workflow( + workflow, + self.debug_enabled, + is_template=workflow.workflow_permanent_id in template_permanent_ids, + ) + for workflow in workflows + ] except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) raise @@ -1667,6 +1728,7 @@ class AgentDB: page_size: int = 10, only_saved_tasks: bool = False, only_workflows: bool = False, + only_templates: bool = False, search_key: str | None = None, folder_id: str | None = None, statuses: list[WorkflowStatus] | None = None, @@ -1717,6 +1779,13 @@ class AgentDB: main_query = main_query.where(WorkflowModel.is_saved_task.is_(True)) elif only_workflows: main_query = main_query.where(WorkflowModel.is_saved_task.is_(False)) + if only_templates: + # Filter by workflow_templates table (templates at permanent_id level) + template_subquery = select(WorkflowTemplateModel.workflow_permanent_id).where( + WorkflowTemplateModel.organization_id == organization_id, + WorkflowTemplateModel.deleted_at.is_(None), + ) + main_query = main_query.where(WorkflowModel.workflow_permanent_id.in_(template_subquery)) if statuses: main_query = main_query.where(WorkflowModel.status.in_(statuses)) if folder_id: @@ -1851,7 +1920,18 @@ class AgentDB: main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size) ) workflows = (await session.scalars(main_query)).all() - return [convert_to_workflow(workflow, self.debug_enabled) for workflow in workflows] + template_permanent_ids: set[str] = set() + if workflows and organization_id: + template_permanent_ids = await self.get_org_template_permanent_ids(organization_id) + + return [ + convert_to_workflow( + workflow, + self.debug_enabled, + is_template=workflow.workflow_permanent_id in template_permanent_ids, + ) + for workflow in workflows + ] except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) raise @@ -1895,7 +1975,19 @@ class AgentDB: workflow.import_error = import_error await session.commit() await session.refresh(workflow) - return convert_to_workflow(workflow, self.debug_enabled) + is_template = ( + await self.is_workflow_template( + workflow_permanent_id=workflow.workflow_permanent_id, + organization_id=workflow.organization_id, + ) + if organization_id + else False + ) + return convert_to_workflow( + workflow, + self.debug_enabled, + is_template=is_template, + ) else: raise NotFoundError("Workflow not found") except SQLAlchemyError: @@ -1927,6 +2019,95 @@ class AgentDB: await session.execute(update_deleted_at_query) await session.commit() + async def add_workflow_template( + self, + workflow_permanent_id: str, + organization_id: str, + ) -> None: + """Add a workflow to the templates table.""" + try: + async with self.Session() as session: + existing = ( + await session.scalars( + select(WorkflowTemplateModel) + .where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id) + .where(WorkflowTemplateModel.organization_id == organization_id) + ) + ).first() + if existing: + if existing.deleted_at is not None: + existing.deleted_at = None + await session.commit() + return + template = WorkflowTemplateModel( + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + ) + session.add(template) + await session.commit() + except SQLAlchemyError: + LOG.error("SQLAlchemyError in add_workflow_template", exc_info=True) + raise + + async def remove_workflow_template( + self, + workflow_permanent_id: str, + organization_id: str, + ) -> None: + """Soft delete a workflow from the templates table.""" + try: + async with self.Session() as session: + update_deleted_at_query = ( + update(WorkflowTemplateModel) + .where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id) + .where(WorkflowTemplateModel.organization_id == organization_id) + .where(WorkflowTemplateModel.deleted_at.is_(None)) + .values(deleted_at=datetime.utcnow()) + ) + await session.execute(update_deleted_at_query) + await session.commit() + except SQLAlchemyError: + LOG.error("SQLAlchemyError in remove_workflow_template", exc_info=True) + raise + + async def get_org_template_permanent_ids( + self, + organization_id: str, + ) -> set[str]: + """Get all workflow_permanent_ids that are templates for an organization.""" + try: + async with self.Session() as session: + result = await session.scalars( + select(WorkflowTemplateModel.workflow_permanent_id) + .where(WorkflowTemplateModel.organization_id == organization_id) + .where(WorkflowTemplateModel.deleted_at.is_(None)) + ) + return set(result.all()) + except SQLAlchemyError: + LOG.error("SQLAlchemyError in get_org_template_permanent_ids", exc_info=True) + raise + + async def is_workflow_template( + self, + workflow_permanent_id: str, + organization_id: str, + ) -> bool: + """Check if a workflow is marked as a template.""" + try: + async with self.Session() as session: + result = ( + await session.scalars( + select(WorkflowTemplateModel) + .where(WorkflowTemplateModel.workflow_permanent_id == workflow_permanent_id) + .where(WorkflowTemplateModel.organization_id == organization_id) + .where(WorkflowTemplateModel.deleted_at.is_(None)) + ) + ).first() + return result is not None + except SQLAlchemyError: + LOG.error("SQLAlchemyError in is_workflow_template", exc_info=True) + raise + async def create_folder( self, organization_id: str, diff --git a/skyvern/forge/sdk/db/id.py b/skyvern/forge/sdk/db/id.py index 9ec7c6e5..e809cec4 100644 --- a/skyvern/forge/sdk/db/id.py +++ b/skyvern/forge/sdk/db/id.py @@ -64,6 +64,7 @@ WORKFLOW_PREFIX = "w" WORKFLOW_RUN_BLOCK_PREFIX = "wrb" WORKFLOW_RUN_PREFIX = "wr" WORKFLOW_SCRIPT_PREFIX = "ws" +WORKFLOW_TEMPLATE_PREFIX = "wt" def generate_workflow_id() -> str: @@ -91,6 +92,11 @@ def generate_workflow_script_id() -> str: return f"{WORKFLOW_SCRIPT_PREFIX}_{int_id}" +def generate_workflow_template_id() -> str: + int_id = generate_id() + return f"{WORKFLOW_TEMPLATE_PREFIX}_{int_id}" + + def generate_aws_secret_parameter_id() -> str: int_id = generate_id() return f"{AWS_SECRET_PARAMETER_PREFIX}_{int_id}" diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index efe180aa..81bf13f7 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -57,6 +57,7 @@ from skyvern.forge.sdk.db.id import ( generate_workflow_run_block_id, generate_workflow_run_id, generate_workflow_script_id, + generate_workflow_template_id, ) from skyvern.forge.sdk.schemas.task_v2 import ThoughtType @@ -295,6 +296,28 @@ class WorkflowModel(Base): is_saved_task = Column(Boolean, default=False, nullable=False) +class WorkflowTemplateModel(Base): + """ + Tracks which workflows are marked as templates. + Keyed by workflow_permanent_id (not versioned workflow_id) because + template status is a property of the workflow identity, not a version. + """ + + __tablename__ = "workflow_templates" + + workflow_template_id = Column(String, primary_key=True, default=generate_workflow_template_id) + workflow_permanent_id = Column(String, nullable=False, index=True) + organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True) + created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) + modified_at = Column( + DateTime, + default=datetime.datetime.utcnow, + onupdate=datetime.datetime.utcnow, + nullable=False, + ) + deleted_at = Column(DateTime, nullable=True) + + class WorkflowRunModel(Base): __tablename__ = "workflow_runs" __table_args__ = (Index("idx_workflow_runs_org_created", "organization_id", "created_at"),) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index bd6f6f8c..2105c569 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -308,7 +308,22 @@ def convert_to_artifact(artifact_model: ArtifactModel, debug_enabled: bool = Fal ) -def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = False) -> Workflow: +def convert_to_workflow( + workflow_model: WorkflowModel, + debug_enabled: bool = False, + is_template: bool = False, +) -> Workflow: + """ + Convert a WorkflowModel to a Workflow pydantic model. + + Args: + workflow_model: The database model to convert. + debug_enabled: Whether to log debug messages. + is_template: Whether this workflow is marked as a template. + This is computed separately from the workflow_templates table + since template status is at the workflow_permanent_id level, + not the versioned workflow level. + """ if debug_enabled: LOG.debug( "Converting WorkflowModel to Workflow", @@ -329,6 +344,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal max_screenshot_scrolls=workflow_model.max_screenshot_scrolling_times, version=workflow_model.version, is_saved_task=workflow_model.is_saved_task, + is_template=is_template, description=workflow_model.description, workflow_definition=WorkflowDefinition.model_validate(workflow_model.workflow_definition), created_at=workflow_model.created_at, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 8d34c4ae..cae9815f 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -2319,6 +2319,7 @@ async def get_workflows( page_size: int = Query(10, ge=1), only_saved_tasks: bool = Query(False), only_workflows: bool = Query(False), + only_templates: bool = Query(False), search_key: str | None = Query( None, description="Unified search across workflow title, folder name, and parameter metadata (key, description, default_value).", @@ -2372,12 +2373,35 @@ async def get_workflows( page_size=page_size, only_saved_tasks=only_saved_tasks, only_workflows=only_workflows, + only_templates=only_templates, search_key=effective_search, folder_id=folder_id, statuses=effective_statuses, ) +@base_router.put( + "/workflows/{workflow_permanent_id}/template", + tags=["Workflows"], +) +async def set_workflow_template_status( + workflow_permanent_id: str, + is_template: bool = Query(...), + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> dict: + """ + Set or unset a workflow as a template. + + Template status is stored at the workflow_permanent_id level (not per-version), + meaning all versions of a workflow share the same template status. + """ + return await app.WORKFLOW_SERVICE.set_template_status( + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + is_template=is_template, + ) + + @legacy_base_router.get( "/workflows/templates", response_model=list[Workflow], diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 114fb407..9a831250 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -75,6 +75,7 @@ class Workflow(BaseModel): workflow_permanent_id: str version: int is_saved_task: bool + is_template: bool = False description: str | None = None workflow_definition: WorkflowDefinition proxy_location: ProxyLocationInput = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 0768c021..c2b865b0 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1579,6 +1579,40 @@ class WorkflowService: return workflow + async def set_template_status( + self, + organization_id: str, + workflow_permanent_id: str, + is_template: bool, + ) -> dict[str, Any]: + """ + Set or unset a workflow as a template. + + Template status is stored in a separate workflow_templates table keyed by + workflow_permanent_id, since template status is a property of the workflow + identity, not a specific version. + + Returns a dict with the result since we're not updating the workflow itself. + """ + # Verify workflow exists and belongs to org + await self.get_workflow_by_permanent_id( + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + ) + + if is_template: + await app.DATABASE.add_workflow_template( + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + ) + else: + await app.DATABASE.remove_workflow_template( + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + ) + + return {"workflow_permanent_id": workflow_permanent_id, "is_template": is_template} + async def get_workflow_versions_by_permanent_id( self, workflow_permanent_id: str, @@ -1688,6 +1722,7 @@ class WorkflowService: page_size: int = 10, only_saved_tasks: bool = False, only_workflows: bool = False, + only_templates: bool = False, search_key: str | None = None, folder_id: str | None = None, statuses: list[WorkflowStatus] | None = None, @@ -1705,6 +1740,7 @@ class WorkflowService: page_size=page_size, only_saved_tasks=only_saved_tasks, only_workflows=only_workflows, + only_templates=only_templates, search_key=search_key, folder_id=folder_id, statuses=statuses,