denpendent workflow run (#3457)

This commit is contained in:
LawyZheng
2025-09-18 13:32:55 +08:00
committed by GitHub
parent 51121076ec
commit 7dd1d5241c
7 changed files with 67 additions and 0 deletions

View File

@@ -1371,6 +1371,7 @@ class AgentDB:
generate_script: bool = False,
ai_fallback: bool = False,
cache_key: str | None = None,
run_sequentially: bool = False,
) -> Workflow:
async with self.Session() as session:
workflow = WorkflowModel(
@@ -1391,6 +1392,7 @@ class AgentDB:
generate_script=generate_script,
ai_fallback=ai_fallback,
cache_key=cache_key,
run_sequentially=run_sequentially,
)
if workflow_permanent_id:
workflow.workflow_permanent_id = workflow_permanent_id
@@ -1673,6 +1675,7 @@ class AgentDB:
failure_reason: str | None = None,
webhook_failure_reason: str | None = None,
ai_fallback_triggered: bool | None = None,
job_id: str | None = None,
) -> WorkflowRun:
async with self.Session() as session:
workflow_run = (
@@ -1693,6 +1696,8 @@ class AgentDB:
workflow_run.webhook_failure_reason = webhook_failure_reason
if ai_fallback_triggered is not None:
workflow_run.script_run = {"ai_fallback_triggered": ai_fallback_triggered}
if job_id:
workflow_run.job_id = job_id
await session.commit()
await session.refresh(workflow_run)
await save_workflow_run_logs(workflow_run_id)
@@ -1763,6 +1768,24 @@ class AgentDB:
LOG.error("SQLAlchemyError", exc_info=True)
raise
async def get_last_queued_workflow_run(
self,
workflow_permanent_id: str,
organization_id: str | None = None,
) -> WorkflowRun | None:
try:
async with self.Session() as session:
query = select(WorkflowRunModel).filter_by(workflow_permanent_id=workflow_permanent_id)
if organization_id:
query = query.filter_by(organization_id=organization_id)
query = query.filter_by(status=WorkflowRunStatus.queued)
query = query.order_by(WorkflowRunModel.modified_at.desc())
workflow_run = (await session.scalars(query)).first()
return convert_to_workflow_run(workflow_run) if workflow_run else None
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
async def get_workflow_runs(
self,
organization_id: str,