workflow run sequential key (#3509)

This commit is contained in:
LawyZheng
2025-09-24 11:50:24 +08:00
committed by GitHub
parent dee4458685
commit 1b3df07e53
7 changed files with 60 additions and 0 deletions

View File

@@ -1399,6 +1399,7 @@ class AgentDB:
ai_fallback: bool = False,
cache_key: str | None = None,
run_sequentially: bool = False,
sequential_key: str | None = None,
) -> Workflow:
async with self.Session() as session:
workflow = WorkflowModel(
@@ -1420,6 +1421,7 @@ class AgentDB:
ai_fallback=ai_fallback,
cache_key=cache_key,
run_sequentially=run_sequentially,
sequential_key=sequential_key,
)
if workflow_permanent_id:
workflow.workflow_permanent_id = workflow_permanent_id
@@ -1693,6 +1695,7 @@ class AgentDB:
max_screenshot_scrolling_times: int | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
sequential_key: str | None = None,
) -> WorkflowRun:
try:
async with self.Session() as session:
@@ -1710,6 +1713,7 @@ class AgentDB:
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
extra_http_headers=extra_http_headers,
browser_address=browser_address,
sequential_key=sequential_key,
)
session.add(workflow_run)
await session.commit()
@@ -1728,6 +1732,7 @@ class AgentDB:
ai_fallback_triggered: bool | None = None,
job_id: str | None = None,
run_with: str | None = None,
sequential_key: str | None = None,
) -> WorkflowRun:
async with self.Session() as session:
workflow_run = (
@@ -1752,6 +1757,8 @@ class AgentDB:
workflow_run.job_id = job_id
if run_with:
workflow_run.run_with = run_with
if sequential_key:
workflow_run.sequential_key = sequential_key
await session.commit()
await session.refresh(workflow_run)
await save_workflow_run_logs(workflow_run_id)
@@ -1826,6 +1833,7 @@ class AgentDB:
self,
workflow_permanent_id: str,
organization_id: str | None = None,
sequential_key: str | None = None,
) -> WorkflowRun | None:
try:
async with self.Session() as session:
@@ -1833,6 +1841,8 @@ class AgentDB:
if organization_id:
query = query.filter_by(organization_id=organization_id)
query = query.filter_by(status=WorkflowRunStatus.queued)
if sequential_key:
query = query.filter_by(sequential_key=sequential_key)
query = query.order_by(WorkflowRunModel.modified_at.desc())
workflow_run = (await session.scalars(query)).first()
return convert_to_workflow_run(workflow_run) if workflow_run else None
@@ -1844,6 +1854,7 @@ class AgentDB:
self,
workflow_permanent_id: str,
organization_id: str | None = None,
sequential_key: str | None = None,
) -> WorkflowRun | None:
try:
async with self.Session() as session:
@@ -1851,6 +1862,8 @@ class AgentDB:
if organization_id:
query = query.filter_by(organization_id=organization_id)
query = query.filter_by(status=WorkflowRunStatus.running)
if sequential_key:
query = query.filter_by(sequential_key=sequential_key)
query = query.filter(
WorkflowRunModel.started_at.isnot(None)
) # filter out workflow runs that does not have a started_at timestamp