From d4bdca174f02f7c8c67af5e3e62a68a2673c4737 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 1 Aug 2025 17:07:08 -0700 Subject: [PATCH] remove workflow_permanent_id from projects table + add use_cache and cache_project_id to workflows table (#3090) --- ...57d1_remove_workflow_permanent_id_from_.py | 41 +++++++++++++++++++ skyvern/forge/sdk/db/client.py | 23 +++++++---- skyvern/forge/sdk/db/models.py | 5 +-- skyvern/forge/sdk/db/utils.py | 1 - skyvern/forge/sdk/routes/projects.py | 11 ----- skyvern/forge/sdk/workflow/models/yaml.py | 2 + skyvern/forge/sdk/workflow/service.py | 8 ++++ skyvern/schemas/projects.py | 2 - 8 files changed, 67 insertions(+), 26 deletions(-) create mode 100644 alembic/versions/2025_08_01_2306-1eedd7a957d1_remove_workflow_permanent_id_from_.py diff --git a/alembic/versions/2025_08_01_2306-1eedd7a957d1_remove_workflow_permanent_id_from_.py b/alembic/versions/2025_08_01_2306-1eedd7a957d1_remove_workflow_permanent_id_from_.py new file mode 100644 index 00000000..908383db --- /dev/null +++ b/alembic/versions/2025_08_01_2306-1eedd7a957d1_remove_workflow_permanent_id_from_.py @@ -0,0 +1,41 @@ +"""Remove workflow_permanent_id from projects table and add use_cache and cache_project_id to workflows table + +Revision ID: 1eedd7a957d1 +Revises: 2e58ef1b3d8b +Create Date: 2025-08-01 23:06:18.433869+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1eedd7a957d1" +down_revision: Union[str, None] = "2e58ef1b3d8b" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index(op.f("project_org_wpid_index"), table_name="projects") + op.drop_column("projects", "workflow_permanent_id") + op.add_column("workflows", sa.Column("use_cache", sa.Boolean(), nullable=False, server_default=sa.false())) + op.execute("UPDATE workflows SET use_cache = FALSE") + op.alter_column("workflows", "use_cache", server_default=None) + op.add_column("workflows", sa.Column("cache_project_id", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflows", "cache_project_id") + op.drop_column("workflows", "use_cache") + op.add_column("projects", sa.Column("workflow_permanent_id", sa.VARCHAR(), autoincrement=False, nullable=True)) + op.create_index( + op.f("project_org_wpid_index"), "projects", ["organization_id", "workflow_permanent_id"], unique=False + ) + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index b220728a..d9b0d2de 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1313,6 +1313,8 @@ class AgentDB: version: int | None = None, is_saved_task: bool = False, status: WorkflowStatus = WorkflowStatus.published, + use_cache: bool = False, + cache_project_id: str | None = None, ) -> Workflow: async with self.Session() as session: workflow = WorkflowModel( @@ -1330,6 +1332,8 @@ class AgentDB: model=model, is_saved_task=is_saved_task, status=status, + use_cache=use_cache, + cache_project_id=cache_project_id, ) if workflow_permanent_id: workflow.workflow_permanent_id = workflow_permanent_id @@ -1508,6 +1512,8 @@ class AgentDB: description: str | None = None, workflow_definition: dict[str, Any] | None = None, version: int | None = None, + use_cache: bool | None = None, + cache_project_id: str | None = None, ) -> Workflow: try: async with self.Session() as session: @@ -1517,14 +1523,18 @@ class AgentDB: if organization_id: 
get_workflow_query = get_workflow_query.filter_by(organization_id=organization_id) if workflow := (await session.scalars(get_workflow_query)).first(): - if title: + if title is not None: workflow.title = title - if description: + if description is not None: workflow.description = description - if workflow_definition: + if workflow_definition is not None: workflow.workflow_definition = workflow_definition - if version: + if version is not None: workflow.version = version + if use_cache is not None: + workflow.use_cache = use_cache + if cache_project_id is not None: + workflow.cache_project_id = cache_project_id await session.commit() await session.refresh(workflow) return convert_to_workflow(workflow, self.debug_enabled) @@ -3489,7 +3499,6 @@ class AgentDB: async def create_project( self, organization_id: str, - workflow_permanent_id: str | None = None, run_id: str | None = None, project_id: str | None = None, version: int | None = None, @@ -3498,7 +3507,6 @@ class AgentDB: async with self.Session() as session: project = ProjectModel( organization_id=organization_id, - workflow_permanent_id=workflow_permanent_id, run_id=run_id, ) if project_id: @@ -3518,7 +3526,6 @@ class AgentDB: project_revision_id: str, organization_id: str, artifact_id: str | None = None, - workflow_permanent_id: str | None = None, run_id: str | None = None, version: int | None = None, ) -> Project: @@ -3532,8 +3539,6 @@ class AgentDB: if project := (await session.scalars(get_project_query)).first(): if artifact_id: project.artifact_id = artifact_id - if workflow_permanent_id: - project.workflow_permanent_id = workflow_permanent_id if run_id: project.run_id = run_id if version: diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 96ac1bd0..618c5666 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -236,6 +236,8 @@ class WorkflowModel(Base): persist_browser_session = Column(Boolean, default=False, nullable=False) model = Column(JSON, 
nullable=True) status = Column(String, nullable=False, default="published") + use_cache = Column(Boolean, default=False, nullable=False) + cache_project_id = Column(String, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column( @@ -777,7 +779,6 @@ class ProjectModel(Base): __tablename__ = "projects" __table_args__ = ( Index("project_org_created_at_index", "organization_id", "created_at"), - Index("project_org_wpid_index", "organization_id", "workflow_permanent_id"), Index("project_org_run_id_index", "organization_id", "run_id"), UniqueConstraint("organization_id", "project_id", "version", name="uc_org_project_version"), ) @@ -785,8 +786,6 @@ class ProjectModel(Base): project_revision_id = Column(String, primary_key=True, default=generate_project_revision_id) project_id = Column(String, default=generate_project_id, nullable=False) # User-facing, consistent across versions organization_id = Column(String, nullable=False) - # the wpid that this project is associated with - workflow_permanent_id = Column(String, nullable=True) # The workflow run or task run id that this project is generated run_id = Column(String, nullable=True) version = Column(Integer, default=1, nullable=False) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 65d2c279..3b494c55 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -499,7 +499,6 @@ def convert_to_project(project_model: ProjectModel) -> Project: project_revision_id=project_model.project_revision_id, project_id=project_model.project_id, organization_id=project_model.organization_id, - workflow_id=project_model.workflow_permanent_id, run_id=project_model.run_id, version=project_model.version, created_at=project_model.created_at, diff --git a/skyvern/forge/sdk/routes/projects.py b/skyvern/forge/sdk/routes/projects.py index ac900cae..67ff188f 100644 --- a/skyvern/forge/sdk/routes/projects.py +++ 
b/skyvern/forge/sdk/routes/projects.py @@ -4,7 +4,6 @@ import hashlib import structlog from fastapi import Depends, HTTPException, Path, Query -from skyvern.exceptions import WorkflowNotFound from skyvern.forge import app from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router from skyvern.forge.sdk.schemas.organizations import Organization @@ -41,12 +40,6 @@ async def create_project( organization_id=organization_id, file_count=len(data.files) if data.files else 0, ) - # validate workflow_id and run_id - if data.workflow_id: - if not await app.DATABASE.get_workflow_by_permanent_id( - workflow_permanent_id=data.workflow_id, organization_id=organization_id - ): - raise WorkflowNotFound(workflow_permanent_id=data.workflow_id) if data.run_id: if not await app.DATABASE.get_run(run_id=data.run_id, organization_id=organization_id): raise HTTPException(status_code=404, detail=f"Run_id {data.run_id} not found") @@ -54,7 +47,6 @@ async def create_project( # Create the project in the database project = await app.DATABASE.create_project( organization_id=organization_id, - workflow_permanent_id=data.workflow_id, run_id=data.run_id, ) # Process files if provided @@ -72,7 +64,6 @@ async def create_project( return CreateProjectResponse( project_id=project.project_id, version=project.version, - workflow_id=project.workflow_id, run_id=project.run_id, file_count=file_count, created_at=project.created_at, @@ -221,7 +212,6 @@ async def deploy_project( new_version = latest_project.version + 1 new_project_revision = await app.DATABASE.create_project( organization_id=current_org.organization_id, - workflow_permanent_id=latest_project.workflow_id, run_id=latest_project.run_id, project_id=project_id, # Use the same project_id for versioning version=new_version, @@ -265,7 +255,6 @@ async def deploy_project( return CreateProjectResponse( project_id=new_project_revision.project_id, version=new_project_revision.version, - workflow_id=new_project_revision.workflow_id, 
run_id=new_project_revision.run_id, file_count=file_count, created_at=new_project_revision.created_at, diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index ef0c7e4e..d8343080 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -443,3 +443,5 @@ class WorkflowCreateYAMLRequest(BaseModel): max_screenshot_scrolls: int | None = None extra_http_headers: dict[str, str] | None = None status: WorkflowStatus = WorkflowStatus.published + use_cache: bool = False + cache_project_id: str | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index c26b501d..2425b66d 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -635,6 +635,8 @@ class WorkflowService: is_saved_task: bool = False, status: WorkflowStatus = WorkflowStatus.published, extra_http_headers: dict[str, str] | None = None, + use_cache: bool = False, + cache_project_id: str | None = None, ) -> Workflow: return await app.DATABASE.create_workflow( title=title, @@ -653,6 +655,8 @@ class WorkflowService: is_saved_task=is_saved_task, status=status, extra_http_headers=extra_http_headers, + use_cache=use_cache, + cache_project_id=cache_project_id, ) async def get_workflow(self, workflow_id: str, organization_id: str | None = None) -> Workflow: @@ -1534,6 +1538,8 @@ class WorkflowService: version=existing_version + 1, is_saved_task=request.is_saved_task, status=request.status, + use_cache=request.use_cache, + cache_project_id=request.cache_project_id, ) else: workflow = await self.create_workflow( @@ -1551,6 +1557,8 @@ class WorkflowService: extra_http_headers=request.extra_http_headers, is_saved_task=request.is_saved_task, status=request.status, + use_cache=request.use_cache, + cache_project_id=request.cache_project_id, ) # Keeping track of the new workflow id to delete it if an error occurs during the creation process 
new_workflow_id = workflow.workflow_id diff --git a/skyvern/schemas/projects.py b/skyvern/schemas/projects.py index 34f58982..a38eaff8 100644 --- a/skyvern/schemas/projects.py +++ b/skyvern/schemas/projects.py @@ -75,7 +75,6 @@ class DeployProjectRequest(BaseModel): class CreateProjectResponse(BaseModel): project_id: str = Field(..., description="Unique project identifier", examples=["proj_abc123"]) version: int = Field(..., description="Project version number", examples=[1]) - workflow_id: str | None = Field(default=None, description="ID of the workflow this project is associated with") run_id: str | None = Field( default=None, description="ID of the workflow run or task run that generated this project" ) @@ -90,7 +89,6 @@ class Project(BaseModel): project_revision_id: str = Field(description="Unique identifier for this specific project revision") project_id: str = Field(description="User-facing project identifier, consistent across versions") organization_id: str = Field(description="ID of the organization that owns this project") - workflow_id: str | None = Field(default=None, description="ID of the workflow this project is associated with") run_id: str | None = Field( default=None, description="ID of the workflow run or task run that generated this project" )