async def get_script_file_by_id(
    self,
    script_revision_id: str,
    file_id: str,
    organization_id: str,
) -> ScriptFile | None:
    """Look up one script file of a script revision within an organization.

    Args:
        script_revision_id: Revision the file must belong to.
        file_id: Identifier of the file to fetch.
        organization_id: Organization the file must belong to.

    Returns:
        The first matching row converted with ``convert_to_script_file``,
        or ``None`` when no row matches all three criteria.
    """
    # All criteria are combined into a single filter_by; they are ANDed together.
    query = select(ScriptFileModel).filter_by(
        script_revision_id=script_revision_id,
        file_id=file_id,
        organization_id=organization_id,
    )

    async with self.Session() as session:
        rows = await session.scalars(query)
        record = rows.first()
        if not record:
            return None
        # Convert while the session is still open, as the original did.
        return convert_to_script_file(record)
# --- skyvern/schemas/scripts.py: request/response models used by the route below ---


class ScriptBlocksResponse(BaseModel):
    # Decoded script file content keyed by script block label.
    blocks: dict[str, str]


class ScriptBlocksRequest(BaseModel):
    # Cache key value forwarded to the workflow-script lookup.
    cache_key_value: str
    # Optional cache key; when omitted the route falls back to the workflow's own cache_key.
    cache_key: str | None = None


# --- skyvern/forge/sdk/routes/scripts.py ---


async def _load_script_block_source(
    *,
    block_label: str,
    script_file_id: str | None,
    script_revision_id: str,
    workflow_permanent_id: str,
    organization_id: str,
) -> str | None:
    """Resolve one script block to the UTF-8 text of its script file.

    Follows the chain script file -> artifact -> stored bytes. Any missing link
    is logged and reported as ``None`` so the caller can skip the block.
    """
    # One shared context keeps field names identical across every log line
    # (the original mixed `file_id` and `script_file_id`).
    log_context: dict[str, str | None] = {
        "workflow_permanent_id": workflow_permanent_id,
        "organization_id": organization_id,
        "script_revision_id": script_revision_id,
        "block_label": block_label,
    }

    if not script_file_id:
        LOG.info("No script file ID found for script block", **log_context)
        return None
    log_context["script_file_id"] = script_file_id

    script_file = await app.DATABASE.get_script_file_by_id(
        script_revision_id=script_revision_id,
        file_id=script_file_id,
        organization_id=organization_id,
    )
    if not script_file:
        LOG.info("No script file found for script block", **log_context)
        return None

    artifact_id = script_file.artifact_id
    if not artifact_id:
        LOG.info("No artifact ID found for script file", **log_context)
        return None
    log_context["artifact_id"] = artifact_id

    artifact = await app.DATABASE.get_artifact_by_id(
        artifact_id,
        organization_id,
    )
    if not artifact:
        LOG.error("No artifact found for script file", **log_context)
        return None

    data = await app.STORAGE.retrieve_artifact(artifact)
    if not data:
        LOG.error("No data found for artifact", **log_context)
        return None

    # Only the decode can raise UnicodeDecodeError, so keep the try body to that.
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError:
        LOG.error("File content is not valid UTF-8 text", **log_context)
        return None


@base_router.post(
    "/scripts/{workflow_permanent_id}/blocks",
    include_in_schema=False,
    response_model=ScriptBlocksResponse,
)
async def get_workflow_script_blocks(
    workflow_permanent_id: str,
    block_script_request: ScriptBlocksRequest,
    current_org: Organization = Depends(org_auth_service.get_current_org),
) -> ScriptBlocksResponse:
    """Return the script text of each block of a workflow's cached script.

    The workflow is looked up for the caller's organization, then scripts are
    fetched by ``cache_key_value`` (and by the request's ``cache_key``, falling
    back to the workflow's ``cache_key``). Only the first script returned is
    used. Blocks whose script file, artifact, or content cannot be resolved are
    logged and left out of the response.

    Returns:
        ``ScriptBlocksResponse`` mapping block label to decoded file content;
        ``blocks`` is empty when no script or no script block is found.

    Raises:
        HTTPException: 404 when the workflow does not exist for the organization.
    """
    organization_id = current_org.organization_id
    cache_key_value = block_script_request.cache_key_value

    workflow = await app.DATABASE.get_workflow_by_permanent_id(
        workflow_permanent_id=workflow_permanent_id,
        organization_id=organization_id,
    )
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")

    cache_key = block_script_request.cache_key or workflow.cache_key or ""

    scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
        organization_id=organization_id,
        workflow_permanent_id=workflow_permanent_id,
        cache_key_value=cache_key_value,
        cache_key=cache_key,
    )
    if not scripts:
        LOG.info(
            "No scripts found for workflow",
            workflow_permanent_id=workflow_permanent_id,
            organization_id=organization_id,
            cache_key_value=cache_key_value,
            cache_key=cache_key,
        )
        return ScriptBlocksResponse(blocks={})

    script_revision_id = scripts[0].script_revision_id

    script_blocks = await app.DATABASE.get_script_blocks_by_script_revision_id(
        script_revision_id=script_revision_id,
        organization_id=organization_id,
    )
    if not script_blocks:
        LOG.info(
            "No script block found for workflow",
            workflow_permanent_id=workflow_permanent_id,
            organization_id=organization_id,
            script_revision_id=script_revision_id,
        )
        return ScriptBlocksResponse(blocks={})

    result: dict[str, str] = {}

    # TODO(jdo): make concurrent to speed up
    for script_block in script_blocks:
        source = await _load_script_block_source(
            block_label=script_block.script_block_label,
            script_file_id=script_block.script_file_id,
            script_revision_id=script_revision_id,
            workflow_permanent_id=workflow_permanent_id,
            organization_id=organization_id,
        )
        if source is not None:
            result[script_block.script_block_label] = source

    return ScriptBlocksResponse(blocks=result)