From 9a9ee012532b514b71267400960c1e62846dfa8b Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 21 Sep 2025 02:45:23 -0400 Subject: [PATCH] only run script when generate_script is true (#3491) --- .../script_generations/generate_script.py | 3 - skyvern/forge/__main__.py | 8 + skyvern/forge/sdk/db/client.py | 3 + skyvern/forge/sdk/db/utils.py | 1 + skyvern/forge/sdk/routes/scripts.py | 244 ++++++++++-------- skyvern/forge/sdk/workflow/models/workflow.py | 1 + skyvern/forge/sdk/workflow/service.py | 55 ++-- skyvern/services/script_service.py | 11 +- skyvern/services/workflow_script_service.py | 2 - 9 files changed, 186 insertions(+), 142 deletions(-) diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 91436ff4..f674540f 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -1813,9 +1813,6 @@ async def generate_workflow_script_python_code( ] module = cst.Module(body=module_body) - - with open(file_name, "w") as f: - f.write(module.code) return module.code diff --git a/skyvern/forge/__main__.py b/skyvern/forge/__main__.py index 9c04274c..372e5ba7 100644 --- a/skyvern/forge/__main__.py +++ b/skyvern/forge/__main__.py @@ -15,10 +15,18 @@ if __name__ == "__main__": load_dotenv() reload = settings.ENV == "local" + + # Configure reload settings uvicorn.run( "skyvern.forge.api_app:app", host="0.0.0.0", port=port, log_level="info", reload=reload, + reload_excludes=[ + f"{settings.TEMP_PATH}/*.py", + f"{settings.TEMP_PATH}/**/*.py", + f"{settings.TEMP_PATH}/*", + f"{settings.TEMP_PATH}/**", + ], ) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 6db6ab24..d9fae239 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1676,6 +1676,7 @@ class AgentDB: webhook_failure_reason: str | None = None, ai_fallback_triggered: bool | None = None, job_id: str | None = None, + run_with: str | None = None, ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( @@ -1698,6 +1699,8 @@ class AgentDB: workflow_run.script_run = {"ai_fallback_triggered": ai_fallback_triggered} if job_id: workflow_run.job_id = job_id + if run_with: + workflow_run.run_with = run_with await session.commit() await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index cf47dbb2..ef21520f 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -308,6 +308,7 @@ def convert_to_workflow_run( script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run) if workflow_run_model.script_run else None, + run_with=workflow_run_model.run_with, ) diff --git a/skyvern/forge/sdk/routes/scripts.py b/skyvern/forge/sdk/routes/scripts.py index 2bd46125..7eeb6cee 100644 --- a/skyvern/forge/sdk/routes/scripts.py +++ b/skyvern/forge/sdk/routes/scripts.py @@ -17,12 +17,126 @@ from skyvern.schemas.scripts import ( ScriptBlocksRequest, ScriptBlocksResponse, ScriptCacheKeyValuesResponse, + ScriptStatus, ) -from skyvern.services import script_service +from skyvern.services import script_service, workflow_script_service LOG = structlog.get_logger() +async def get_script_blocks_response( + organization_id: str, + workflow_permanent_id: str, + script_revision_id: str, +) -> ScriptBlocksResponse: + script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id( + script_revision_id=script_revision_id, + organization_id=organization_id, + ) + + if not script_blocks: + LOG.info( + "No script block found for workflow", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + script_revision_id=script_revision_id, + ) + return ScriptBlocksResponse(blocks={}) + + result: dict[str, str] = {} + + # TODO(jdo): make concurrent to speed up + for script_block in script_blocks: + script_file_id = script_block.script_file_id + + if not script_file_id: + LOG.info( + "No script file ID found for script block", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + script_revision_id=script_revision_id, + block_label=script_block.script_block_label, + ) + continue + + script_file = await app.DATABASE.get_script_file_by_id( + script_revision_id=script_revision_id, + file_id=script_file_id, + organization_id=organization_id, + ) + + if not script_file: + LOG.info( + "No script file found for script block", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + script_revision_id=script_revision_id, + block_label=script_block.script_block_label, + script_file_id=script_file_id, + ) + continue + + artifact_id = script_file.artifact_id + + if not artifact_id: + LOG.info( + "No artifact ID found for script file", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + script_revision_id=script_revision_id, + block_label=script_block.script_block_label, + script_file_id=script_file_id, + ) + continue + + artifact = await app.DATABASE.get_artifact_by_id( + artifact_id, + organization_id, + ) + + if not artifact: + LOG.error( + "No artifact found for script file", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + script_revision_id=script_revision_id, + block_label=script_block.script_block_label, + script_file_id=script_file_id, + artifact_id=artifact_id, + ) + continue + + data = await app.STORAGE.retrieve_artifact(artifact) + + if not data: + LOG.error( + "No data found for artifact", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + block_label=script_block.script_block_label, + script_revision_id=script_block.script_revision_id, + file_id=script_file_id, + artifact_id=artifact_id, + ) + continue + + try: + decoded_data = data.decode("utf-8") + result[script_block.script_block_label] = decoded_data + except UnicodeDecodeError: + LOG.error( + "File content is not valid UTF-8 text", + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + block_label=script_block.script_block_label, + script_revision_id=script_block.script_revision_id, + file_id=script_file_id, + artifact_id=artifact_id, + ) + continue + return ScriptBlocksResponse(blocks=result) + + @base_router.post( "/scripts", response_model=CreateScriptResponse, @@ -302,6 +416,27 @@ async def get_workflow_script_blocks( if not workflow: raise HTTPException(status_code=404, detail="Workflow not found") + workflow_run_id = block_script_request.workflow_run_id + if workflow_run_id: + workflow_run = await app.DATABASE.get_workflow_run( + workflow_run_id=workflow_run_id, + organization_id=current_org.organization_id, + ) + if not workflow_run: + workflow_run_id = None + else: + # find the published script if any and return that + published_script, _ = await workflow_script_service.get_workflow_script( + workflow=workflow, + workflow_run=workflow_run, + status=ScriptStatus.published, + ) + if published_script: + return await get_script_blocks_response( + script_revision_id=published_script.script_revision_id, + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + ) cache_key = block_script_request.cache_key or workflow.cache_key or "" status = block_script_request.status @@ -326,115 +461,12 @@ async def get_workflow_script_blocks( return empty first_script = scripts[0] - - script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id( + return await get_script_blocks_response( script_revision_id=first_script.script_revision_id, organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, ) - if not script_blocks: - LOG.info( - "No script block found for workflow", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - script_revision_id=first_script.script_revision_id, - ) - return empty - - result: dict[str, str] = {} - - # TODO(jdo): make concurrent to speed up - for script_block in script_blocks: - script_file_id = script_block.script_file_id - - if not script_file_id: - LOG.info( - "No script file ID found for script block", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - script_revision_id=first_script.script_revision_id, - block_label=script_block.script_block_label, - ) - continue - - script_file = await app.DATABASE.get_script_file_by_id( - script_revision_id=first_script.script_revision_id, - file_id=script_file_id, - organization_id=current_org.organization_id, - ) - - if not script_file: - LOG.info( - "No script file found for script block", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - script_revision_id=first_script.script_revision_id, - block_label=script_block.script_block_label, - script_file_id=script_file_id, - ) - continue - - artifact_id = script_file.artifact_id - - if not artifact_id: - LOG.info( - "No artifact ID found for script file", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - script_revision_id=first_script.script_revision_id, - block_label=script_block.script_block_label, - script_file_id=script_file_id, - ) - continue - - artifact = await app.DATABASE.get_artifact_by_id( - artifact_id, - current_org.organization_id, - ) - - if not artifact: - LOG.error( - "No artifact found for script file", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - script_revision_id=first_script.script_revision_id, - block_label=script_block.script_block_label, - script_file_id=script_file_id, - artifact_id=artifact_id, - ) - continue - - data = await app.STORAGE.retrieve_artifact(artifact) - - if not data: - LOG.error( - "No data found for artifact", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - block_label=script_block.script_block_label, - script_revision_id=script_block.script_revision_id, - file_id=script_file_id, - artifact_id=artifact_id, - ) - continue - - try: - decoded_data = data.decode("utf-8") - result[script_block.script_block_label] = decoded_data - except UnicodeDecodeError: - LOG.error( - "File content is not valid UTF-8 text", - workflow_permanent_id=workflow_permanent_id, - organization_id=current_org.organization_id, - block_label=script_block.script_block_label, - script_revision_id=script_block.script_revision_id, - file_id=script_file_id, - artifact_id=artifact_id, - ) - continue - - return ScriptBlocksResponse(blocks=result) - @base_router.get( "/scripts/{workflow_permanent_id}/{cache_key}/values", diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 7d52681a..9303e0d5 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -136,6 +136,7 @@ class WorkflowRun(BaseModel): workflow_title: str | None = None max_screenshot_scrolls: int | None = None browser_address: str | None = None + run_with: str | None = None script_run: ScriptRunResponse | None = None job_id: str | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index e3821431..8111f9f8 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -350,10 +350,10 @@ class WorkflowService: # Check if there's a related workflow script that should be used instead workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels) - is_script = workflow_script is not None - if workflow_script is not None: + is_script_run = self.should_run_script(workflow, workflow_run) + if workflow_script and is_script_run: LOG.info( - "Found related workflow script, running script instead of workflow", + "Running script for workflow run", workflow_run_id=workflow_run_id, workflow_id=workflow.workflow_id, organization_id=organization_id, @@ -387,7 +387,7 @@ class WorkflowService: WorkflowRunStatus.timed_out, ): workflow_run = await self.mark_workflow_run_as_completed( - workflow_run_id=workflow_run_id, is_script=is_script + workflow_run_id=workflow_run_id, is_script_run=is_script_run ) await self.generate_script_if_needed( workflow=workflow, @@ -955,12 +955,14 @@ class WorkflowService: workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None, - is_script: bool = False, + is_script_run: bool = False, ) -> WorkflowRun: + run_with = "code" if is_script_run else "agent" workflow_run = await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=status, failure_reason=failure_reason, + run_with=run_with, ) if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]: start_time = ( @@ -978,11 +980,11 @@ class WorkflowService: duration_seconds=duration_seconds, workflow_run_status=workflow_run.status, organization_id=workflow_run.organization_id, - is_script_run=is_script, + run_with=run_with, ) return workflow_run - async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, @@ -991,11 +993,11 @@ class WorkflowService: return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.completed, - is_script=is_script, + is_script_run=is_script_run, ) async def mark_workflow_run_as_failed( - self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False + self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as failed", @@ -1007,10 +1009,10 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, failure_reason=failure_reason, - is_script=is_script, + is_script_run=is_script_run, ) - async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, @@ -1019,11 +1021,11 @@ class WorkflowService: return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.running, - is_script=is_script, + is_script_run=is_script_run, ) async def mark_workflow_run_as_terminated( - self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False + self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", @@ -1035,10 +1037,10 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, failure_reason=failure_reason, - is_script=is_script, + is_script_run=is_script_run, ) - async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: + async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as canceled", workflow_run_id=workflow_run_id, @@ -1047,11 +1049,11 @@ class WorkflowService: return await self._update_workflow_run_status( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.canceled, - is_script=is_script, + is_script_run=is_script_run, ) async def mark_workflow_run_as_timed_out( - self, workflow_run_id: str, failure_reason: str | None = None, is_script: bool = False + self, workflow_run_id: str, failure_reason: str | None = None, is_script_run: bool = False ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as timed out", @@ -1062,7 +1064,7 @@ class WorkflowService: workflow_run_id=workflow_run_id, status=WorkflowRunStatus.timed_out, failure_reason=failure_reason, - is_script=is_script, + is_script_run=is_script_run, ) async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun: @@ -2482,7 +2484,7 @@ class WorkflowService: # Mark workflow run as completed workflow_run = await self.mark_workflow_run_as_completed( - workflow_run_id=workflow_run.workflow_run_id, is_script=True + workflow_run_id=workflow_run.workflow_run_id, is_script_run=True ) LOG.info( @@ -2505,7 +2507,7 @@ class WorkflowService: # Mark workflow run as failed failure_reason = f"Failed to execute workflow script: {str(e)}" workflow_run = await self.mark_workflow_run_as_failed( - workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script=True + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script_run=True ) return workflow_run @@ -2516,8 +2518,6 @@ class WorkflowService: workflow_run: WorkflowRun, block_labels: list[str] | None = None, ) -> None: - if not workflow.generate_script: - return None if block_labels: # Do not generate script if block_labels is provided return None @@ -2549,3 +2549,14 @@ class WorkflowService: script=created_script, rendered_cache_key_value=rendered_cache_key_value, ) + + def should_run_script( + self, + workflow: Workflow, + workflow_run: WorkflowRun, + ) -> bool: + if workflow_run.run_with == "code": + return True + if workflow.generate_script: + return True + return False diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 6d23e689..fbfc9f32 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -250,13 +250,6 @@ async def execute_script( browser_session_id: str | None = None, background_tasks: BackgroundTasks | None = None, ) -> None: - # TODO: assume the script only has one ScriptFile called main.py - # step 1: get the script revision - # step 2: get the script files - # step 3: copy the script files to the local directory - # step 4: execute the script - # step 5: TODO: close all the browser instances - # step 1: get the script revision script = await app.DATABASE.get_script( script_id=script_id, @@ -282,7 +275,7 @@ async def execute_script( file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact) if not file_content: continue - file_path = os.path.join(script.script_id, file.file_path) + file_path = os.path.join(settings.TEMP_PATH, script.script_id, file.file_path) # create the directory if it doesn't exist os.makedirs(os.path.dirname(file_path), exist_ok=True) @@ -310,7 +303,7 @@ async def execute_script( parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters) - script_path = os.path.join(script.script_id, "main.py") + script_path = os.path.join(settings.TEMP_PATH, script.script_id, "main.py") if background_tasks: # Execute asynchronously in background background_tasks.add_task( diff --git a/skyvern/services/workflow_script_service.py b/skyvern/services/workflow_script_service.py index e35111a1..91ffb7fe 100644 --- a/skyvern/services/workflow_script_service.py +++ b/skyvern/services/workflow_script_service.py @@ -61,8 +61,6 @@ async def get_workflow_script( cache_key = workflow.cache_key or "" rendered_cache_key_value = "" - if not workflow.generate_script: - return None, rendered_cache_key_value if block_labels: # Do not generate script or run script if block_labels is provided return None, rendered_cache_key_value