block level script run (#3710)

This commit is contained in:
Shuchang Zheng
2025-10-14 16:17:03 -07:00
committed by GitHub
parent e13e9a5d58
commit ce35b37470
10 changed files with 352 additions and 176 deletions

View File

@@ -4152,6 +4152,7 @@ class AgentDB:
organization_id: str,
script_block_label: str,
script_file_id: str | None = None,
run_signature: str | None = None,
) -> ScriptBlock:
"""Create a script block."""
async with self.Session() as session:
@@ -4161,6 +4162,7 @@ class AgentDB:
organization_id=organization_id,
script_block_label=script_block_label,
script_file_id=script_file_id,
run_signature=run_signature,
)
session.add(script_block)
await session.commit()
@@ -4172,6 +4174,7 @@ class AgentDB:
script_block_id: str,
organization_id: str,
script_file_id: str | None = None,
run_signature: str | None = None,
) -> ScriptBlock:
async with self.Session() as session:
script_block = (
@@ -4182,8 +4185,10 @@ class AgentDB:
)
).first()
if script_block:
if script_file_id:
if script_file_id is not None:
script_block.script_file_id = script_file_id
if run_signature is not None:
script_block.run_signature = run_signature
await session.commit()
await session.refresh(script_block)
return convert_to_script_block(script_block)

View File

@@ -969,6 +969,7 @@ class ScriptBlockModel(Base):
script_revision_id = Column(String, nullable=False, index=True)
script_block_label = Column(String, nullable=False)
script_file_id = Column(String, nullable=True)
run_signature = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)

View File

@@ -580,6 +580,7 @@ def convert_to_script_block(script_block_model: ScriptBlockModel) -> ScriptBlock
script_revision_id=script_block_model.script_revision_id,
script_block_label=script_block_model.script_block_label,
script_file_id=script_block_model.script_file_id,
run_signature=script_block_model.run_signature,
created_at=script_block_model.created_at,
modified_at=script_block_model.modified_at,
deleted_at=script_block_model.deleted_at,

View File

@@ -630,6 +630,16 @@ class BaseTaskBlock(Block):
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run, url=self.url, browser_session_id=browser_session_id
)
working_page = await browser_state.get_working_page()
if not working_page:
LOG.error(
"BrowserState has no page",
workflow_run_id=workflow_run.workflow_run_id,
)
raise MissingBrowserStatePage(workflow_run_id=workflow_run.workflow_run_id)
if working_page.url == "about:blank" and self.url:
await browser_state.navigate_to_url(page=working_page, url=self.url)
except Exception as e:
LOG.exception(
"Failed to get browser state for first task",

View File

@@ -1,5 +1,8 @@
import asyncio
import importlib.util
import json
import os
import textwrap
import uuid
from collections import deque
from datetime import UTC, datetime
@@ -8,6 +11,7 @@ from typing import Any, Literal, cast
import httpx
import structlog
import skyvern
from skyvern import analytics
from skyvern.client.types.output_parameter import OutputParameter as BlockOutputParameter
from skyvern.config import settings
@@ -19,7 +23,6 @@ from skyvern.exceptions import (
FailedToSendWebhook,
InvalidCredentialId,
MissingValueForParameter,
ScriptTerminationException,
SkyvernException,
WorkflowNotFound,
WorkflowRunNotFound,
@@ -98,9 +101,10 @@ from skyvern.forge.sdk.workflow.models.workflow import (
WorkflowRunStatus,
)
from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse
from skyvern.schemas.scripts import ScriptStatus
from skyvern.schemas.scripts import ScriptStatus, WorkflowScript
from skyvern.schemas.workflows import (
BLOCK_YAML_TYPES,
BlockResult,
BlockStatus,
BlockType,
ForLoopBlockYAML,
@@ -433,29 +437,18 @@ class WorkflowService:
if workflow_run.code_gen:
current_context.generate_script = True
is_script_run = self.should_run_script(workflow, workflow_run)
if workflow_script and is_script_run:
LOG.info(
"Running script for workflow run",
workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id,
organization_id=organization_id,
workflow_script_id=workflow_script.script_id,
)
workflow_run = await self._execute_workflow_script(
script_id=workflow_script.script_id,
workflow_run=workflow_run,
organization=organization,
browser_session_id=browser_session_id,
)
else:
workflow_run = await self._execute_workflow_blocks(
workflow=workflow,
workflow_run=workflow_run,
organization=organization,
browser_session_id=browser_session_id,
block_labels=block_labels,
block_outputs=block_outputs,
)
# Unified execution: execute blocks one by one, using script code when available
if is_script_run is False:
workflow_script = None
workflow_run = await self._execute_workflow_blocks(
workflow=workflow,
workflow_run=workflow_run,
organization=organization,
browser_session_id=browser_session_id,
block_labels=block_labels,
block_outputs=block_outputs,
workflow_script=workflow_script,
)
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run_id,
@@ -500,12 +493,87 @@ class WorkflowService:
browser_session_id: str | None = None,
block_labels: list[str] | None = None,
block_outputs: dict[str, Any] | None = None,
workflow_script: WorkflowScript | None = None,
) -> WorkflowRun:
organization_id = organization.organization_id
workflow_run_id = workflow_run.workflow_run_id
top_level_blocks = workflow.workflow_definition.blocks
all_blocks = get_all_blocks(top_level_blocks)
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with="agent")
# Load script blocks if workflow_script is provided
script_blocks_by_label: dict[str, Any] = {}
loaded_script_module = None
if workflow_script:
LOG.info(
"Loading script blocks for workflow execution",
workflow_run_id=workflow_run_id,
script_id=workflow_script.script_id,
)
try:
# Load script blocks from database
script = await app.DATABASE.get_script(
script_id=workflow_script.script_id,
organization_id=organization_id,
)
if script:
script_files = await app.DATABASE.get_script_files(
script_revision_id=script.script_revision_id,
organization_id=organization_id,
)
await script_service.load_scripts(script, script_files)
script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id(
script_revision_id=script.script_revision_id,
organization_id=organization_id,
)
# Create mapping from block label to script block
for script_block in script_blocks:
if script_block.run_signature:
script_blocks_by_label[script_block.script_block_label] = script_block
script_path = os.path.join(settings.TEMP_PATH, workflow_script.script_id, "main.py")
if os.path.exists(script_path):
# setup script run
parameter_tuples = await app.DATABASE.get_workflow_run_parameters(
workflow_run_id=workflow_run.workflow_run_id
)
script_parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
spec = importlib.util.spec_from_file_location("user_script", script_path)
if spec and spec.loader:
loaded_script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(loaded_script_module)
await skyvern.setup(
script_parameters,
generated_parameter_cls=loaded_script_module.GeneratedWorkflowParameters,
)
LOG.info(
"Successfully loaded script module",
script_id=workflow_script.script_id,
block_count=len(script_blocks_by_label),
)
else:
LOG.warning(
"Script file not found at path",
script_path=script_path,
script_id=workflow_script.script_id,
)
except Exception as e:
LOG.warning(
"Failed to load script blocks, will fallback to normal execution",
error=str(e),
exc_info=True,
workflow_run_id=workflow_run_id,
script_id=workflow_script.script_id,
)
script_blocks_by_label = {}
loaded_script_module = None
# Mark workflow as running with appropriate engine
run_with = "code" if script_blocks_by_label else "agent"
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with=run_with)
if block_labels and len(block_labels):
blocks: list[BlockTypeVar] = []
@@ -573,11 +641,106 @@ class WorkflowService:
block_label=block.label,
model=block.model,
)
block_result = await block.execute_safe(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
browser_session_id=browser_session_id,
)
# Try executing with script code if available
block_executed_with_code = False
valid_to_run_code = block.label and block.label in script_blocks_by_label
if valid_to_run_code:
script_block = script_blocks_by_label[block.label]
LOG.info(
"Attempting to execute block with script code",
block_label=block.label,
run_signature=script_block.run_signature,
)
try:
# Execute the run signature and capture the return value
vars_dict = vars(loaded_script_module) if loaded_script_module else {}
exec_globals = {
**vars_dict,
"skyvern": skyvern,
"__builtins__": __builtins__,
}
# Use exec to handle multi-line run_signature statements
# Create an async function and execute it
# Dedent first to normalize indentation, then re-indent for function body
assert script_block.run_signature is not None
normalized_signature = textwrap.dedent(script_block.run_signature).strip()
# Add 8 spaces (2 levels: function + return statement)
indented_signature = textwrap.indent(normalized_signature, " ")
# Build the wrapper function
wrapper_code = (
f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n"
)
LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code)
exec_code = compile(wrapper_code, "<run_signature>", "exec")
exec(exec_code, exec_globals)
output_value = await exec_globals["__run_signature_wrapper"]()
# Execution succeeded - get the block result from the workflow run blocks
# The script execution should have created the workflow run block
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# Find the most recent block with matching label
matching_blocks = [b for b in workflow_run_blocks if b.label == block.label]
if matching_blocks:
latest_block = max(matching_blocks, key=lambda b: b.created_at)
# Construct BlockResult from the workflow_run_block
block_result = BlockResult(
success=latest_block.status == BlockStatus.completed,
failure_reason=latest_block.failure_reason,
output_parameter=block.output_parameter,
output_parameter_value=latest_block.output,
status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed,
workflow_run_block_id=latest_block.workflow_run_block_id,
)
block_executed_with_code = True
LOG.info(
"Successfully executed block with script code",
block_label=block.label,
block_status=block_result.status,
has_output=output_value is not None,
)
else:
LOG.warning(
"Block executed with code but no workflow run block found",
block_label=block.label,
)
# Fallback to AI execution
block_executed_with_code = False
except Exception as e:
LOG.warning(
"Failed to execute block with script code, falling back to AI",
block_label=block.label,
error=str(e),
exc_info=True,
)
block_executed_with_code = False
# Execute with AI if code execution was not attempted or failed
if not block_executed_with_code:
LOG.info(
"Executing block with AI",
block_label=block.label,
block_type=block.block_type,
)
block_result = await block.execute_safe(
workflow_run_id=workflow_run_id,
organization_id=organization_id,
browser_session_id=browser_session_id,
)
if not block_result:
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id, failure_reason="Block result is None"
)
break
if block_result.status == BlockStatus.canceled:
LOG.info(
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
@@ -2779,79 +2942,6 @@ class WorkflowService:
return result
async def _execute_workflow_script(
self,
script_id: str,
workflow_run: WorkflowRun,
organization: Organization,
browser_session_id: str | None = None,
) -> WorkflowRun:
"""
Execute the related workflow script instead of running the workflow blocks.
"""
LOG.info("Start to execute workflow script", workflow_run_id=workflow_run.workflow_run_id)
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id, run_with="code")
try:
# Render the cache_key_value to find the right script
parameter_tuples = await app.DATABASE.get_workflow_run_parameters(
workflow_run_id=workflow_run.workflow_run_id
)
parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
# Execute the script using script_service
try:
await script_service.execute_script(
script_id=script_id,
organization_id=organization.organization_id,
parameters=parameters,
workflow_run_id=workflow_run.workflow_run_id,
browser_session_id=browser_session_id,
background_tasks=None, # Execute synchronously
)
except ScriptTerminationException as e:
LOG.info(
"Script terminated, marking workflow run as terminated",
failure_reason=e.message,
workflow_run_id=workflow_run.workflow_run_id,
)
workflow_run = await self.mark_workflow_run_as_terminated(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=e.message,
)
return workflow_run
# Mark workflow run as completed
workflow_run = await self.mark_workflow_run_as_completed(
workflow_run_id=workflow_run.workflow_run_id,
)
LOG.info(
"Successfully executed workflow script",
workflow_run_id=workflow_run.workflow_run_id,
script_id=script_id,
organization_id=organization.organization_id,
)
return workflow_run
except Exception as e:
LOG.error(
"Failed to execute workflow script, marking workflow run as failed",
workflow_run_id=workflow_run.workflow_run_id,
error=str(e),
exc_info=True,
)
# Mark workflow run as failed
failure_reason = f"Failed to execute workflow script: {str(e)}"
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id,
failure_reason=failure_reason,
)
return workflow_run
async def generate_script_if_needed(
self,
workflow: Workflow,
@@ -2906,6 +2996,8 @@ class WorkflowService:
) -> bool:
if workflow_run.run_with == "code":
return True
if workflow_run.run_with == "agent":
return False
if workflow.run_with == "code":
return True
return False