From ae4fbe638ec261d8cb7e688fb077bee742983497 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Tue, 19 Aug 2025 13:32:39 -0700 Subject: [PATCH] run script with workflow block run, task, step and artifacts built (#3234) --- .../script_generations/generate_script.py | 5 +- .../core/script_generations/skyvern_page.py | 89 ++++- skyvern/forge/agent.py | 6 + skyvern/forge/sdk/core/skyvern_context.py | 3 +- skyvern/forge/sdk/db/client.py | 7 +- skyvern/forge/sdk/workflow/service.py | 123 +++++- skyvern/services/script_service.py | 354 +++++++++++++++++- 7 files changed, 562 insertions(+), 25 deletions(-) diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 3235192d..564a4305 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -777,7 +777,10 @@ def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine: def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> FunctionDef: body = [ - cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())"), + cst.parse_statement( + "parameters = parameters.model_dump() if isinstance(parameters, WorkflowParameters) else parameters" + ), + cst.parse_statement("page, context = await skyvern.setup(parameters)"), ] for block in blocks: diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index 7a179538..1f1af0e2 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -13,9 +13,11 @@ from skyvern.config import settings from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import download_file +from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.webeye.actions import handler_utils from skyvern.webeye.actions.action_types import ActionType +from skyvern.webeye.actions.actions import Action from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website @@ -85,9 +87,8 @@ class SkyvernPage: """ Decorator to record the action call. - TODOs: - - generate action record in db pre action - - generate screenshot post action + Auto-creates action records in DB before action execution + and screenshot artifacts after action execution. """ def decorator(fn: Callable) -> Callable: @@ -100,18 +101,33 @@ class SkyvernPage: ) -> Any: meta = ActionMetadata(intention, data) call = ActionCall(action, args, kwargs, meta) + + # Auto-create action before execution + action_record = await skyvern_page._create_action_before_execution( + action_type=action, intention=intention, data=data + ) + try: call.result = await fn( skyvern_page, *args, intention=intention, data=data, **kwargs ) # real driver call + + # Note: Action status would be updated to completed here if update method existed + return call.result except Exception as e: call.error = e + + # Note: Action status would be updated to failed here if update method existed + # LLM fallback hook could go here ... raise finally: skyvern_page._record(call) + # Auto-create screenshot artifact after execution + await skyvern_page._create_screenshot_after_execution(action_record) + return wrapper return decorator @@ -119,6 +135,73 @@ class SkyvernPage: async def goto(self, url: str) -> None: await self.page.goto(url) + async def _create_action_before_execution( + self, action_type: ActionType, intention: str = "", data: str | dict[str, Any] = "" + ) -> Action | None: + """Create an action record in the database before execution if task_id and step_id are available.""" + try: + context = skyvern_context.current() + if not context or not context.task_id or not context.step_id: + return None + + # Create action record + action = Action( + action_type=action_type, + organization_id=context.organization_id, + workflow_run_id=context.workflow_run_id, + task_id=context.task_id, + step_id=context.step_id, + step_order=0, # Will be updated by the system if needed + action_order=0, # Will be updated by the system if needed + intention=intention, + reasoning=f"Auto-generated action for {action_type.value}", + ) + + created_action = await app.DATABASE.create_action(action) + return created_action + + except Exception: + # If action creation fails, don't block the actual action execution + return None + + async def _create_screenshot_after_execution(self, action_record: Action | None = None) -> None: + """Create a screenshot artifact after action execution if task_id and step_id are available.""" + try: + context = skyvern_context.ensure_context() + if not context or not context.task_id or not context.step_id: + return + + # Get browser state and take screenshot + browser_state = await app.BROWSER_MANAGER.get_for_script() + if not browser_state: + return + + screenshot = await browser_state.take_fullpage_screenshot( + use_playwright_fullpage=app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached( + "ENABLE_PLAYWRIGHT_FULLPAGE", + context.workflow_run_id or context.task_id, + properties={"organization_id": context.organization_id}, + ) + ) + + if screenshot: + # Create a minimal Step object for artifact creation + step = await app.DATABASE.get_step( + context.task_id, context.step_id, organization_id=context.organization_id + ) + if not step: + return + + await app.ARTIFACT_MANAGER.create_artifact( + step=step, + artifact_type=ArtifactType.SCREENSHOT_ACTION, + data=screenshot, + ) + + except Exception: + # If screenshot creation fails, don't block execution + pass + ######### Public Interfaces ######### @action_wrap(ActionType.CLICK) async def click(self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index c77f0ce1..e0f90074 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -872,6 +872,12 @@ class ForgeAgent: step_order=step.order, step_retry=step.retry_index, ) + + # Update context with step_id for auto action/screenshot creation + context = skyvern_context.current() + if context: + context.step_id = step.step_id + step = await self.update_step(step=step, status=StepStatus.running) await app.AGENT_FUNCTION.prepare_step_execution( organization=organization, task=task, step=step, browser_state=browser_state diff --git a/skyvern/forge/sdk/core/skyvern_context.py b/skyvern/forge/sdk/core/skyvern_context.py index a8ee313d..6d2b75e6 100644 --- a/skyvern/forge/sdk/core/skyvern_context.py +++ b/skyvern/forge/sdk/core/skyvern_context.py @@ -11,6 +11,7 @@ class SkyvernContext: organization_id: str | None = None organization_name: str | None = None task_id: str | None = None + step_id: str | None = None workflow_id: str | None = None workflow_permanent_id: str | None = None workflow_run_id: str | None = None @@ -30,7 +31,7 @@ class SkyvernContext: script_revision_id: str | None = None def __repr__(self) -> str: - return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})" + return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, step_id={self.step_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override}, run_id={self.run_id})" def __str__(self) -> str: return self.__repr__() diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 14f61ea1..6faa6f1a 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -139,11 +139,12 @@ class AgentDB: self, url: str, title: str | None, - complete_criterion: str | None, - terminate_criterion: str | None, navigation_goal: str | None, data_extraction_goal: str | None, navigation_payload: dict[str, Any] | list | str | None, + status: str = "created", + complete_criterion: str | None = None, + terminate_criterion: str | None = None, webhook_callback_url: str | None = None, totp_verification_url: str | None = None, totp_identifier: str | None = None, @@ -166,7 +167,7 @@ class AgentDB: try: async with self.Session() as session: new_task = TaskModel( - status="created", + status=status, task_type=task_type, url=url, title=title, diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index b1291469..0cc139e4 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -94,7 +94,7 @@ from skyvern.forge.sdk.workflow.models.workflow import ( WorkflowRunStatus, ) from skyvern.schemas.runs import ProxyLocation, RunStatus, RunType, WorkflowRunRequest, WorkflowRunResponse -from skyvern.schemas.scripts import FileEncoding, ScriptFileCreate +from skyvern.schemas.scripts import FileEncoding, Script, ScriptFileCreate from skyvern.schemas.workflows import ( BLOCK_YAML_TYPES, BlockStatus, @@ -277,6 +277,24 @@ class WorkflowService: workflow_run = await self.get_workflow_run(workflow_run_id=workflow_run_id, organization_id=organization_id) workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id) + # Check if there's a related workflow script that should be used instead + workflow_script = await self._get_workflow_script(workflow, workflow_run) + if workflow_script is not None: + LOG.info( + "Found related workflow script, running script instead of workflow", + workflow_run_id=workflow_run_id, + workflow_id=workflow.workflow_id, + organization_id=organization_id, + workflow_script_id=workflow_script.script_id, + ) + return await self._execute_workflow_script( + script_id=workflow_script.script_id, + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + organization=organization, + ) + # Set workflow run status to running, create workflow run parameters workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) @@ -2269,6 +2287,109 @@ class WorkflowService: return result + async def _get_workflow_script(self, workflow: Workflow, workflow_run: WorkflowRun) -> Script | None: + """ + Check if there's a related workflow script that should be used instead of running the workflow. + Returns True if a script should be used, False otherwise. + """ + if not workflow.generate_script: + return None + # Only check for scripts if the workflow has a cache_key + cache_key = workflow.cache_key or "" + + try: + # Render the cache_key_value from workflow run parameters (same logic as generate_script_for_workflow) + parameter_tuples = await app.DATABASE.get_workflow_run_parameters( + workflow_run_id=workflow_run.workflow_run_id + ) + parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} + + jinja_sandbox_env = SandboxedEnvironment() + rendered_cache_key_value = jinja_sandbox_env.from_string(cache_key).render(parameters) + + # Check if there are existing cached scripts for this workflow + cache_key_value + existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value( + organization_id=workflow.organization_id, + workflow_permanent_id=workflow.workflow_permanent_id, + cache_key_value=rendered_cache_key_value, + ) + + if existing_scripts: + LOG.info( + "Found cached script for workflow", + workflow_id=workflow.workflow_id, + cache_key_value=rendered_cache_key_value, + workflow_run_id=workflow_run.workflow_run_id, + script_count=len(existing_scripts), + ) + return existing_scripts[0] + + return None + + except Exception as e: + LOG.warning( + "Failed to check for workflow script, proceeding with normal workflow execution", + workflow_id=workflow.workflow_id, + workflow_run_id=workflow_run.workflow_run_id, + error=str(e), + exc_info=True, + ) + return None + + async def _execute_workflow_script( + self, script_id: str, workflow: Workflow, workflow_run: WorkflowRun, api_key: str, organization: Organization + ) -> WorkflowRun: + """ + Execute the related workflow script instead of running the workflow blocks. + """ + + try: + # Set workflow run status to running + workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) + + # Render the cache_key_value to find the right script + parameter_tuples = await app.DATABASE.get_workflow_run_parameters( + workflow_run_id=workflow_run.workflow_run_id + ) + parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} + + # Execute the script using script_service + await script_service.execute_script( + script_id=script_id, + organization_id=organization.organization_id, + parameters=parameters, + workflow_run_id=workflow_run.workflow_run_id, + background_tasks=None, # Execute synchronously + ) + + # Mark workflow run as completed + workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) + + LOG.info( + "Successfully executed workflow script", + workflow_run_id=workflow_run.workflow_run_id, + script_id=script_id, + organization_id=organization.organization_id, + ) + + return workflow_run + + except Exception as e: + LOG.error( + "Failed to execute workflow script, marking workflow run as failed", + workflow_run_id=workflow_run.workflow_run_id, + error=str(e), + exc_info=True, + ) + + # Mark workflow run as failed + failure_reason = f"Failed to execute workflow script: {str(e)}" + workflow_run = await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) + + return workflow_run + async def generate_script_for_workflow( self, workflow: Workflow, diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 5d8f0cd3..06192bb5 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -9,10 +9,13 @@ from typing import Any import structlog from fastapi import BackgroundTasks, HTTPException +from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager -from skyvern.exceptions import ScriptNotFound +from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.schemas.tasks import TaskStatus +from skyvern.forge.sdk.workflow.models.block import BlockStatus, BlockType from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate LOG = structlog.get_logger(__name__) @@ -158,6 +161,7 @@ async def execute_script( # step 2: get the script files # step 3: copy the script files to the local directory # step 4: execute the script + # step 5: TODO: close all the browser instances # step 1: get the script revision script = await app.DATABASE.get_script( @@ -207,15 +211,144 @@ async def execute_script( f.write(file_content) # step 4: execute the script + if workflow_run_id and not parameters: + parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id) + parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} + LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters) + if background_tasks: - if workflow_run_id and not parameters: - parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id) - parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} - LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters) - background_tasks.add_task(run_script, parameters=parameters) + # Execute asynchronously in background + background_tasks.add_task( + run_script, parameters=parameters, organization_id=organization_id, workflow_run_id=workflow_run_id + ) + else: + # Execute synchronously + script_path = os.path.join(script.script_id, "main.py") + if os.path.exists(script_path): + await run_script( + script_path, parameters=parameters, organization_id=organization_id, workflow_run_id=workflow_run_id + ) + else: + LOG.error("Script main.py not found", script_path=script_path, script_id=script_id) + raise Exception(f"Script main.py not found at {script_path}") + LOG.info("Script executed successfully", script_id=script_id) +async def _create_workflow_block_run_and_task( + block_type: BlockType, + prompt: str | None = None, + url: str | None = None, +) -> tuple[str | None, str | None]: + """ + Create a workflow block run and optionally a task if workflow_run_id is available in context. + Returns (workflow_run_block_id, task_id) tuple. + """ + context = skyvern_context.ensure_context() + workflow_run_id = context.workflow_run_id + organization_id = context.organization_id + if not context or not workflow_run_id or not organization_id: + return None, None + + try: + # Create workflow run block with appropriate parameters based on block type + # TODO: support engine in the future + engine = None + workflow_run_block = await app.DATABASE.create_workflow_run_block( + workflow_run_id=workflow_run_id, + organization_id=organization_id, + block_type=block_type, + engine=engine, + ) + + workflow_run_block_id = workflow_run_block.workflow_run_block_id + task_id = None + step_id = None + + # Create task for task-based blocks + if block_type in SCRIPT_TASK_BLOCKS: + # Create task + task = await app.DATABASE.create_task( + # fix HACK: changed the type of url to str | None to support None url. url is not used in the script right now. + url=url or "", + title=f"Script {block_type.value} task", + navigation_goal=prompt, + data_extraction_goal=prompt if block_type == BlockType.EXTRACTION else None, + navigation_payload={}, + status="running", + organization_id=organization_id, + workflow_run_id=workflow_run_id, + ) + + task_id = task.task_id + + # create a single step for the task + step = await app.DATABASE.create_step( + task_id=task_id, + order=0, + retry_index=0, + organization_id=organization_id, + ) + step_id = step.step_id + + # Update workflow run block with task_id + await app.DATABASE.update_workflow_run_block( + workflow_run_block_id=workflow_run_block_id, + task_id=task_id, + organization_id=organization_id, + ) + + context.step_id = step_id + context.task_id = task_id + + return workflow_run_block_id, task_id + + except Exception as e: + LOG.warning( + "Failed to create workflow block run and task", + error=str(e), + block_type=block_type, + workflow_run_id=context.workflow_run_id, + exc_info=True, + ) + return None, None + + +async def _update_workflow_block_status( + workflow_run_block_id: str, + status: BlockStatus, + task_id: str | None = None, + task_status: TaskStatus = TaskStatus.completed, + failure_reason: str | None = None, +) -> None: + """Update the status of a workflow run block.""" + try: + context = skyvern_context.current() + if not context or not context.organization_id: + return + await app.DATABASE.update_workflow_run_block( + workflow_run_block_id=workflow_run_block_id, + organization_id=context.organization_id if context else None, + status=status, + failure_reason=failure_reason, + ) + if task_id: + await app.DATABASE.update_task( + task_id=task_id, + organization_id=context.organization_id, + status=task_status, + failure_reason=failure_reason, + ) + except Exception as e: + LOG.warning( + "Failed to update workflow block status", + workflow_run_block_id=workflow_run_block_id, + status=status, + error=str(e), + exc_info=True, + ) + + async def _run_cached_function(cache_key: str) -> None: cached_fn = script_run_context_manager.get_cached_fn(cache_key) if cached_fn: @@ -232,9 +365,41 @@ async def run_task( max_steps: int | None = None, cache_key: str | None = None, ) -> None: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + block_type=BlockType.TASK, + prompt=prompt, + url=url, + ) + if cache_key: - await _run_cached_function(cache_key) + try: + await _run_cached_function(cache_key) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(e), + ) + raise else: + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason="Cache key is required", + ) raise Exception("Cache key is required to run task block in a script") @@ -244,9 +409,41 @@ async def download( max_steps: int | None = None, cache_key: str | None = None, ) -> None: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + block_type=BlockType.FILE_DOWNLOAD, + prompt=prompt, + url=url, + ) + if cache_key: - await _run_cached_function(cache_key) + try: + await _run_cached_function(cache_key) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(e), + ) + raise else: + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason="Cache key is required", + ) raise Exception("Cache key is required to run task block in a script") @@ -256,9 +453,41 @@ async def action( max_steps: int | None = None, cache_key: str | None = None, ) -> None: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + block_type=BlockType.ACTION, + prompt=prompt, + url=url, + ) + if cache_key: - await _run_cached_function(cache_key) + try: + await _run_cached_function(cache_key) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(e), + ) + raise else: + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason="Cache key is required", + ) raise Exception("Cache key is required to run task block in a script") @@ -268,9 +497,41 @@ async def login( max_steps: int | None = None, cache_key: str | None = None, ) -> None: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + block_type=BlockType.LOGIN, + prompt=prompt, + url=url, + ) + if cache_key: - await _run_cached_function(cache_key) + try: + await _run_cached_function(cache_key) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(e), + ) + raise else: + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason="Cache key is required", + ) raise Exception("Cache key is required to run task block in a script") @@ -280,22 +541,83 @@ async def extract( max_steps: int | None = None, cache_key: str | None = None, ) -> None: + # Auto-create workflow block run and task if workflow_run_id is available + workflow_run_block_id, task_id = await _create_workflow_block_run_and_task( + block_type=BlockType.EXTRACTION, + prompt=prompt, + url=url, + ) + if cache_key: - await _run_cached_function(cache_key) + try: + await _run_cached_function(cache_key) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed, task_id=task_id) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason=str(e), + ) + raise else: + if workflow_run_block_id: + await _update_workflow_block_status( + workflow_run_block_id, + BlockStatus.failed, + task_id=task_id, + task_status=TaskStatus.failed, + failure_reason="Cache key is required", + ) raise Exception("Cache key is required to run task block in a script") async def wait(seconds: int) -> None: - await asyncio.sleep(seconds) + # Auto-create workflow block run if workflow_run_id is available (wait block doesn't create tasks) + workflow_run_block_id, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT) + + try: + await asyncio.sleep(seconds) + + # Update block status to completed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.completed) + + except Exception as e: + # Update block status to failed if workflow block was created + if workflow_run_block_id: + await _update_workflow_block_status(workflow_run_block_id, BlockStatus.failed, failure_reason=str(e)) + raise -async def run_script(path: str, parameters: dict[str, Any] | None = None) -> None: +async def run_script( + path: str, + parameters: dict[str, Any] | None = None, + organization_id: str | None = None, + workflow_run_id: str | None = None, +) -> None: # register the script run - run_id = "123" - skyvern_context.set(skyvern_context.SkyvernContext(run_id=run_id)) - # run the script as subprocess; pass the parameters and run_id to the script + context = skyvern_context.current() + if not context: + context = skyvern_context.ensure_context() + skyvern_context.set(skyvern_context.SkyvernContext()) + if workflow_run_id and organization_id: + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, organization_id=organization_id + ) + if not workflow_run: + raise WorkflowRunNotFound(workflow_run_id=workflow_run_id) + context.workflow_run_id = workflow_run_id + context.organization_id = organization_id + # run the script as subprocess; pass the parameters and run_id to the script # Dynamically import the script at the given path spec = importlib.util.spec_from_file_location("user_script", path) if not spec or not spec.loader: