fix sequential run issue (#3643)
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
"""add_depends_on_workflow_run_id
|
||||
|
||||
Revision ID: 81351201bc8d
|
||||
Revises: 1ab477ef80e4
|
||||
Create Date: 2025-10-08 07:53:33.714723+00:00
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "81351201bc8d"
|
||||
down_revision: Union[str, None] = "1ab477ef80e4"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Apply the migration: add the nullable depends_on_workflow_run_id column.

    The column stores the id of another workflow run that must complete
    before this run is started (sequential run support).
    """
    dependency_column = sa.Column("depends_on_workflow_run_id", sa.String(), nullable=True)
    op.add_column("workflow_runs", dependency_column)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Revert the migration: drop the depends_on_workflow_run_id column again."""
    op.drop_column("workflow_runs", "depends_on_workflow_run_id")
|
||||
@@ -4,6 +4,7 @@ from urllib.parse import urlparse
|
||||
|
||||
import aioboto3
|
||||
import structlog
|
||||
from types_boto3_batch.client import BatchClient
|
||||
from types_boto3_ec2.client import EC2Client
|
||||
from types_boto3_ecs.client import ECSClient
|
||||
from types_boto3_s3.client import S3Client
|
||||
@@ -31,6 +32,7 @@ class AWSClientType(StrEnum):
|
||||
SECRETS_MANAGER = "secretsmanager"
|
||||
ECS = "ecs"
|
||||
EC2 = "ec2"
|
||||
BATCH = "batch"
|
||||
|
||||
|
||||
class AsyncAWSClient:
|
||||
@@ -64,6 +66,9 @@ class AsyncAWSClient:
|
||||
def _ec2_client(self) -> EC2Client:
    """Build an EC2 client bound to this instance's region and endpoint URL."""
    return self.session.client(
        AWSClientType.EC2,
        region_name=self.region_name,
        endpoint_url=self._endpoint_url,
    )
|
||||
|
||||
def _batch_client(self) -> BatchClient:
    """Build an AWS Batch client bound to this instance's region and endpoint URL."""
    return self.session.client(
        AWSClientType.BATCH,
        region_name=self.region_name,
        endpoint_url=self._endpoint_url,
    )
|
||||
|
||||
def _create_tag_string(self, tags: dict[str, str]) -> str:
    """Serialize *tags* as query-string-style ``key=value`` pairs joined by ``&``.

    An empty mapping yields an empty string. Values are not URL-encoded;
    callers are expected to pass plain tag keys/values.
    """
    pairs = (f"{key}={value}" for key, value in tags.items())
    return "&".join(pairs)
|
||||
|
||||
@@ -394,6 +399,61 @@ class AsyncAWSClient:
|
||||
async with self._ec2_client() as client:
|
||||
return await client.describe_network_interfaces(NetworkInterfaceIds=network_interface_ids)
|
||||
|
||||
###### Batch ######
|
||||
async def describe_job(self, job_id: str) -> dict:
    """Fetch the AWS Batch description for a single job id.

    Returns an empty dict when Batch reports no matching job (e.g. the
    job has aged out of Batch's retention window).
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/describe_jobs.html
    """
    async with self._batch_client() as client:
        reply = await client.describe_jobs(jobs=[job_id])
        jobs = reply["jobs"]
        return jobs[0] if jobs else {}
|
||||
|
||||
async def list_jobs(self, job_queue: str, job_status: str) -> list[dict]:
    """Collect all job summaries in *job_queue* with status *job_status*.

    Paginates through the full result set before returning.
    NOTE: AWS Batch only records the latest 7 days of jobs by default.
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/list_jobs.html
    """
    async with self._batch_client() as client:
        paginator = client.get_paginator("list_jobs")
        jobs: list[dict] = []
        async for page in paginator.paginate(jobQueue=job_queue, jobStatus=job_status):
            jobs.extend(page["jobSummaryList"])
        return jobs
|
||||
|
||||
async def submit_job(
    self,
    job_name: str,
    job_queue: str,
    job_definition: str,
    params: dict,
    job_priority: int | None = None,
    share_identifier: str | None = None,
    container_overrides: dict | None = None,
    depends_on_ids: list[str] | None = None,
) -> str | None:
    """Submit an AWS Batch job and return its jobId (None if Batch omits one).

    Args:
        job_name: Name shown in the Batch console.
        job_queue: Target Batch job queue.
        job_definition: Registered job definition (name or ARN).
        params: Batch ``parameters`` substitution map.
        job_priority: Scheduling priority override; only applied together
            with ``share_identifier``.
        share_identifier: Fair-share identifier; only applied together
            with ``job_priority``.
        container_overrides: Optional ``containerOverrides`` payload.
        depends_on_ids: Job ids this job must wait on; translated into the
            Batch ``dependsOn`` list.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch/client/submit_job.html
    """
    # Build the request once instead of duplicating two near-identical
    # submit_job calls (the original branches differed only in the two
    # scheduling-override kwargs).
    request: dict = {
        "jobName": job_name,
        "jobQueue": job_queue,
        "jobDefinition": job_definition,
        "parameters": params,
        "containerOverrides": container_overrides or {},
        "dependsOn": [{"jobId": job_id} for job_id in depends_on_ids or []],
    }
    # Preserve original behavior: the overrides are only sent when BOTH
    # priority and share identifier are provided.
    if job_priority is not None and share_identifier is not None:
        request["schedulingPriorityOverride"] = job_priority
        request["shareIdentifier"] = share_identifier
    async with self._batch_client() as client:
        response = await client.submit_job(**request)
        return response.get("jobId")
|
||||
|
||||
|
||||
class S3Uri:
|
||||
# From: https://stackoverflow.com/questions/42641315/s3-urls-get-bucket-name-and-path
|
||||
|
||||
@@ -1749,6 +1749,7 @@ class AgentDB:
|
||||
run_with: str | None = None,
|
||||
sequential_key: str | None = None,
|
||||
ai_fallback: bool | None = None,
|
||||
depends_on_workflow_run_id: str | None = None,
|
||||
) -> WorkflowRun:
|
||||
async with self.Session() as session:
|
||||
workflow_run = (
|
||||
@@ -1777,6 +1778,8 @@ class AgentDB:
|
||||
workflow_run.sequential_key = sequential_key
|
||||
if ai_fallback is not None:
|
||||
workflow_run.ai_fallback = ai_fallback
|
||||
if depends_on_workflow_run_id:
|
||||
workflow_run.depends_on_workflow_run_id = depends_on_workflow_run_id
|
||||
await session.commit()
|
||||
await session.refresh(workflow_run)
|
||||
await save_workflow_run_logs(workflow_run_id)
|
||||
@@ -1834,12 +1837,16 @@ class AgentDB:
|
||||
LOG.error("SQLAlchemyError", exc_info=True)
|
||||
raise
|
||||
|
||||
# NOTE(review): diff residue — the single-line signature below is the
# pre-change version of this method; the multi-line def that follows is
# the replacement that adds the optional job_id filter.
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun | None:
|
||||
# Fetch a workflow run by id, optionally narrowed to an organization
# and/or an AWS Batch job id; returns None when no row matches.
async def get_workflow_run(
|
||||
self, workflow_run_id: str, organization_id: str | None = None, job_id: str | None = None
|
||||
) -> WorkflowRun | None:
|
||||
# NOTE(review): the matching except clause lies past the visible span —
# presumably the SQLAlchemyError handler seen in the surrounding diff.
try:
|
||||
async with self.Session() as session:
|
||||
get_workflow_run_query = select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id)
|
||||
# Each optional filter is applied only when provided (truthy).
if organization_id:
|
||||
get_workflow_run_query = get_workflow_run_query.filter_by(organization_id=organization_id)
|
||||
if job_id:
|
||||
get_workflow_run_query = get_workflow_run_query.filter_by(job_id=job_id)
|
||||
# first() -> first matching row or None; convert ORM row to the API model.
if workflow_run := (await session.scalars(get_workflow_run_query)).first():
|
||||
return convert_to_workflow_run(workflow_run)
|
||||
return None
|
||||
|
||||
@@ -289,6 +289,7 @@ class WorkflowRunModel(Base):
|
||||
browser_address = Column(String, nullable=True)
|
||||
script_run = Column(JSON, nullable=True)
|
||||
job_id = Column(String, nullable=True)
|
||||
depends_on_workflow_run_id = Column(String, nullable=True)
|
||||
sequential_key = Column(String, nullable=True)
|
||||
run_with = Column(String, nullable=True) # 'agent' or 'code'
|
||||
debug_session_id: Column = Column(String, nullable=True)
|
||||
|
||||
@@ -324,6 +324,7 @@ def convert_to_workflow_run(
|
||||
extra_http_headers=workflow_run_model.extra_http_headers,
|
||||
browser_address=workflow_run_model.browser_address,
|
||||
job_id=workflow_run_model.job_id,
|
||||
depends_on_workflow_run_id=workflow_run_model.depends_on_workflow_run_id,
|
||||
sequential_key=workflow_run_model.sequential_key,
|
||||
script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run)
|
||||
if workflow_run_model.script_run
|
||||
|
||||
@@ -143,6 +143,7 @@ class WorkflowRun(BaseModel):
|
||||
run_with: str | None = None
|
||||
script_run: ScriptRunResponse | None = None
|
||||
job_id: str | None = None
|
||||
depends_on_workflow_run_id: str | None = None
|
||||
sequential_key: str | None = None
|
||||
ai_fallback: bool | None = None
|
||||
code_gen: bool | None = None
|
||||
|
||||
Reference in New Issue
Block a user