From 43be44cce5f53372db017473df483c140df65d9b Mon Sep 17 00:00:00 2001 From: Stanislav Novosad Date: Tue, 25 Nov 2025 12:15:06 -0700 Subject: [PATCH] Optimize get_workflow_scripts_by_cache_key_value SQL (#4095) --- skyvern/forge/sdk/db/client.py | 95 +++++---------------- skyvern/forge/sdk/routes/scripts.py | 7 +- skyvern/services/script_service.py | 6 +- skyvern/services/workflow_script_service.py | 7 +- 4 files changed, 30 insertions(+), 85 deletions(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 75751ece..91debebc 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -4893,43 +4893,6 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise - async def update_script( - self, - script_revision_id: str, - organization_id: str, - artifact_id: str | None = None, - run_id: str | None = None, - version: int | None = None, - ) -> Script: - try: - async with self.Session() as session: - get_script_query = ( - select(ScriptModel) - .filter_by(organization_id=organization_id) - .filter_by(script_revision_id=script_revision_id) - ) - if script := (await session.scalars(get_script_query)).first(): - if artifact_id: - script.artifact_id = artifact_id - if run_id: - script.run_id = run_id - if version: - script.version = version - await session.commit() - await session.refresh(script) - return convert_to_script(script) - else: - raise NotFoundError("Script not found") - except SQLAlchemyError: - LOG.error("SQLAlchemyError", exc_info=True) - raise - except NotFoundError: - LOG.error("No script found to update", script_revision_id=script_revision_id) - raise - except Exception: - LOG.error("UnexpectedError", exc_info=True) - raise - async def get_scripts( self, organization_id: str, @@ -5280,7 +5243,7 @@ class AgentDB: workflow_script_model = (await session.scalars(query)).first() return WorkflowScript.model_validate(workflow_script_model) if workflow_script_model else None - async def get_workflow_scripts_by_cache_key_value( + async def get_workflow_script_by_cache_key_value( self, *, organization_id: str, @@ -5289,51 +5252,35 @@ class AgentDB: workflow_run_id: str | None = None, cache_key: str | None = None, statuses: list[ScriptStatus] | None = None, - ) -> list[Script]: - """Get latest script versions linked to a workflow by a specific cache_key_value.""" + ) -> Script | None: + """Get latest script version linked to a workflow by a specific cache_key_value.""" try: async with self.Session() as session: - # Subquery: script_ids associated with this workflow + cache_key_value - ws_script_ids_subquery = ( - select(WorkflowScriptModel.script_id) - .where(WorkflowScriptModel.organization_id == organization_id) - .where(WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id) - .where(WorkflowScriptModel.cache_key_value == cache_key_value) - .where(WorkflowScriptModel.deleted_at.is_(None)) - ) - if workflow_run_id: - ws_script_ids_subquery = ws_script_ids_subquery.where( - WorkflowScriptModel.workflow_run_id == workflow_run_id + # Build the query: join workflow_scripts with scripts + query = ( + select(ScriptModel) + .join(WorkflowScriptModel, ScriptModel.script_id == WorkflowScriptModel.script_id) + .where( + WorkflowScriptModel.organization_id == organization_id, + WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id, + WorkflowScriptModel.cache_key_value == cache_key_value, + WorkflowScriptModel.deleted_at.is_(None), ) + ) + + if workflow_run_id: + query = query.where(WorkflowScriptModel.workflow_run_id == workflow_run_id) if cache_key is not None: - ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.cache_key == cache_key) + query = query.where(WorkflowScriptModel.cache_key == cache_key) if statuses is not None and len(statuses) > 0: - ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.status.in_(statuses)) + query = query.where(WorkflowScriptModel.status.in_(statuses)) - # Latest version per script_id within the org and not deleted - latest_versions_subquery = ( - select( - ScriptModel.script_id, - func.max(ScriptModel.version).label("latest_version"), - ) - .where(ScriptModel.organization_id == organization_id) - .where(ScriptModel.deleted_at.is_(None)) - .where(ScriptModel.script_id.in_(ws_script_ids_subquery)) - .group_by(ScriptModel.script_id) - .subquery() - ) + query = query.order_by(ScriptModel.created_at.desc(), ScriptModel.version.desc()).limit(1) - query = select(ScriptModel).join( - latest_versions_subquery, - (ScriptModel.script_id == latest_versions_subquery.c.script_id) - & (ScriptModel.version == latest_versions_subquery.c.latest_version), - ) - query = query.order_by(ScriptModel.created_at.desc()) - - scripts = (await session.scalars(query)).all() - return [convert_to_script(script) for script in scripts] + script = (await session.scalars(query)).first() + return convert_to_script(script) if script else None except SQLAlchemyError: LOG.error("SQLAlchemyError", exc_info=True) raise diff --git a/skyvern/forge/sdk/routes/scripts.py b/skyvern/forge/sdk/routes/scripts.py index 7eeb6cee..1ec9cba4 100644 --- a/skyvern/forge/sdk/routes/scripts.py +++ b/skyvern/forge/sdk/routes/scripts.py @@ -441,7 +441,7 @@ async def get_workflow_script_blocks( cache_key = block_script_request.cache_key or workflow.cache_key or "" status = block_script_request.status - scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value( + script = await app.DATABASE.get_workflow_script_by_cache_key_value( organization_id=current_org.organization_id, workflow_permanent_id=workflow_permanent_id, workflow_run_id=block_script_request.workflow_run_id, @@ -450,7 +450,7 @@ async def get_workflow_script_blocks( statuses=[status] if status else None, ) - if not scripts: + if not script: LOG.info( "No scripts found for workflow", workflow_permanent_id=workflow_permanent_id, @@ -460,9 +460,8 @@ async def get_workflow_script_blocks( ) return empty - first_script = scripts[0] return await get_script_blocks_response( - script_revision_id=first_script.script_revision_id, + script_revision_id=script.script_revision_id, organization_id=current_org.organization_id, workflow_permanent_id=workflow_permanent_id, ) diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index c3fe816a..4e40af65 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -945,7 +945,7 @@ async def _regenerate_script_block_after_ai_fallback( if not cache_key_value: cache_key_value = cache_key # Fallback - existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value( + existing_script = await app.DATABASE.get_workflow_script_by_cache_key_value( organization_id=organization_id, workflow_permanent_id=workflow.workflow_permanent_id, cache_key_value=cache_key_value, @@ -953,11 +953,11 @@ async def _regenerate_script_block_after_ai_fallback( statuses=[ScriptStatus.published], ) - if not existing_scripts: + if not existing_script: LOG.error("No existing script found to regenerate", cache_key=cache_key, cache_key_value=cache_key_value) return - current_script = existing_scripts[0] + current_script = existing_script LOG.info( "Regenerating script block after AI fallback", script_id=current_script.script_id, diff --git a/skyvern/services/workflow_script_service.py b/skyvern/services/workflow_script_service.py index 4c9f5b84..6dde3f11 100644 --- a/skyvern/services/workflow_script_service.py +++ b/skyvern/services/workflow_script_service.py @@ -76,22 +76,21 @@ async def get_workflow_script( return None, rendered_cache_key_value # Check if there are existing cached scripts for this workflow + cache_key_value - existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value( + existing_script = await app.DATABASE.get_workflow_script_by_cache_key_value( organization_id=workflow.organization_id, workflow_permanent_id=workflow.workflow_permanent_id, cache_key_value=rendered_cache_key_value, statuses=[status], ) - if existing_scripts: + if existing_script: LOG.info( "Found cached script for workflow", workflow_id=workflow.workflow_id, cache_key_value=rendered_cache_key_value, workflow_run_id=workflow_run.workflow_run_id, - script_count=len(existing_scripts), ) - return existing_scripts[0], rendered_cache_key_value + return existing_script, rendered_cache_key_value return None, rendered_cache_key_value