add flag for forcing code_gen for v2 task block runs (#3576)

This commit is contained in:
Jonathan Dobson
2025-10-01 13:52:42 -04:00
committed by GitHub
parent e6e36f98de
commit 4d2ee0c665
7 changed files with 47 additions and 3 deletions

View File

@@ -48,6 +48,7 @@ class AsyncExecutor(abc.ABC):
browser_session_id: str | None,
block_labels: list[str] | None,
block_outputs: dict[str, Any] | None,
code_gen: bool | None = None,
**kwargs: dict,
) -> None:
pass
@@ -153,6 +154,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
browser_session_id: str | None,
block_labels: list[str] | None,
block_outputs: dict[str, Any] | None,
code_gen: bool | None = None,
**kwargs: dict,
) -> None:
if background_tasks:
@@ -173,6 +175,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
browser_session_id=browser_session_id,
block_labels=block_labels,
block_outputs=block_outputs,
code_gen=code_gen,
)
else:
LOG.warning("Background tasks not enabled, skipping workflow execution")

View File

@@ -1008,6 +1008,7 @@ async def run_block(
user_id=user_id,
browser_session_id=browser_session_id,
block_outputs=block_run_request.block_outputs,
code_gen=block_run_request.code_gen,
)
return BlockRunResponse(

View File

@@ -238,6 +238,7 @@ class Block(BaseModel, abc.ABC):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
pass
@@ -295,6 +296,7 @@ class Block(BaseModel, abc.ABC):
parent_workflow_run_block_id: str | None = None,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_block_id = None
@@ -344,6 +346,7 @@ class Block(BaseModel, abc.ABC):
workflow_run_block_id,
organization_id=organization_id,
browser_session_id=browser_session_id,
code_gen=code_gen,
**kwargs,
)
except Exception as e:
@@ -500,6 +503,7 @@ class BaseTaskBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
@@ -1307,6 +1311,7 @@ class ForLoopBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
@@ -1484,6 +1489,7 @@ async def wrapper():
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
await app.AGENT_FUNCTION.validate_code_block(organization_id=organization_id)
@@ -1650,6 +1656,7 @@ class TextPromptBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
# Validate block execution
@@ -1734,6 +1741,7 @@ class DownloadToS3Block(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
# get workflow run context
@@ -1822,6 +1830,7 @@ class UploadToS3Block(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
# get workflow run context
@@ -1988,6 +1997,7 @@ class FileUploadBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
# get workflow run context
@@ -2404,6 +2414,7 @@ class SendEmailBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
@@ -2639,6 +2650,7 @@ class FileParserBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
@@ -2778,6 +2790,7 @@ class PDFParserBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
@@ -2878,6 +2891,7 @@ class WaitBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
# TODO: we need to support to interrupt the sleep when the workflow run failed/cancelled/terminated
@@ -2922,6 +2936,7 @@ class ValidationBlock(BaseTaskBlock):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
task_order, _ = await self.get_task_order(workflow_run_id, 0)
@@ -3024,6 +3039,7 @@ class TaskV2Block(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus # noqa: PLC0415
@@ -3095,6 +3111,7 @@ class TaskV2Block(Block):
request_id=None,
max_steps_override=self.max_steps,
browser_session_id=browser_session_id,
code_gen=code_gen,
)
finally:
context: skyvern_context.SkyvernContext | None = skyvern_context.current()
@@ -3212,6 +3229,7 @@ class HttpRequestBlock(Block):
workflow_run_block_id: str,
organization_id: str | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
**kwargs: dict,
) -> BlockResult:
"""Execute the HTTP request and return the response"""

View File

@@ -273,6 +273,7 @@ class WorkflowService:
block_labels: list[str] | None = None,
block_outputs: dict[str, Any] | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
) -> WorkflowRun:
"""Execute a workflow."""
organization_id = organization.organization_id
@@ -400,6 +401,7 @@ class WorkflowService:
workflow=workflow,
workflow_run=workflow_run,
block_labels=block_labels,
code_gen=code_gen,
)
else:
LOG.info(
@@ -2580,9 +2582,18 @@ class WorkflowService:
workflow: Workflow,
workflow_run: WorkflowRun,
block_labels: list[str] | None = None,
code_gen: bool | None = None,
) -> None:
if block_labels:
# Do not generate script if block_labels is provided
LOG.info(
"Generate script?",
block_labels=block_labels,
code_gen=code_gen,
workflow_run_id=workflow_run.workflow_run_id,
)
if block_labels and not code_gen:
# Do not generate script if block_labels is provided, and an explicit code_gen
# request is not made
return None
existing_script, rendered_cache_key_value = await workflow_script_service.get_workflow_script(

View File

@@ -378,6 +378,10 @@ class BlockRunRequest(WorkflowRunRequest):
# org_id/user_id, or an override supplied by the user
description="Any active outputs of blocks in a workflow being debugged",
)
code_gen: bool | None = Field(
default=False,
description="Whether to generate colde for blocks that support it",
)
debug_session_id: str | None = Field(
default=None,
description="ID of the debug session to use for this block run",

View File

@@ -62,6 +62,7 @@ async def execute_blocks(
user_id: str,
browser_session_id: str | None = None,
block_outputs: dict[str, t.Any] | None = None,
code_gen: bool | None = None,
) -> None:
"""
Runs one or more blocks of a workflow.
@@ -114,4 +115,5 @@ async def execute_blocks(
api_key=api_key,
block_labels=block_labels,
block_outputs=block_outputs,
code_gen=code_gen,
)

View File

@@ -306,6 +306,7 @@ async def run_task_v2(
request_id: str | None = None,
max_steps_override: str | int | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
) -> TaskV2:
organization_id = organization.organization_id
try:
@@ -334,6 +335,7 @@ async def run_task_v2(
request_id=request_id,
max_steps_override=max_steps_override,
browser_session_id=browser_session_id,
code_gen=code_gen,
)
except TaskTerminationError as e:
task_v2 = await mark_task_v2_as_terminated(
@@ -390,6 +392,7 @@ async def run_task_v2_helper(
request_id: str | None = None,
max_steps_override: str | int | None = None,
browser_session_id: str | None = None,
code_gen: bool | None = None,
) -> tuple[Workflow, WorkflowRun, TaskV2] | tuple[None, None, TaskV2]:
organization_id = organization.organization_id
task_v2_id = task_v2.observer_cruise_id
@@ -772,6 +775,7 @@ async def run_task_v2_helper(
block_result = await block.execute_safe(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
code_gen=code_gen,
)
task_history_record["status"] = str(block_result.status)
if block_result.failure_reason:
@@ -900,10 +904,11 @@ async def run_task_v2_helper(
context=context,
screenshots=completion_screenshots,
)
if task_v2.run_with == "code": # TODO(jdo): not sure about this one...
if task_v2.run_with == "code" or code_gen: # TODO(jdo): not sure about this one...
await app.WORKFLOW_SERVICE.generate_script_if_needed(
workflow=workflow,
workflow_run=workflow_run,
code_gen=code_gen,
)
break