diff --git a/alembic/versions/2025_10_14_2232-b80c42316c94_add_run_signature_to_script_block.py b/alembic/versions/2025_10_14_2232-b80c42316c94_add_run_signature_to_script_block.py new file mode 100644 index 00000000..0cfda6e2 --- /dev/null +++ b/alembic/versions/2025_10_14_2232-b80c42316c94_add_run_signature_to_script_block.py @@ -0,0 +1,31 @@ +"""add run_signature to script block + +Revision ID: b80c42316c94 +Revises: 774e10939484 +Create Date: 2025-10-14 22:32:57.466000+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "b80c42316c94" +down_revision: Union[str, None] = "774e10939484" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("script_blocks", sa.Column("run_signature", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("script_blocks", "run_signature") + # ### end Alembic commands ### diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 902a60e8..298b8b95 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -586,38 +586,7 @@ def _build_download_statement( block_title: str, block: dict[str, Any], data_variable_name: str | None = None ) -> cst.SimpleStatementLine: """Build a skyvern.download statement.""" - args = [ - cst.Arg( - keyword=cst.Name("prompt"), - value=_value(block.get("navigation_goal") or ""), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ), - ] - if block.get("download_suffix"): - args.append( - cst.Arg( - keyword=cst.Name("download_suffix"), - value=_value(block.get("download_suffix")), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - last_line=cst.SimpleWhitespace(INDENT), - ), - ) - ) - args.append( - cst.Arg( - keyword=cst.Name("cache_key"), - value=_value(block_title), - whitespace_after_arg=cst.ParenthesizedWhitespace( - indent=True, - ), - comma=cst.Comma(), - ) - ) - + args = __build_base_task_statement(block_title, block, data_variable_name) call = cst.Call( func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("download")), args=args, @@ -1498,6 +1467,17 @@ def __build_base_task_statement( ), ) ) + if block.get("download_suffix"): + args.append( + cst.Arg( + keyword=cst.Name("download_suffix"), + value=_value(block.get("download_suffix")), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) if block.get("totp_identifier"): args.append( cst.Arg( @@ -1746,6 +1726,12 @@ async def generate_workflow_script_python_code( block_name = task.get("label") or task.get("title") or task.get("task_id") or f"task_{idx}" temp_module = cst.Module(body=[block_fn_def]) block_code = temp_module.code + + # Extract the run signature (the statement that calls skyvern.action/extract/etc) + block_stmt = _build_block_statement(task) + run_signature_module = cst.Module(body=[block_stmt]) + run_signature = run_signature_module.code.strip() + await create_or_update_script_block( block_code=block_code, script_revision_id=script_revision_id, @@ -1753,6 +1739,7 @@ async def generate_workflow_script_python_code( organization_id=organization_id, block_label=block_name, update=pending, + run_signature=run_signature, ) except Exception as e: LOG.error("Failed to create script block", error=str(e), exc_info=True) @@ -1796,6 +1783,11 @@ async def generate_workflow_script_python_code( block_name = task_v2.get("label") or task_v2.get("title") or f"task_v2_{idx}" + # Extract the run signature for task_v2 block + task_v2_stmt = _build_block_statement(task_v2) + run_signature_module = cst.Module(body=[task_v2_stmt]) + run_signature = run_signature_module.code.strip() + await create_or_update_script_block( block_code=task_v2_block_code, script_revision_id=script_revision_id, @@ -1803,6 +1795,7 @@ async def generate_workflow_script_python_code( organization_id=organization_id, block_label=block_name, update=pending, + run_signature=run_signature, ) except Exception as e: LOG.error("Failed to create task_v2 script block", error=str(e), exc_info=True) @@ -1891,6 +1884,7 @@ async def create_or_update_script_block( organization_id: str, block_label: str, update: bool = False, + run_signature: str | None = None, ) -> None: """ Create a script block in the database and save the block code to a script file. @@ -1903,6 +1897,7 @@ async def create_or_update_script_block( organization_id: The organization ID block_label: Optional custom name for the block (defaults to function name) update: Whether to update the script block instead of creating a new one + run_signature: The function call code to execute this block (e.g., "await skyvern.action(...)") """ block_code_bytes = block_code if isinstance(block_code, bytes) else block_code.encode("utf-8") try: @@ -1918,6 +1913,14 @@ async def create_or_update_script_block( script_id=script_id, organization_id=organization_id, script_block_label=block_label, + run_signature=run_signature, + ) + elif run_signature: + # Update the run_signature if provided + script_block = await app.DATABASE.update_script_block( + script_block_id=script_block.script_block_id, + organization_id=organization_id, + run_signature=run_signature, ) # Step 4: Create script file for the block diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index 957bc159..81f0139e 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -651,7 +651,9 @@ class SkyvernPage: intention=intention, response=value, ) - await handle_input_text_action(action, self.page, self.scraped_page, task, step) + result = await handle_input_text_action(action, self.page, self.scraped_page, task, step) + if result and result[-1].success is False: + raise Exception(result[-1].exception_message) else: locator = self.page.locator(f"xpath={xpath}") await handler_utils.input_sequentially(locator, transformed_value, timeout=timeout) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 9cdff636..4ecba12d 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -4152,6 +4152,7 @@ class AgentDB: organization_id: str, script_block_label: str, script_file_id: str | None = None, + run_signature: str | None = None, ) -> ScriptBlock: """Create a script block.""" async with self.Session() as session: @@ -4161,6 +4162,7 @@ class AgentDB: organization_id=organization_id, script_block_label=script_block_label, script_file_id=script_file_id, + run_signature=run_signature, ) session.add(script_block) await session.commit() @@ -4172,6 +4174,7 @@ class AgentDB: script_block_id: str, organization_id: str, script_file_id: str | None = None, + run_signature: str | None = None, ) -> ScriptBlock: async with self.Session() as session: script_block = ( @@ -4182,8 +4185,10 @@ class AgentDB: ) ).first() if script_block: - if script_file_id: + if script_file_id is not None: script_block.script_file_id = script_file_id + if run_signature is not None: + script_block.run_signature = run_signature await session.commit() await session.refresh(script_block) return convert_to_script_block(script_block) diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index 7ab062d3..05d382f5 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -969,6 +969,7 @@ class ScriptBlockModel(Base): script_revision_id = Column(String, nullable=False, index=True) script_block_label = Column(String, nullable=False) script_file_id = Column(String, nullable=True) + run_signature = Column(String, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False) modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 0ca4b5d8..a1ef9d61 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -580,6 +580,7 @@ def convert_to_script_block(script_block_model: ScriptBlockModel) -> ScriptBlock script_revision_id=script_block_model.script_revision_id, script_block_label=script_block_model.script_block_label, script_file_id=script_block_model.script_file_id, + run_signature=script_block_model.run_signature, created_at=script_block_model.created_at, modified_at=script_block_model.modified_at, deleted_at=script_block_model.deleted_at, diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index d3853877..a17900ca 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -630,6 +630,16 @@ class BaseTaskBlock(Block): browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( workflow_run=workflow_run, url=self.url, browser_session_id=browser_session_id ) + working_page = await browser_state.get_working_page() + if not working_page: + LOG.error( + "BrowserState has no page", + workflow_run_id=workflow_run.workflow_run_id, + ) + raise MissingBrowserStatePage(workflow_run_id=workflow_run.workflow_run_id) + if working_page.url == "about:blank" and self.url: + await browser_state.navigate_to_url(page=working_page, url=self.url) + except Exception as e: LOG.exception( "Failed to get browser state for first task", diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index a0bedbe5..f62ebce0 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1,5 +1,8 @@ import asyncio +import importlib.util import json +import os +import textwrap import uuid from collections import deque from datetime import UTC, datetime @@ -8,6 +11,7 @@ from typing import Any, Literal, cast import httpx import structlog +import skyvern from skyvern import analytics from skyvern.client.types.output_parameter import OutputParameter as BlockOutputParameter from skyvern.config import settings @@ -19,7 +23,6 @@ from skyvern.exceptions import ( FailedToSendWebhook, InvalidCredentialId, MissingValueForParameter, - ScriptTerminationException, SkyvernException, WorkflowNotFound, WorkflowRunNotFound, @@ -98,9 +101,10 @@ from skyvern.forge.sdk.workflow.models.workflow import ( WorkflowRunStatus, ) from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse -from skyvern.schemas.scripts import ScriptStatus +from skyvern.schemas.scripts import ScriptStatus, WorkflowScript from skyvern.schemas.workflows import ( BLOCK_YAML_TYPES, + BlockResult, BlockStatus, BlockType, ForLoopBlockYAML, @@ -433,29 +437,18 @@ class WorkflowService: if workflow_run.code_gen: current_context.generate_script = True is_script_run = self.should_run_script(workflow, workflow_run) - if workflow_script and is_script_run: - LOG.info( - "Running script for workflow run", - workflow_run_id=workflow_run_id, - workflow_id=workflow.workflow_id, - organization_id=organization_id, - workflow_script_id=workflow_script.script_id, - ) - workflow_run = await self._execute_workflow_script( - script_id=workflow_script.script_id, - workflow_run=workflow_run, - organization=organization, - browser_session_id=browser_session_id, - ) - else: - workflow_run = await self._execute_workflow_blocks( - workflow=workflow, - workflow_run=workflow_run, - organization=organization, - browser_session_id=browser_session_id, - block_labels=block_labels, - block_outputs=block_outputs, - ) + # Unified execution: execute blocks one by one, using script code when available + if is_script_run is False: + workflow_script = None + workflow_run = await self._execute_workflow_blocks( + workflow=workflow, + workflow_run=workflow_run, + organization=organization, + browser_session_id=browser_session_id, + block_labels=block_labels, + block_outputs=block_outputs, + workflow_script=workflow_script, + ) if refreshed_workflow_run := await app.DATABASE.get_workflow_run( workflow_run_id=workflow_run_id, @@ -500,12 +493,87 @@ class WorkflowService: browser_session_id: str | None = None, block_labels: list[str] | None = None, block_outputs: dict[str, Any] | None = None, + workflow_script: WorkflowScript | None = None, ) -> WorkflowRun: organization_id = organization.organization_id workflow_run_id = workflow_run.workflow_run_id top_level_blocks = workflow.workflow_definition.blocks all_blocks = get_all_blocks(top_level_blocks) - await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with="agent") + + # Load script blocks if workflow_script is provided + script_blocks_by_label: dict[str, Any] = {} + loaded_script_module = None + + if workflow_script: + LOG.info( + "Loading script blocks for workflow execution", + workflow_run_id=workflow_run_id, + script_id=workflow_script.script_id, + ) + try: + # Load script blocks from database + script = await app.DATABASE.get_script( + script_id=workflow_script.script_id, + organization_id=organization_id, + ) + if script: + script_files = await app.DATABASE.get_script_files( + script_revision_id=script.script_revision_id, + organization_id=organization_id, + ) + await script_service.load_scripts(script, script_files) + + script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id( + script_revision_id=script.script_revision_id, + organization_id=organization_id, + ) + + # Create mapping from block label to script block + for script_block in script_blocks: + if script_block.run_signature: + script_blocks_by_label[script_block.script_block_label] = script_block + + script_path = os.path.join(settings.TEMP_PATH, workflow_script.script_id, "main.py") + if os.path.exists(script_path): + # setup script run + parameter_tuples = await app.DATABASE.get_workflow_run_parameters( + workflow_run_id=workflow_run.workflow_run_id + ) + script_parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} + + spec = importlib.util.spec_from_file_location("user_script", script_path) + if spec and spec.loader: + loaded_script_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(loaded_script_module) + await skyvern.setup( + script_parameters, + generated_parameter_cls=loaded_script_module.GeneratedWorkflowParameters, + ) + LOG.info( + "Successfully loaded script module", + script_id=workflow_script.script_id, + block_count=len(script_blocks_by_label), + ) + else: + LOG.warning( + "Script file not found at path", + script_path=script_path, + script_id=workflow_script.script_id, + ) + except Exception as e: + LOG.warning( + "Failed to load script blocks, will fallback to normal execution", + error=str(e), + exc_info=True, + workflow_run_id=workflow_run_id, + script_id=workflow_script.script_id, + ) + script_blocks_by_label = {} + loaded_script_module = None + + # Mark workflow as running with appropriate engine + run_with = "code" if script_blocks_by_label else "agent" + await self.mark_workflow_run_as_running(workflow_run_id=workflow_run_id, run_with=run_with) if block_labels and len(block_labels): blocks: list[BlockTypeVar] = [] @@ -573,11 +641,106 @@ class WorkflowService: block_label=block.label, model=block.model, ) - block_result = await block.execute_safe( - workflow_run_id=workflow_run_id, - organization_id=organization_id, - browser_session_id=browser_session_id, - ) + + # Try executing with script code if available + block_executed_with_code = False + valid_to_run_code = block.label and block.label in script_blocks_by_label + if valid_to_run_code: + script_block = script_blocks_by_label[block.label] + LOG.info( + "Attempting to execute block with script code", + block_label=block.label, + run_signature=script_block.run_signature, + ) + try: + # Execute the run signature and capture the return value + vars_dict = vars(loaded_script_module) if loaded_script_module else {} + exec_globals = { + **vars_dict, + "skyvern": skyvern, + "__builtins__": __builtins__, + } + + # Use exec to handle multi-line run_signature statements + # Create an async function and execute it + + # Dedent first to normalize indentation, then re-indent for function body + assert script_block.run_signature is not None + normalized_signature = textwrap.dedent(script_block.run_signature).strip() + # Add 8 spaces (2 levels: function + return statement) + indented_signature = textwrap.indent(normalized_signature, " ") + + # Build the wrapper function + wrapper_code = ( + f"async def __run_signature_wrapper():\n return (\n{indented_signature}\n )\n" + ) + + LOG.debug("Executing run_signature wrapper", wrapper_code=wrapper_code) + + exec_code = compile(wrapper_code, "", "exec") + exec(exec_code, exec_globals) + output_value = await exec_globals["__run_signature_wrapper"]() + + # Execution succeeded - get the block result from the workflow run blocks + # The script execution should have created the workflow run block + workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + ) + # Find the most recent block with matching label + matching_blocks = [b for b in workflow_run_blocks if b.label == block.label] + if matching_blocks: + latest_block = max(matching_blocks, key=lambda b: b.created_at) + + # Construct BlockResult from the workflow_run_block + block_result = BlockResult( + success=latest_block.status == BlockStatus.completed, + failure_reason=latest_block.failure_reason, + output_parameter=block.output_parameter, + output_parameter_value=latest_block.output, + status=BlockStatus(latest_block.status) if latest_block.status else BlockStatus.failed, + workflow_run_block_id=latest_block.workflow_run_block_id, + ) + block_executed_with_code = True + LOG.info( + "Successfully executed block with script code", + block_label=block.label, + block_status=block_result.status, + has_output=output_value is not None, + ) + else: + LOG.warning( + "Block executed with code but no workflow run block found", + block_label=block.label, + ) + # Fallback to AI execution + block_executed_with_code = False + except Exception as e: + LOG.warning( + "Failed to execute block with script code, falling back to AI", + block_label=block.label, + error=str(e), + exc_info=True, + ) + block_executed_with_code = False + + # Execute with AI if code execution was not attempted or failed + if not block_executed_with_code: + LOG.info( + "Executing block with AI", + block_label=block.label, + block_type=block.block_type, + ) + block_result = await block.execute_safe( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + browser_session_id=browser_session_id, + ) + if not block_result: + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason="Block result is None" + ) + break if block_result.status == BlockStatus.canceled: LOG.info( f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt - 1} was canceled for workflow run {workflow_run_id}, cancelling workflow run", @@ -2779,79 +2942,6 @@ class WorkflowService: return result - async def _execute_workflow_script( - self, - script_id: str, - workflow_run: WorkflowRun, - organization: Organization, - browser_session_id: str | None = None, - ) -> WorkflowRun: - """ - Execute the related workflow script instead of running the workflow blocks. - """ - LOG.info("Start to execute workflow script", workflow_run_id=workflow_run.workflow_run_id) - await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id, run_with="code") - - try: - # Render the cache_key_value to find the right script - parameter_tuples = await app.DATABASE.get_workflow_run_parameters( - workflow_run_id=workflow_run.workflow_run_id - ) - parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} - - # Execute the script using script_service - try: - await script_service.execute_script( - script_id=script_id, - organization_id=organization.organization_id, - parameters=parameters, - workflow_run_id=workflow_run.workflow_run_id, - browser_session_id=browser_session_id, - background_tasks=None, # Execute synchronously - ) - except ScriptTerminationException as e: - LOG.info( - "Script terminated, marking workflow run as terminated", - failure_reason=e.message, - workflow_run_id=workflow_run.workflow_run_id, - ) - workflow_run = await self.mark_workflow_run_as_terminated( - workflow_run_id=workflow_run.workflow_run_id, - failure_reason=e.message, - ) - return workflow_run - - # Mark workflow run as completed - workflow_run = await self.mark_workflow_run_as_completed( - workflow_run_id=workflow_run.workflow_run_id, - ) - - LOG.info( - "Successfully executed workflow script", - workflow_run_id=workflow_run.workflow_run_id, - script_id=script_id, - organization_id=organization.organization_id, - ) - - return workflow_run - - except Exception as e: - LOG.error( - "Failed to execute workflow script, marking workflow run as failed", - workflow_run_id=workflow_run.workflow_run_id, - error=str(e), - exc_info=True, - ) - - # Mark workflow run as failed - failure_reason = f"Failed to execute workflow script: {str(e)}" - workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, - failure_reason=failure_reason, - ) - - return workflow_run - async def generate_script_if_needed( self, workflow: Workflow, @@ -2906,6 +2996,8 @@ class WorkflowService: ) -> bool: if workflow_run.run_with == "code": return True + if workflow_run.run_with == "agent": + return False if workflow.run_with == "code": return True return False diff --git a/skyvern/schemas/scripts.py b/skyvern/schemas/scripts.py index a04e77e5..1c62e18d 100644 --- a/skyvern/schemas/scripts.py +++ b/skyvern/schemas/scripts.py @@ -134,6 +134,7 @@ class ScriptBlock(BaseModel): script_revision_id: str script_block_label: str script_file_id: str | None = None + run_signature: str | None = None # The function call code to execute this block created_at: datetime modified_at: datetime deleted_at: datetime | None = None diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 262d1617..1970aaf2 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -47,7 +47,15 @@ from skyvern.forge.sdk.workflow.models.block import ( from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, OutputParameter, ParameterType from skyvern.forge.sdk.workflow.models.workflow import Workflow from skyvern.schemas.runs import RunEngine -from skyvern.schemas.scripts import CreateScriptResponse, FileEncoding, FileNode, ScriptFileCreate, ScriptStatus +from skyvern.schemas.scripts import ( + CreateScriptResponse, + FileEncoding, + FileNode, + Script, + ScriptFile, + ScriptFileCreate, + ScriptStatus, +) from skyvern.schemas.workflows import BlockStatus, BlockType, FileStorageType, FileType LOG = structlog.get_logger() @@ -258,35 +266,18 @@ async def create_script( raise HTTPException(status_code=500, detail="Failed to create script") -async def execute_script( - script_id: str, - organization_id: str, - parameters: dict[str, Any] | None = None, - workflow_run_id: str | None = None, - browser_session_id: str | None = None, - background_tasks: BackgroundTasks | None = None, +async def load_scripts( + script: Script, + script_files: list[ScriptFile], ) -> None: - # step 1: get the script revision - script = await app.DATABASE.get_script( - script_id=script_id, - organization_id=organization_id, - ) - if not script: - raise ScriptNotFound(script_id=script_id) - - # step 2: get the script files - script_files = await app.DATABASE.get_script_files( - script_revision_id=script.script_revision_id, organization_id=organization_id - ) - - # step 3: copy the script files to the local directory + organization_id = script.organization_id for file in script_files: # retrieve the artifact if not file.artifact_id: continue artifact = await app.DATABASE.get_artifact_by_id(file.artifact_id, organization_id) if not artifact: - LOG.error("Artifact not found", artifact_id=file.artifact_id, script_id=script_id) + LOG.error("Artifact not found", artifact_id=file.artifact_id, script_id=script.script_id) continue file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact) if not file_content: @@ -313,6 +304,31 @@ async def execute_script( with open(file_path, "wb") as f: f.write(file_content) + +async def execute_script( + script_id: str, + organization_id: str, + parameters: dict[str, Any] | None = None, + workflow_run_id: str | None = None, + browser_session_id: str | None = None, + background_tasks: BackgroundTasks | None = None, +) -> None: + # step 1: get the script revision + script = await app.DATABASE.get_script( + script_id=script_id, + organization_id=organization_id, + ) + if not script: + raise ScriptNotFound(script_id=script_id) + + # step 2: get the script files + script_files = await app.DATABASE.get_script_files( + script_revision_id=script.script_revision_id, organization_id=organization_id + ) + + # step 3: copy the script files to the local directory + await load_scripts(script, script_files) + # step 4: execute the script if workflow_run_id and not parameters: parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id) @@ -320,6 +336,7 @@ async def execute_script( LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters) script_path = os.path.join(settings.TEMP_PATH, script.script_id, "main.py") + if background_tasks: # Execute asynchronously in background background_tasks.add_task( @@ -329,6 +346,7 @@ async def execute_script( organization_id=organization_id, workflow_run_id=workflow_run_id, browser_session_id=browser_session_id, + script_id=script_id, ) else: # Execute synchronously @@ -681,17 +699,7 @@ async def _fallback_to_ai_run( organization_id=organization_id, status=StepStatus.failed, ) - # 2. create a new step for ai run - ai_step = await app.DATABASE.create_step( - task_id=task_id, - organization_id=organization_id, - order=previous_step.order + 1, - retry_index=0, - ) - context.step_id = ai_step.step_id - ai_step_id = ai_step.step_id - # 3. build the task block - # 4. run execute_step + # 2. run execute_step organization = await app.DATABASE.get_organization(organization_id=organization_id) if not organization: raise Exception(f"Organization is missing organization_id={organization_id}") @@ -717,8 +725,29 @@ async def _fallback_to_ai_run( workflow_permanent_id=workflow_permanent_id, workflow_run_id=workflow_run_id, ) + if workflow_run_block_id: + await _update_workflow_block( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(error), + step_id=script_step_id, + step_status=StepStatus.failed, + label=cache_key, + ) return + # 2. create a new step for ai run + ai_step = await app.DATABASE.create_step( + task_id=task_id, + organization_id=organization_id, + order=previous_step.order + 1, + retry_index=0, + ) + context.step_id = ai_step.step_id + ai_step_id = ai_step.step_id + # get the output_paramter output_parameter = workflow.get_output_parameter(cache_key) if not output_parameter: @@ -777,11 +806,13 @@ async def _fallback_to_ai_run( # Update block status to completed if workflow block was created if workflow_run_block_id: + # refresh the task + refreshed_task = await app.DATABASE.get_task(task_id=task_id, organization_id=organization_id) + if refreshed_task: + task = refreshed_task await _update_workflow_block( workflow_run_block_id, - BlockStatus.completed, - task_id=context.task_id, - step_id=context.step_id, + BlockStatus(task.status.value), label=cache_key, ) @@ -1581,7 +1612,6 @@ async def run_script( user_script = importlib.util.module_from_spec(spec) spec.loader.exec_module(user_script) - # Call run_workflow from the imported module if hasattr(user_script, "run_workflow"): # If parameters is None, pass an empty dict if parameters: