From 0c3b5488cc824a917646c345043bd5ac57c085f8 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Wed, 8 Oct 2025 15:57:01 +0800 Subject: [PATCH] fix sequential run issue (#3643) --- ...1201bc8d_add_depends_on_workflow_run_id.py | 31 ++++++++++ skyvern/forge/sdk/api/aws.py | 60 +++++++++++++++++++ skyvern/forge/sdk/db/client.py | 9 ++- skyvern/forge/sdk/db/models.py | 1 + skyvern/forge/sdk/db/utils.py | 1 + skyvern/forge/sdk/workflow/models/workflow.py | 1 + 6 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/2025_10_08_0753-81351201bc8d_add_depends_on_workflow_run_id.py diff --git a/alembic/versions/2025_10_08_0753-81351201bc8d_add_depends_on_workflow_run_id.py b/alembic/versions/2025_10_08_0753-81351201bc8d_add_depends_on_workflow_run_id.py new file mode 100644 index 00000000..21a83aa2 --- /dev/null +++ b/alembic/versions/2025_10_08_0753-81351201bc8d_add_depends_on_workflow_run_id.py @@ -0,0 +1,31 @@ +"""add_depends_on_workflow_run_id + +Revision ID: 81351201bc8d +Revises: 1ab477ef80e4 +Create Date: 2025-10-08 07:53:33.714723+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "81351201bc8d" +down_revision: Union[str, None] = "1ab477ef80e4" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_runs", sa.Column("depends_on_workflow_run_id", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflow_runs", "depends_on_workflow_run_id") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/api/aws.py b/skyvern/forge/sdk/api/aws.py index ced8ed54..46ed58dc 100644 --- a/skyvern/forge/sdk/api/aws.py +++ b/skyvern/forge/sdk/api/aws.py @@ -4,6 +4,7 @@ from urllib.parse import urlparse import aioboto3 import structlog +from types_boto3_batch.client import BatchClient from types_boto3_ec2.client import EC2Client from types_boto3_ecs.client import ECSClient from types_boto3_s3.client import S3Client @@ -31,6 +32,7 @@ class AWSClientType(StrEnum): SECRETS_MANAGER = "secretsmanager" ECS = "ecs" EC2 = "ec2" + BATCH = "batch" class AsyncAWSClient: @@ -64,6 +66,9 @@ class AsyncAWSClient: def _ec2_client(self) -> EC2Client: return self.session.client(AWSClientType.EC2, region_name=self.region_name, endpoint_url=self._endpoint_url) + def _batch_client(self) -> BatchClient: + return self.session.client(AWSClientType.BATCH, region_name=self.region_name, endpoint_url=self._endpoint_url) + def _create_tag_string(self, tags: dict[str, str]) -> str: return "&".join([f"{k}={v}" for k, v in tags.items()]) @@ -394,6 +399,61 @@ class AsyncAWSClient: async with self._ec2_client() as client: return await client.describe_network_interfaces(NetworkInterfaceIds=network_interface_ids) + ###### Batch ###### + async def describe_job(self, job_id: str) -> dict: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/describe_jobs.html + async with self._batch_client() as client: + response = await client.describe_jobs(jobs=[job_id]) + return response["jobs"][0] if response["jobs"] else {} + + async def list_jobs(self, job_queue: str, job_status: str) -> list[dict]: + # NOTE: AWS batch only records the latest 7 days jobs by default + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/list_jobs.html + async with self._batch_client() as client: + total_jobs = [] + async for page in client.get_paginator("list_jobs").paginate(jobQueue=job_queue, jobStatus=job_status): + for job in page["jobSummaryList"]: + total_jobs.append(job) + + return total_jobs + + async def submit_job( + self, + job_name: str, + job_queue: str, + job_definition: str, + params: dict, + job_priority: int | None = None, + share_identifier: str | None = None, + container_overrides: dict | None = None, + depends_on_ids: list[str] | None = None, + ) -> str | None: + container_overrides = container_overrides or {} + depends_on = [{"jobId": job_id} for job_id in depends_on_ids or []] + async with self._batch_client() as client: + if job_priority is None or share_identifier is None: + response = await client.submit_job( + jobName=job_name, + jobQueue=job_queue, + jobDefinition=job_definition, + parameters=params, + containerOverrides=container_overrides, + dependsOn=depends_on, + ) + return response.get("jobId") + else: + response = await client.submit_job( + jobName=job_name, + jobQueue=job_queue, + jobDefinition=job_definition, + parameters=params, + schedulingPriorityOverride=job_priority, + shareIdentifier=share_identifier, + containerOverrides=container_overrides, + dependsOn=depends_on, + ) + return response.get("jobId") + class S3Uri: # From: https://stackoverflow.com/questions/42641315/s3-urls-get-bucket-name-and-path diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index f905d247..e69c8e2f 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1749,6 +1749,7 @@ class AgentDB: run_with: str | None = None, sequential_key: str | None = None, ai_fallback: bool | None = None, + depends_on_workflow_run_id: str | None = None, ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( @@ -1777,6 +1778,8 @@ class AgentDB: workflow_run.sequential_key = sequential_key if ai_fallback is not None: workflow_run.ai_fallback = ai_fallback + if depends_on_workflow_run_id: + workflow_run.depends_on_workflow_run_id = depends_on_workflow_run_id await session.commit() await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) @@ -1834,12 +1837,16 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise - async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun | None: + async def get_workflow_run( + self, workflow_run_id: str, organization_id: str | None = None, job_id: str | None = None + ) -> WorkflowRun | None: try: async with self.Session() as session: get_workflow_run_query = select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id) if organization_id: get_workflow_run_query = get_workflow_run_query.filter_by(organization_id=organization_id) + if job_id: + get_workflow_run_query = get_workflow_run_query.filter_by(job_id=job_id) if workflow_run := (await session.scalars(get_workflow_run_query)).first(): return convert_to_workflow_run(workflow_run) return None diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 85c3051a..bd158f75 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -289,6 +289,7 @@ class WorkflowRunModel(Base): browser_address = Column(String, nullable=True) script_run = Column(JSON, nullable=True) job_id = Column(String, nullable=True) + depends_on_workflow_run_id = Column(String, nullable=True) sequential_key = Column(String, nullable=True) run_with = Column(String, nullable=True) # 'agent' or 'code' debug_session_id: Column = Column(String, nullable=True) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 2039df5f..df852472 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -324,6 +324,7 @@ def convert_to_workflow_run( extra_http_headers=workflow_run_model.extra_http_headers, browser_address=workflow_run_model.browser_address, job_id=workflow_run_model.job_id, + depends_on_workflow_run_id=workflow_run_model.depends_on_workflow_run_id, sequential_key=workflow_run_model.sequential_key, script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run) if workflow_run_model.script_run diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index f773b101..5671b22b 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -143,6 +143,7 @@ class WorkflowRun(BaseModel): run_with: str | None = None script_run: ScriptRunResponse | None = None job_id: str | None = None + depends_on_workflow_run_id: str | None = None sequential_key: str | None = None ai_fallback: bool | None = None code_gen: bool | None = None