import asyncio
import base64
import hashlib
import importlib.util
import os
from datetime import datetime
from typing import Any

import structlog
from fastapi import BackgroundTasks, HTTPException

from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
from skyvern.exceptions import ScriptNotFound
from skyvern.forge import app
from skyvern.forge.sdk.core import skyvern_context
from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate

LOG = structlog.get_logger(__name__)


async def build_file_tree(
    files: list[ScriptFileCreate],
    organization_id: str,
    script_id: str,
    script_version: int,
    script_revision_id: str,
) -> dict[str, FileNode]:
    """Persist each file and return a hierarchical tree describing them.

    For every entry in ``files`` the base64 content is decoded, an artifact is
    created through ``app.ARTIFACT_MANAGER.create_script_file_artifact`` and a
    script file record is created through ``app.DATABASE.create_script_file``.
    The file is then inserted into a nested ``FileNode`` tree keyed by the
    "/"-separated components of ``file.path``.

    Args:
        files: Files to store; ``content`` is expected to be base64 encoded.
        organization_id: Organization owning the script.
        script_id: Identifier of the script the files belong to.
        script_version: Version passed to the artifact manager.
        script_revision_id: Revision the script file records are attached to.

    Returns:
        Mapping of top-level names to ``FileNode`` objects; directory nodes
        hold their entries in ``children``.

    Raises:
        Exception: Any error raised while creating the artifact or the script
            file record is logged and re-raised unchanged.
    """
    file_tree: dict[str, FileNode] = {}

    for file in files:
        # Decode content to calculate size and hash
        content_bytes = base64.b64decode(file.content)
        content_hash = hashlib.sha256(content_bytes).hexdigest()
        file_size = len(content_bytes)

        # Create the artifact and the matching script file record
        try:
            artifact_id = await app.ARTIFACT_MANAGER.create_script_file_artifact(
                organization_id=organization_id,
                script_id=script_id,
                script_version=script_version,
                file_path=file.path,
                data=content_bytes,
            )
            LOG.debug(
                "Created script file artifact",
                artifact_id=artifact_id,
                file_path=file.path,
                script_id=script_id,
                script_version=script_version,
            )
            await app.DATABASE.create_script_file(
                script_revision_id=script_revision_id,
                script_id=script_id,
                organization_id=organization_id,
                file_path=file.path,
                file_name=file.path.split("/")[-1],
                file_type="file",
                content_hash=f"sha256:{content_hash}",
                file_size=file_size,
                mime_type=file.mime_type,
                artifact_id=artifact_id,
            )
        except Exception:
            LOG.exception(
                "Failed to create script file artifact",
                file_path=file.path,
                script_id=script_id,
                script_version=script_version,
                script_revision_id=script_revision_id,
            )
            raise

        # Split path into components
        path_parts = file.path.split("/")
        current_level = file_tree

        # Create directory structure
        for part in path_parts[:-1]:
            node = current_level.get(part)
            if node is None:
                node = FileNode(type="directory", created_at=datetime.utcnow(), children={})
                current_level[part] = node
            elif node.type == "file":
                # Convert file to directory if needed
                node = FileNode(type="directory", created_at=node.created_at, children={})
                current_level[part] = node
            # Descend into the dict that is actually attached to the node.
            # `node.children or {}` would replace an empty (falsy) children
            # dict with a detached one and nested files would be lost.
            if node.children is None:
                node.children = {}
            current_level = node.children

        # Add the file
        filename = path_parts[-1]
        current_level[filename] = FileNode(
            type="file",
            size=file_size,
            mime_type=file.mime_type,
            content_hash=f"sha256:{content_hash}",
            created_at=datetime.utcnow(),
        )

    return file_tree


async def create_script(
    organization_id: str,
    workflow_id: str | None = None,
    run_id: str | None = None,
    files: list[ScriptFileCreate] | None = None,
) -> CreateScriptResponse:
    """Create a script record and, if files are given, store them.

    Args:
        organization_id: Organization owning the script.
        workflow_id: Accepted for interface compatibility; not used here.
        run_id: Optional run to associate; must exist for the organization.
        files: Optional files to store via ``build_file_tree``.

    Returns:
        A ``CreateScriptResponse`` describing the new script and its file tree.

    Raises:
        HTTPException: 404 when ``run_id`` is given but no such run is found;
            500 for any other failure.
    """
    LOG.info(
        "Creating script",
        organization_id=organization_id,
        file_count=len(files) if files else 0,
    )

    try:
        if run_id and not await app.DATABASE.get_run(run_id=run_id, organization_id=organization_id):
            raise HTTPException(status_code=404, detail=f"Run_id {run_id} not found")

        script = await app.DATABASE.create_script(
            organization_id=organization_id,
            run_id=run_id,
        )

        file_tree: dict[str, FileNode] = {}
        file_count = 0
        if files:
            file_tree = await build_file_tree(
                files,
                organization_id=organization_id,
                script_id=script.script_id,
                script_version=script.version,
                script_revision_id=script.script_revision_id,
            )
            file_count = len(files)

        return CreateScriptResponse(
            script_id=script.script_id,
            version=script.version,
            run_id=script.run_id,
            file_count=file_count,
            created_at=script.created_at,
            file_tree=file_tree,
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must keep their status
        # code instead of being rewritten as a 500 by the handler below.
        raise
    except Exception as e:
        LOG.error("Failed to create script", error=str(e), exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to create script") from e


def _write_script_file(file_path: str, file_content: bytes, mime_type: str | None) -> None:
    """Write ``file_content`` to ``file_path``, as UTF-8 text for ``text/*`` mime types.

    Content that is not declared as text, or that fails UTF-8 decoding, is
    written unchanged in binary mode.
    """
    encoding = "utf-8"
    if mime_type and mime_type.startswith("text/"):
        try:
            text = file_content.decode(encoding)
        except UnicodeDecodeError:
            # Fall back to binary mode if text decoding fails
            pass
        else:
            with open(file_path, "w", encoding=encoding) as f:
                f.write(text)
            return

    with open(file_path, "wb") as f:
        f.write(file_content)


async def execute_script(
    script_id: str,
    organization_id: str,
    parameters: dict[str, Any] | None = None,
    workflow_run_id: str | None = None,
    background_tasks: BackgroundTasks | None = None,
) -> None:
    """Materialize a script's files locally and schedule it for execution.

    Steps:
        1. get the script revision
        2. get the script files
        3. copy the script files to the local directory ``<script_id>/``
        4. schedule ``run_script`` on ``background_tasks`` (when provided)

    Args:
        script_id: Script to execute.
        organization_id: Organization owning the script.
        parameters: Parameters forwarded to the script. When empty and
            ``workflow_run_id`` is set, the workflow run parameters are used.
        workflow_run_id: Optional workflow run to read parameters from.
        background_tasks: When provided, the script run is added as a
            background task; otherwise nothing is executed.

    Raises:
        ScriptNotFound: When no script is found for ``script_id``.
    """
    # TODO: assume the script only has one ScriptFile called main.py

    # step 1: get the script revision
    script = await app.DATABASE.get_script(
        script_id=script_id,
        organization_id=organization_id,
    )
    if not script:
        raise ScriptNotFound(script_id=script_id)

    # step 2: get the script files
    script_files = await app.DATABASE.get_script_files(
        script_revision_id=script.script_revision_id, organization_id=organization_id
    )

    # step 3: copy the script files to the local directory
    for file in script_files:
        # retrieve the artifact
        if not file.artifact_id:
            continue
        artifact = await app.DATABASE.get_artifact_by_id(file.artifact_id, organization_id)
        if not artifact:
            LOG.error("Artifact not found", artifact_id=file.artifact_id, script_id=script_id)
            continue
        file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact)
        if not file_content:
            continue

        # NOTE(review): file.file_path is joined as-is; if stored paths can
        # contain ".." or be absolute this may write outside the script
        # directory - confirm paths are validated when the files are created.
        file_path = os.path.join(script.script_id, file.file_path)
        # create the directory if it doesn't exist
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        _write_script_file(file_path, file_content, file.mime_type)

    # step 4: execute the script
    if background_tasks:
        if workflow_run_id and not parameters:
            parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id)
            parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
            LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters)

        # run_script requires the path of the file to import; per the TODO
        # above the entry point is assumed to be main.py in the script folder.
        script_path = os.path.join(script.script_id, "main.py")
        background_tasks.add_task(run_script, script_path, parameters=parameters)

    LOG.info("Script executed successfully", script_id=script_id)


async def _run_cached_function(cache_key: str) -> None:
    """Look up the cached function for ``cache_key`` and await it.

    The function is called with the page and context of the current run
    context. Raises ``Exception`` when no function is cached under the key.
    """
    cached_fn = script_run_context_manager.get_cached_fn(cache_key)
    if cached_fn:
        # TODO: handle exceptions here and fall back to AI run in case of error
        run_context = script_run_context_manager.ensure_run_context()
        await cached_fn(page=run_context.page, context=run_context)
    else:
        raise Exception(f"Cache key {cache_key} not found")


async def _run_required_cached_function(cache_key: str | None) -> None:
    """Run the cached function for ``cache_key``; raise if no key was given."""
    if not cache_key:
        raise Exception("Cache key is required to run task block in a script")
    await _run_cached_function(cache_key)


async def run_task(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    """Run the cached function registered under ``cache_key``.

    ``prompt``, ``url`` and ``max_steps`` are accepted but not used here.
    Raises ``Exception`` when ``cache_key`` is missing or not cached.
    """
    await _run_required_cached_function(cache_key)


async def download(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    """Run the cached function registered under ``cache_key``.

    ``prompt``, ``url`` and ``max_steps`` are accepted but not used here.
    Raises ``Exception`` when ``cache_key`` is missing or not cached.
    """
    await _run_required_cached_function(cache_key)


async def action(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    """Run the cached function registered under ``cache_key``.

    ``prompt``, ``url`` and ``max_steps`` are accepted but not used here.
    Raises ``Exception`` when ``cache_key`` is missing or not cached.
    """
    await _run_required_cached_function(cache_key)


async def login(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    """Run the cached function registered under ``cache_key``.

    ``prompt``, ``url`` and ``max_steps`` are accepted but not used here.
    Raises ``Exception`` when ``cache_key`` is missing or not cached.
    """
    await _run_required_cached_function(cache_key)


async def extract(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    """Run the cached function registered under ``cache_key``.

    ``prompt``, ``url`` and ``max_steps`` are accepted but not used here.
    Raises ``Exception`` when ``cache_key`` is missing or not cached.
    """
    await _run_required_cached_function(cache_key)


async def wait(seconds: int) -> None:
    """Suspend the current coroutine for ``seconds`` seconds."""
    await asyncio.sleep(seconds)


async def run_script(path: str, parameters: dict[str, Any] | None = None) -> None:
    """Import the Python file at ``path`` and await its ``run_workflow``.

    Args:
        path: Filesystem path of the script to import.
        parameters: When non-empty, passed as ``run_workflow(parameters=...)``;
            otherwise ``run_workflow()`` is called without arguments.

    Raises:
        Exception: When the file cannot be imported or the module does not
            define ``run_workflow``.
    """
    # register the script run
    # NOTE(review): run_id is a hard-coded placeholder value.
    run_id = "123"
    skyvern_context.set(skyvern_context.SkyvernContext(run_id=run_id))

    # Dynamically import the script at the given path
    spec = importlib.util.spec_from_file_location("user_script", path)
    if not spec or not spec.loader:
        raise Exception(f"Failed to import script from {path}")
    user_script = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(user_script)

    # Call run_workflow from the imported module
    if not hasattr(user_script, "run_workflow"):
        raise Exception(f"No 'run_workflow' function found in {path}")

    # When no (or empty) parameters are given, call run_workflow with no arguments
    if parameters:
        await user_script.run_workflow(parameters=parameters)
    else:
        await user_script.run_workflow()