From 7dd1d5241c30b3645d9295e8e84a51aeb9d20e04 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Thu, 18 Sep 2025 13:32:55 +0800 Subject: [PATCH] denpendent workflow run (#3457) --- ...daee7d1_add_workflow_sequential_setting.py | 33 +++++++++++++++++++ skyvern/forge/sdk/db/client.py | 23 +++++++++++++ skyvern/forge/sdk/db/models.py | 2 ++ skyvern/forge/sdk/db/utils.py | 2 ++ skyvern/forge/sdk/workflow/models/workflow.py | 2 ++ skyvern/forge/sdk/workflow/service.py | 4 +++ skyvern/schemas/workflows.py | 1 + 7 files changed, 67 insertions(+) create mode 100644 alembic/versions/2025_09_18_0529-8f208daee7d1_add_workflow_sequential_setting.py diff --git a/alembic/versions/2025_09_18_0529-8f208daee7d1_add_workflow_sequential_setting.py b/alembic/versions/2025_09_18_0529-8f208daee7d1_add_workflow_sequential_setting.py new file mode 100644 index 00000000..4e2356f7 --- /dev/null +++ b/alembic/versions/2025_09_18_0529-8f208daee7d1_add_workflow_sequential_setting.py @@ -0,0 +1,33 @@ +"""add workflow sequential setting + +Revision ID: 8f208daee7d1 +Revises: 67add341e926 +Create Date: 2025-09-18 05:29:42.472400+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "8f208daee7d1" +down_revision: Union[str, None] = "67add341e926" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_runs", sa.Column("job_id", sa.String(), nullable=True)) + op.add_column("workflows", sa.Column("run_sequentially", sa.Boolean(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflows", "run_sequentially") + op.drop_column("workflow_runs", "job_id") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 9d41a584..c378fa3b 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1371,6 +1371,7 @@ class AgentDB: generate_script: bool = False, ai_fallback: bool = False, cache_key: str | None = None, + run_sequentially: bool = False, ) -> Workflow: async with self.Session() as session: workflow = WorkflowModel( @@ -1391,6 +1392,7 @@ class AgentDB: generate_script=generate_script, ai_fallback=ai_fallback, cache_key=cache_key, + run_sequentially=run_sequentially, ) if workflow_permanent_id: workflow.workflow_permanent_id = workflow_permanent_id @@ -1673,6 +1675,7 @@ class AgentDB: failure_reason: str | None = None, webhook_failure_reason: str | None = None, ai_fallback_triggered: bool | None = None, + job_id: str | None = None, ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( @@ -1693,6 +1696,8 @@ class AgentDB: workflow_run.webhook_failure_reason = webhook_failure_reason if ai_fallback_triggered is not None: workflow_run.script_run = {"ai_fallback_triggered": ai_fallback_triggered} + if job_id: + workflow_run.job_id = job_id await session.commit() await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) @@ -1763,6 +1768,24 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise + async def get_last_queued_workflow_run( + self, + workflow_permanent_id: str, + organization_id: str | None = None, + ) -> WorkflowRun | None: + try: + async with self.Session() as session: + query = select(WorkflowRunModel).filter_by(workflow_permanent_id=workflow_permanent_id) + if organization_id: + query = query.filter_by(organization_id=organization_id) + query = query.filter_by(status=WorkflowRunStatus.queued) + query = query.order_by(WorkflowRunModel.modified_at.desc()) + workflow_run = (await session.scalars(query)).first() + return convert_to_workflow_run(workflow_run) if workflow_run else None + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + async def get_workflow_runs( self, organization_id: str, diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 1f17e436..232f01ea 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -247,6 +247,7 @@ class WorkflowModel(Base): generate_script = Column(Boolean, default=False, nullable=False) ai_fallback = Column(Boolean, default=False, nullable=False) cache_key = Column(String, nullable=True) + run_sequentially = Column(Boolean, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column( @@ -284,6 +285,7 @@ class WorkflowRunModel(Base): extra_http_headers = Column(JSON, nullable=True) browser_address = Column(String, nullable=True) script_run = Column(JSON, nullable=True) + job_id = Column(String, nullable=True) queued_at = Column(DateTime, nullable=True) started_at = Column(DateTime, nullable=True) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 8aa00c1e..cf47dbb2 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -266,6 +266,7 @@ def convert_to_workflow(workflow_model: WorkflowModel, debug_enabled: bool = Fal generate_script=workflow_model.generate_script, ai_fallback=workflow_model.ai_fallback, cache_key=workflow_model.cache_key, + run_sequentially=workflow_model.run_sequentially, ) @@ -303,6 +304,7 @@ def convert_to_workflow_run( max_screenshot_scrolls=workflow_run_model.max_screenshot_scrolling_times, extra_http_headers=workflow_run_model.extra_http_headers, browser_address=workflow_run_model.browser_address, + job_id=workflow_run_model.job_id, script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run) if workflow_run_model.script_run else None, diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 750bd5b9..7d52681a 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -79,6 +79,7 @@ class Workflow(BaseModel): generate_script: bool = False ai_fallback: bool = False cache_key: str | None = None + run_sequentially: bool | None = None created_at: datetime modified_at: datetime @@ -136,6 +137,7 @@ class WorkflowRun(BaseModel): max_screenshot_scrolls: int | None = None browser_address: str | None = None script_run: ScriptRunResponse | None = None + job_id: str | None = None queued_at: datetime | None = None started_at: datetime | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index c79de2f9..769ffe8c 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -646,6 +646,7 @@ class WorkflowService: generate_script: bool = False, cache_key: str | None = None, ai_fallback: bool | None = None, + run_sequentially: bool = False, ) -> Workflow: return await app.DATABASE.create_workflow( title=title, @@ -667,6 +668,7 @@ class WorkflowService: generate_script=generate_script, cache_key=cache_key, ai_fallback=False if ai_fallback is None else ai_fallback, + run_sequentially=run_sequentially, ) async def create_workflow_from_prompt( @@ -1742,6 +1744,7 @@ class WorkflowService: generate_script=request.generate_script, cache_key=request.cache_key, ai_fallback=request.ai_fallback, + run_sequentially=request.run_sequentially, ) else: workflow = await self.create_workflow( @@ -1762,6 +1765,7 @@ class WorkflowService: generate_script=request.generate_script, cache_key=request.cache_key, ai_fallback=request.ai_fallback, + run_sequentially=request.run_sequentially, ) # Keeping track of the new workflow id to delete it if an error occurs during the creation process new_workflow_id = workflow.workflow_id diff --git a/skyvern/schemas/workflows.py b/skyvern/schemas/workflows.py index cab3535d..776001a6 100644 --- a/skyvern/schemas/workflows.py +++ b/skyvern/schemas/workflows.py @@ -517,6 +517,7 @@ class WorkflowCreateYAMLRequest(BaseModel): generate_script: bool = False ai_fallback: bool = False cache_key: str | None = None + run_sequentially: bool = False class WorkflowRequest(BaseModel):