diff --git a/alembic/versions/2025_09_24_0347-4925c34e8d58_add_sequential_key.py b/alembic/versions/2025_09_24_0347-4925c34e8d58_add_sequential_key.py new file mode 100644 index 00000000..8947465e --- /dev/null +++ b/alembic/versions/2025_09_24_0347-4925c34e8d58_add_sequential_key.py @@ -0,0 +1,33 @@ +"""add_sequential_key + +Revision ID: 4925c34e8d58 +Revises: 6692fb5a3e91 +Create Date: 2025-09-24 03:47:37.324185+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "4925c34e8d58" +down_revision: Union[str, None] = "6692fb5a3e91" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_runs", sa.Column("sequential_key", sa.String(), nullable=True)) + op.add_column("workflows", sa.Column("sequential_key", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflows", "sequential_key") + op.drop_column("workflow_runs", "sequential_key") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 3b7dbd81..dadb41dc 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1399,6 +1399,7 @@ class AgentDB: ai_fallback: bool = False, cache_key: str | None = None, run_sequentially: bool = False, + sequential_key: str | None = None, ) -> Workflow: async with self.Session() as session: workflow = WorkflowModel( @@ -1420,6 +1421,7 @@ class AgentDB: ai_fallback=ai_fallback, cache_key=cache_key, run_sequentially=run_sequentially, + sequential_key=sequential_key, ) if workflow_permanent_id: workflow.workflow_permanent_id = workflow_permanent_id @@ -1693,6 +1695,7 @@ class AgentDB: max_screenshot_scrolling_times: int | None = None, extra_http_headers: dict[str, str] | None = None, browser_address: str | None = None, + sequential_key: str | None = None, ) -> WorkflowRun: try: async with self.Session() as session: @@ -1710,6 +1713,7 @@ class AgentDB: max_screenshot_scrolling_times=max_screenshot_scrolling_times, extra_http_headers=extra_http_headers, browser_address=browser_address, + sequential_key=sequential_key, ) session.add(workflow_run) await session.commit() @@ -1728,6 +1732,7 @@ class AgentDB: ai_fallback_triggered: bool | None = None, job_id: str | None = None, run_with: str | None = None, + sequential_key: str | None = None, ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( @@ -1752,6 +1757,8 @@ class AgentDB: workflow_run.job_id = job_id if run_with: workflow_run.run_with = run_with + if sequential_key: + workflow_run.sequential_key = sequential_key await session.commit() await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) @@ -1826,6 +1833,7 @@ class AgentDB: self, workflow_permanent_id: str, organization_id: str | None = None, + sequential_key: str | None = None, ) -> WorkflowRun | None: try: async with self.Session() as session: @@ -1833,6 +1841,8 @@ class AgentDB: if organization_id: query = query.filter_by(organization_id=organization_id) query = query.filter_by(status=WorkflowRunStatus.queued) + if sequential_key: + query = query.filter_by(sequential_key=sequential_key) query = query.order_by(WorkflowRunModel.modified_at.desc()) workflow_run = (await session.scalars(query)).first() return convert_to_workflow_run(workflow_run) if workflow_run else None @@ -1844,6 +1854,7 @@ class AgentDB: self, workflow_permanent_id: str, organization_id: str | None = None, + sequential_key: str | None = None, ) -> WorkflowRun | None: try: async with self.Session() as session: @@ -1851,6 +1862,8 @@ class AgentDB: if organization_id: query = query.filter_by(organization_id=organization_id) query = query.filter_by(status=WorkflowRunStatus.running) + if sequential_key: + query = query.filter_by(sequential_key=sequential_key) query = query.filter( WorkflowRunModel.started_at.isnot(None) ) # filter out workflow runs that does not have a started_at timestamp diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 3de91aff..57934b84 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -248,6 +248,7 @@ class WorkflowModel(Base): ai_fallback = Column(Boolean, default=False, nullable=False) cache_key = Column(String, nullable=True) run_sequentially = Column(Boolean, nullable=True) + sequential_key = Column(String, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column( @@ -286,6 +287,7 @@ class WorkflowRunModel(Base): browser_address = Column(String, nullable=True) script_run = Column(JSON, nullable=True) job_id = Column(String, nullable=True) + sequential_key = Column(String, nullable=True) run_with = Column(String, nullable=True) # 'agent' or 'code' queued_at = Column(DateTime, nullable=True) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 12aafa79..2232b658 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -284,6 +284,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal ai_fallback=workflow_model.ai_fallback, cache_key=workflow_model.cache_key, run_sequentially=workflow_model.run_sequentially, + sequential_key=workflow_model.sequential_key, ) @@ -322,6 +323,7 @@ def convert_to_workflow_run( extra_http_headers=workflow_run_model.extra_http_headers, browser_address=workflow_run_model.browser_address, job_id=workflow_run_model.job_id, + sequential_key=workflow_run_model.sequential_key, script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run) if workflow_run_model.script_run else None, diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index cff50296..edb5608f 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -80,6 +80,7 @@ class Workflow(BaseModel): ai_fallback: bool = False cache_key: str | None = None run_sequentially: bool | None = None + sequential_key: str | None = None created_at: datetime modified_at: datetime @@ -139,6 +140,7 @@ class WorkflowRun(BaseModel): run_with: str | None = None script_run: ScriptRunResponse | None = None job_id: str | None = None + sequential_key: str | None = None queued_at: datetime | None = None started_at: datetime | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 8e5b7ad5..5c65ba21 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -180,6 +180,7 @@ class WorkflowService: workflow_id=workflow_id, organization_id=organization.organization_id, parent_workflow_run_id=parent_workflow_run_id, + sequential_key=workflow.sequential_key, ) LOG.info( f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}", @@ -637,6 +638,7 @@ class WorkflowService: cache_key: str | None = None, ai_fallback: bool | None = None, run_sequentially: bool = False, + sequential_key: str | None = None, ) -> Workflow: return await app.DATABASE.create_workflow( title=title, @@ -659,6 +661,7 @@ class WorkflowService: cache_key=cache_key, ai_fallback=False if ai_fallback is None else ai_fallback, run_sequentially=run_sequentially, + sequential_key=sequential_key, ) async def create_workflow_from_prompt( @@ -942,6 +945,7 @@ class WorkflowService: workflow_id: str, organization_id: str, parent_workflow_run_id: str | None = None, + sequential_key: str | None = None, ) -> WorkflowRun: # validate the browser session id if workflow_request.browser_session_id: @@ -965,6 +969,7 @@ class WorkflowService: max_screenshot_scrolling_times=workflow_request.max_screenshot_scrolls, extra_http_headers=workflow_request.extra_http_headers, browser_address=workflow_request.browser_address, + sequential_key=sequential_key, ) async def _update_workflow_run_status( @@ -1763,6 +1768,7 @@ class WorkflowService: cache_key=request.cache_key, ai_fallback=request.ai_fallback, run_sequentially=request.run_sequentially, + sequential_key=request.sequential_key, ) else: workflow = await self.create_workflow( @@ -1784,6 +1790,7 @@ class WorkflowService: cache_key=request.cache_key, ai_fallback=request.ai_fallback, run_sequentially=request.run_sequentially, + sequential_key=request.sequential_key, ) # Keeping track of the new workflow id to delete it if an error occurs during the creation process new_workflow_id = workflow.workflow_id diff --git a/skyvern/schemas/workflows.py b/skyvern/schemas/workflows.py index 776001a6..8fe93d2b 100644 --- a/skyvern/schemas/workflows.py +++ b/skyvern/schemas/workflows.py @@ -518,6 +518,7 @@ class WorkflowCreateYAMLRequest(BaseModel): ai_fallback: bool = False cache_key: str | None = None run_sequentially: bool = False + sequential_key: str | None = None class WorkflowRequest(BaseModel):