fix workflow_run.run_with override (#3543)

This commit is contained in:
Shuchang Zheng
2025-09-27 11:18:17 -07:00
committed by GitHub
parent 11d4518369
commit 56b4d828c1
4 changed files with 36 additions and 18 deletions

View File

@@ -1697,6 +1697,7 @@ class AgentDB:
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
sequential_key: str | None = None,
run_with: str | None = None,
) -> WorkflowRun:
try:
async with self.Session() as session:
@@ -1715,6 +1716,7 @@ class AgentDB:
extra_http_headers=extra_http_headers,
browser_address=browser_address,
sequential_key=sequential_key,
run_with=run_with,
)
session.add(workflow_run)
await session.commit()

View File

@@ -26,6 +26,7 @@ class WorkflowRequestBody(BaseModel):
max_screenshot_scrolls: int | None = None
extra_http_headers: dict[str, str] | None = None
browser_address: str | None = None
run_with: str | None = None
@field_validator("webhook_callback_url", "totp_verification_url")
@classmethod

View File

@@ -388,7 +388,7 @@ class WorkflowService:
WorkflowRunStatus.timed_out,
):
workflow_run = await self.mark_workflow_run_as_completed(
workflow_run_id=workflow_run_id, is_script_run=is_script_run
workflow_run_id=workflow_run_id,
)
await self.generate_script_if_needed(
workflow=workflow,
@@ -424,6 +424,7 @@ class WorkflowService:
workflow_run_id = workflow_run.workflow_run_id
top_level_blocks = workflow.workflow_definition.blocks
all_blocks = get_all_blocks(top_level_blocks)
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with="agent")
if block_labels and len(block_labels):
blocks: list[BlockTypeVar] = []
@@ -970,6 +971,7 @@ class WorkflowService:
extra_http_headers=workflow_request.extra_http_headers,
browser_address=workflow_request.browser_address,
sequential_key=sequential_key,
run_with=workflow_request.run_with,
)
async def _update_workflow_run_status(
@@ -977,9 +979,8 @@ class WorkflowService:
workflow_run_id: str,
status: WorkflowRunStatus,
failure_reason: str | None = None,
is_script_run: bool = False,
run_with: str | None = None,
) -> WorkflowRun:
run_with = "code" if is_script_run else "agent"
workflow_run = await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=status,
@@ -1006,7 +1007,7 @@ class WorkflowService:
)
return workflow_run
async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
async def mark_workflow_run_as_completed(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as completed",
workflow_run_id=workflow_run_id,
@@ -1015,11 +1016,14 @@ class WorkflowService:
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.completed,
is_script_run=is_script_run,
run_with=run_with,
)
async def mark_workflow_run_as_failed(
self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False
self,
workflow_run_id: str,
failure_reason: str | None,
run_with: str | None = None,
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as failed",
@@ -1031,23 +1035,27 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed,
failure_reason=failure_reason,
is_script_run=is_script_run,
run_with=run_with,
)
async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
async def mark_workflow_run_as_running(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as running",
workflow_run_id=workflow_run_id,
workflow_status="running",
run_with=run_with,
)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.running,
is_script_run=is_script_run,
run_with=run_with,
)
async def mark_workflow_run_as_terminated(
self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False
self,
workflow_run_id: str,
failure_reason: str | None,
run_with: str | None = None,
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as terminated",
@@ -1059,10 +1067,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.terminated,
failure_reason=failure_reason,
is_script_run=is_script_run,
run_with=run_with,
)
async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
async def mark_workflow_run_as_canceled(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as canceled",
workflow_run_id=workflow_run_id,
@@ -1071,11 +1079,14 @@ class WorkflowService:
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.canceled,
is_script_run=is_script_run,
run_with=run_with,
)
async def mark_workflow_run_as_timed_out(
self, workflow_run_id: str, failure_reason: str | None = None, is_script_run: bool = False
self,
workflow_run_id: str,
failure_reason: str | None = None,
run_with: str | None = None,
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as timed out",
@@ -1086,7 +1097,7 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.timed_out,
failure_reason=failure_reason,
is_script_run=is_script_run,
run_with=run_with,
)
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun:
@@ -2494,6 +2505,7 @@ class WorkflowService:
Execute the related workflow script instead of running the workflow blocks.
"""
LOG.info("Start to execute workflow script", workflow_run_id=workflow_run.workflow_run_id)
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id, run_with="code")
try:
# Render the cache_key_value to find the right script
@@ -2514,7 +2526,7 @@ class WorkflowService:
# Mark workflow run as completed
workflow_run = await self.mark_workflow_run_as_completed(
workflow_run_id=workflow_run.workflow_run_id, is_script_run=True
workflow_run_id=workflow_run.workflow_run_id,
)
LOG.info(
@@ -2537,7 +2549,8 @@ class WorkflowService:
# Mark workflow run as failed
failure_reason = f"Failed to execute workflow script: {str(e)}"
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script_run=True
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=failure_reason,
)
return workflow_run
@@ -2587,6 +2600,8 @@ class WorkflowService:
) -> bool:
if workflow_run.run_with == "code":
return True
if workflow_run.run_with == "agent":
return False
if workflow.generate_script:
return True
return False