only run script when generate_script is true (#3491)

This commit is contained in:
Shuchang Zheng
2025-09-21 02:45:23 -04:00
committed by GitHub
parent 770be508e3
commit 9a9ee01253
9 changed files with 186 additions and 142 deletions

View File

@@ -1813,9 +1813,6 @@ async def generate_workflow_script_python_code(
] ]
module = cst.Module(body=module_body) module = cst.Module(body=module_body)
with open(file_name, "w") as f:
f.write(module.code)
return module.code return module.code

View File

@@ -15,10 +15,18 @@ if __name__ == "__main__":
load_dotenv() load_dotenv()
reload = settings.ENV == "local" reload = settings.ENV == "local"
# Configure reload settings
uvicorn.run( uvicorn.run(
"skyvern.forge.api_app:app", "skyvern.forge.api_app:app",
host="0.0.0.0", host="0.0.0.0",
port=port, port=port,
log_level="info", log_level="info",
reload=reload, reload=reload,
reload_excludes=[
f"{settings.TEMP_PATH}/*.py",
f"{settings.TEMP_PATH}/**/*.py",
f"{settings.TEMP_PATH}/*",
f"{settings.TEMP_PATH}/**",
],
) )

View File

@@ -1676,6 +1676,7 @@ class AgentDB:
webhook_failure_reason: str | None = None, webhook_failure_reason: str | None = None,
ai_fallback_triggered: bool | None = None, ai_fallback_triggered: bool | None = None,
job_id: str | None = None, job_id: str | None = None,
run_with: str | None = None,
) -> WorkflowRun: ) -> WorkflowRun:
async with self.Session() as session: async with self.Session() as session:
workflow_run = ( workflow_run = (
@@ -1698,6 +1699,8 @@ class AgentDB:
workflow_run.script_run = {"ai_fallback_triggered": ai_fallback_triggered} workflow_run.script_run = {"ai_fallback_triggered": ai_fallback_triggered}
if job_id: if job_id:
workflow_run.job_id = job_id workflow_run.job_id = job_id
if run_with:
workflow_run.run_with = run_with
await session.commit() await session.commit()
await session.refresh(workflow_run) await session.refresh(workflow_run)
await save_workflow_run_logs(workflow_run_id) await save_workflow_run_logs(workflow_run_id)

View File

@@ -308,6 +308,7 @@ def convert_to_workflow_run(
script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run) script_run=ScriptRunResponse.model_validate(workflow_run_model.script_run)
if workflow_run_model.script_run if workflow_run_model.script_run
else None, else None,
run_with=workflow_run_model.run_with,
) )

View File

@@ -17,12 +17,126 @@ from skyvern.schemas.scripts import (
ScriptBlocksRequest, ScriptBlocksRequest,
ScriptBlocksResponse, ScriptBlocksResponse,
ScriptCacheKeyValuesResponse, ScriptCacheKeyValuesResponse,
ScriptStatus,
) )
from skyvern.services import script_service from skyvern.services import script_service, workflow_script_service
LOG = structlog.get_logger() LOG = structlog.get_logger()
async def get_script_blocks_response(
    organization_id: str,
    workflow_permanent_id: str,
    script_revision_id: str,
) -> ScriptBlocksResponse:
    """Collect the source text of every script block in a script revision.

    Each block of the revision is resolved block -> script file -> artifact ->
    stored bytes, and the bytes are decoded as UTF-8. A block whose chain is
    broken at any step (or whose content is not valid UTF-8) is logged and
    left out of the response; it never aborts the whole request.

    Args:
        organization_id: Organization used to scope every lookup.
        workflow_permanent_id: Only used as context in log records.
        script_revision_id: Revision whose blocks are loaded.

    Returns:
        A ``ScriptBlocksResponse`` mapping block label to decoded content;
        ``blocks`` is empty when the revision has no script blocks.
    """
    # Context shared by the log records emitted before the artifact is fetched.
    revision_ctx = {
        "workflow_permanent_id": workflow_permanent_id,
        "organization_id": organization_id,
        "script_revision_id": script_revision_id,
    }
    # Context shared by the records emitted once artifact content is handled;
    # those records report the block's own revision id further down.
    workflow_ctx = {
        "workflow_permanent_id": workflow_permanent_id,
        "organization_id": organization_id,
    }

    blocks_for_revision = await app.DATABASE.get_script_blocks_by_script_revision_id(
        script_revision_id=script_revision_id,
        organization_id=organization_id,
    )
    if not blocks_for_revision:
        LOG.info("No script block found for workflow", **revision_ctx)
        return ScriptBlocksResponse(blocks={})

    contents_by_label: dict[str, str] = {}

    # TODO(jdo): make concurrent to speed up
    for script_block in blocks_for_revision:
        label = script_block.script_block_label

        if not (script_file_id := script_block.script_file_id):
            LOG.info(
                "No script file ID found for script block",
                **revision_ctx,
                block_label=label,
            )
            continue

        script_file = await app.DATABASE.get_script_file_by_id(
            script_revision_id=script_revision_id,
            file_id=script_file_id,
            organization_id=organization_id,
        )
        if not script_file:
            LOG.info(
                "No script file found for script block",
                **revision_ctx,
                block_label=label,
                script_file_id=script_file_id,
            )
            continue

        if not (artifact_id := script_file.artifact_id):
            LOG.info(
                "No artifact ID found for script file",
                **revision_ctx,
                block_label=label,
                script_file_id=script_file_id,
            )
            continue

        artifact = await app.DATABASE.get_artifact_by_id(artifact_id, organization_id)
        if not artifact:
            LOG.error(
                "No artifact found for script file",
                **revision_ctx,
                block_label=label,
                script_file_id=script_file_id,
                artifact_id=artifact_id,
            )
            continue

        raw_content = await app.STORAGE.retrieve_artifact(artifact)
        if not raw_content:
            LOG.error(
                "No data found for artifact",
                **workflow_ctx,
                block_label=label,
                script_revision_id=script_block.script_revision_id,
                file_id=script_file_id,
                artifact_id=artifact_id,
            )
            continue

        try:
            text = raw_content.decode("utf-8")
        except UnicodeDecodeError:
            LOG.error(
                "File content is not valid UTF-8 text",
                **workflow_ctx,
                block_label=label,
                script_revision_id=script_block.script_revision_id,
                file_id=script_file_id,
                artifact_id=artifact_id,
            )
        else:
            contents_by_label[label] = text

    return ScriptBlocksResponse(blocks=contents_by_label)
@base_router.post( @base_router.post(
"/scripts", "/scripts",
response_model=CreateScriptResponse, response_model=CreateScriptResponse,
@@ -302,6 +416,27 @@ async def get_workflow_script_blocks(
if not workflow: if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found") raise HTTPException(status_code=404, detail="Workflow not found")
workflow_run_id = block_script_request.workflow_run_id
if workflow_run_id:
workflow_run = await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id,
)
if not workflow_run:
workflow_run_id = None
else:
# find the published script if any and return that
published_script, _ = await workflow_script_service.get_workflow_script(
workflow=workflow,
workflow_run=workflow_run,
status=ScriptStatus.published,
)
if published_script:
return await get_script_blocks_response(
script_revision_id=published_script.script_revision_id,
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
)
cache_key = block_script_request.cache_key or workflow.cache_key or "" cache_key = block_script_request.cache_key or workflow.cache_key or ""
status = block_script_request.status status = block_script_request.status
@@ -326,115 +461,12 @@ async def get_workflow_script_blocks(
return empty return empty
first_script = scripts[0] first_script = scripts[0]
return await get_script_blocks_response(
script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id(
script_revision_id=first_script.script_revision_id, script_revision_id=first_script.script_revision_id,
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
) )
if not script_blocks:
LOG.info(
"No script block found for workflow",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
script_revision_id=first_script.script_revision_id,
)
return empty
result: dict[str, str] = {}
# TODO(jdo): make concurrent to speed up
for script_block in script_blocks:
script_file_id = script_block.script_file_id
if not script_file_id:
LOG.info(
"No script file ID found for script block",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
script_revision_id=first_script.script_revision_id,
block_label=script_block.script_block_label,
)
continue
script_file = await app.DATABASE.get_script_file_by_id(
script_revision_id=first_script.script_revision_id,
file_id=script_file_id,
organization_id=current_org.organization_id,
)
if not script_file:
LOG.info(
"No script file found for script block",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
script_revision_id=first_script.script_revision_id,
block_label=script_block.script_block_label,
script_file_id=script_file_id,
)
continue
artifact_id = script_file.artifact_id
if not artifact_id:
LOG.info(
"No artifact ID found for script file",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
script_revision_id=first_script.script_revision_id,
block_label=script_block.script_block_label,
script_file_id=script_file_id,
)
continue
artifact = await app.DATABASE.get_artifact_by_id(
artifact_id,
current_org.organization_id,
)
if not artifact:
LOG.error(
"No artifact found for script file",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
script_revision_id=first_script.script_revision_id,
block_label=script_block.script_block_label,
script_file_id=script_file_id,
artifact_id=artifact_id,
)
continue
data = await app.STORAGE.retrieve_artifact(artifact)
if not data:
LOG.error(
"No data found for artifact",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
block_label=script_block.script_block_label,
script_revision_id=script_block.script_revision_id,
file_id=script_file_id,
artifact_id=artifact_id,
)
continue
try:
decoded_data = data.decode("utf-8")
result[script_block.script_block_label] = decoded_data
except UnicodeDecodeError:
LOG.error(
"File content is not valid UTF-8 text",
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
block_label=script_block.script_block_label,
script_revision_id=script_block.script_revision_id,
file_id=script_file_id,
artifact_id=artifact_id,
)
continue
return ScriptBlocksResponse(blocks=result)
@base_router.get( @base_router.get(
"/scripts/{workflow_permanent_id}/{cache_key}/values", "/scripts/{workflow_permanent_id}/{cache_key}/values",

View File

@@ -136,6 +136,7 @@ class WorkflowRun(BaseModel):
workflow_title: str | None = None workflow_title: str | None = None
max_screenshot_scrolls: int | None = None max_screenshot_scrolls: int | None = None
browser_address: str | None = None browser_address: str | None = None
run_with: str | None = None
script_run: ScriptRunResponse | None = None script_run: ScriptRunResponse | None = None
job_id: str | None = None job_id: str | None = None

View File

@@ -350,10 +350,10 @@ class WorkflowService:
# Check if there's a related workflow script that should be used instead # Check if there's a related workflow script that should be used instead
workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels) workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels)
is_script = workflow_script is not None is_script_run = self.should_run_script(workflow, workflow_run)
if workflow_script is not None: if workflow_script and is_script_run:
LOG.info( LOG.info(
"Found related workflow script, running script instead of workflow", "Running script for workflow run",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
workflow_id=workflow.workflow_id, workflow_id=workflow.workflow_id,
organization_id=organization_id, organization_id=organization_id,
@@ -387,7 +387,7 @@ class WorkflowService:
WorkflowRunStatus.timed_out, WorkflowRunStatus.timed_out,
): ):
workflow_run = await self.mark_workflow_run_as_completed( workflow_run = await self.mark_workflow_run_as_completed(
workflow_run_id=workflow_run_id, is_script=is_script workflow_run_id=workflow_run_id, is_script_run=is_script_run
) )
await self.generate_script_if_needed( await self.generate_script_if_needed(
workflow=workflow, workflow=workflow,
@@ -955,12 +955,14 @@ class WorkflowService:
workflow_run_id: str, workflow_run_id: str,
status: WorkflowRunStatus, status: WorkflowRunStatus,
failure_reason: str | None = None, failure_reason: str | None = None,
is_script: bool = False, is_script_run: bool = False,
) -> WorkflowRun: ) -> WorkflowRun:
run_with = "code" if is_script_run else "agent"
workflow_run = await app.DATABASE.update_workflow_run( workflow_run = await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=status, status=status,
failure_reason=failure_reason, failure_reason=failure_reason,
run_with=run_with,
) )
if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]: if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]:
start_time = ( start_time = (
@@ -978,11 +980,11 @@ class WorkflowService:
duration_seconds=duration_seconds, duration_seconds=duration_seconds,
workflow_run_status=workflow_run.status, workflow_run_status=workflow_run.status,
organization_id=workflow_run.organization_id, organization_id=workflow_run.organization_id,
is_script_run=is_script, run_with=run_with,
) )
return workflow_run return workflow_run
async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: async def mark_workflow_run_as_completed(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as completed", f"Marking workflow run {workflow_run_id} as completed",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -991,11 +993,11 @@ class WorkflowService:
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.completed, status=WorkflowRunStatus.completed,
is_script=is_script, is_script_run=is_script_run,
) )
async def mark_workflow_run_as_failed( async def mark_workflow_run_as_failed(
self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False
) -> WorkflowRun: ) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as failed", f"Marking workflow run {workflow_run_id} as failed",
@@ -1007,10 +1009,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed, status=WorkflowRunStatus.failed,
failure_reason=failure_reason, failure_reason=failure_reason,
is_script=is_script, is_script_run=is_script_run,
) )
async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: async def mark_workflow_run_as_running(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as running", f"Marking workflow run {workflow_run_id} as running",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -1019,11 +1021,11 @@ class WorkflowService:
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.running, status=WorkflowRunStatus.running,
is_script=is_script, is_script_run=is_script_run,
) )
async def mark_workflow_run_as_terminated( async def mark_workflow_run_as_terminated(
self, workflow_run_id: str, failure_reason: str | None, is_script: bool = False self, workflow_run_id: str, failure_reason: str | None, is_script_run: bool = False
) -> WorkflowRun: ) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as terminated", f"Marking workflow run {workflow_run_id} as terminated",
@@ -1035,10 +1037,10 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.terminated, status=WorkflowRunStatus.terminated,
failure_reason=failure_reason, failure_reason=failure_reason,
is_script=is_script, is_script_run=is_script_run,
) )
async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script: bool = False) -> WorkflowRun: async def mark_workflow_run_as_canceled(self, workflow_run_id: str, is_script_run: bool = False) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as canceled", f"Marking workflow run {workflow_run_id} as canceled",
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -1047,11 +1049,11 @@ class WorkflowService:
return await self._update_workflow_run_status( return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.canceled, status=WorkflowRunStatus.canceled,
is_script=is_script, is_script_run=is_script_run,
) )
async def mark_workflow_run_as_timed_out( async def mark_workflow_run_as_timed_out(
self, workflow_run_id: str, failure_reason: str | None = None, is_script: bool = False self, workflow_run_id: str, failure_reason: str | None = None, is_script_run: bool = False
) -> WorkflowRun: ) -> WorkflowRun:
LOG.info( LOG.info(
f"Marking workflow run {workflow_run_id} as timed out", f"Marking workflow run {workflow_run_id} as timed out",
@@ -1062,7 +1064,7 @@ class WorkflowService:
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.timed_out, status=WorkflowRunStatus.timed_out,
failure_reason=failure_reason, failure_reason=failure_reason,
is_script=is_script, is_script_run=is_script_run,
) )
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun: async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun:
@@ -2482,7 +2484,7 @@ class WorkflowService:
# Mark workflow run as completed # Mark workflow run as completed
workflow_run = await self.mark_workflow_run_as_completed( workflow_run = await self.mark_workflow_run_as_completed(
workflow_run_id=workflow_run.workflow_run_id, is_script=True workflow_run_id=workflow_run.workflow_run_id, is_script_run=True
) )
LOG.info( LOG.info(
@@ -2505,7 +2507,7 @@ class WorkflowService:
# Mark workflow run as failed # Mark workflow run as failed
failure_reason = f"Failed to execute workflow script: {str(e)}" failure_reason = f"Failed to execute workflow script: {str(e)}"
workflow_run = await self.mark_workflow_run_as_failed( workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script=True workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason, is_script_run=True
) )
return workflow_run return workflow_run
@@ -2516,8 +2518,6 @@ class WorkflowService:
workflow_run: WorkflowRun, workflow_run: WorkflowRun,
block_labels: list[str] | None = None, block_labels: list[str] | None = None,
) -> None: ) -> None:
if not workflow.generate_script:
return None
if block_labels: if block_labels:
# Do not generate script if block_labels is provided # Do not generate script if block_labels is provided
return None return None
@@ -2549,3 +2549,14 @@ class WorkflowService:
script=created_script, script=created_script,
rendered_cache_key_value=rendered_cache_key_value, rendered_cache_key_value=rendered_cache_key_value,
) )
def should_run_script(
    self,
    workflow: Workflow,
    workflow_run: WorkflowRun,
) -> bool:
    """Decide whether a workflow run should be executed as a script.

    Returns True when the run's ``run_with`` is ``"code"`` or when the
    workflow's ``generate_script`` is truthy; otherwise returns False.
    """
    # `or` short-circuits, so generate_script is only consulted when the run
    # itself does not ask for code execution.
    return bool(workflow_run.run_with == "code" or workflow.generate_script)

View File

@@ -250,13 +250,6 @@ async def execute_script(
browser_session_id: str | None = None, browser_session_id: str | None = None,
background_tasks: BackgroundTasks | None = None, background_tasks: BackgroundTasks | None = None,
) -> None: ) -> None:
# TODO: assume the script only has one ScriptFile called main.py
# step 1: get the script revision
# step 2: get the script files
# step 3: copy the script files to the local directory
# step 4: execute the script
# step 5: TODO: close all the browser instances
# step 1: get the script revision # step 1: get the script revision
script = await app.DATABASE.get_script( script = await app.DATABASE.get_script(
script_id=script_id, script_id=script_id,
@@ -282,7 +275,7 @@ async def execute_script(
file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact) file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact)
if not file_content: if not file_content:
continue continue
file_path = os.path.join(script.script_id, file.file_path) file_path = os.path.join(settings.TEMP_PATH, script.script_id, file.file_path)
# create the directory if it doesn't exist # create the directory if it doesn't exist
os.makedirs(os.path.dirname(file_path), exist_ok=True) os.makedirs(os.path.dirname(file_path), exist_ok=True)
@@ -310,7 +303,7 @@ async def execute_script(
parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples} parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters) LOG.info("Script run Parameters is using workflow run parameters", parameters=parameters)
script_path = os.path.join(script.script_id, "main.py") script_path = os.path.join(settings.TEMP_PATH, script.script_id, "main.py")
if background_tasks: if background_tasks:
# Execute asynchronously in background # Execute asynchronously in background
background_tasks.add_task( background_tasks.add_task(

View File

@@ -61,8 +61,6 @@ async def get_workflow_script(
cache_key = workflow.cache_key or "" cache_key = workflow.cache_key or ""
rendered_cache_key_value = "" rendered_cache_key_value = ""
if not workflow.generate_script:
return None, rendered_cache_key_value
if block_labels: if block_labels:
# Do not generate script or run script if block_labels is provided # Do not generate script or run script if block_labels is provided
return None, rendered_cache_key_value return None, rendered_cache_key_value