From 00db70cdc8a9a558005087d85ad8df0509091c6e Mon Sep 17 00:00:00 2001 From: Jonathan Dobson Date: Thu, 21 Aug 2025 19:01:59 -0400 Subject: [PATCH] Add Cache Key Value API Endpoints and Db Methods (#3265) --- skyvern/forge/sdk/db/client.py | 138 ++++++++++++++++++++++++++++ skyvern/forge/sdk/routes/scripts.py | 100 ++++++++++++++++++++ skyvern/schemas/scripts.py | 8 ++ 3 files changed, 246 insertions(+) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 9fc5e8e9..ec1fe4e2 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -3961,3 +3961,141 @@ class AgentDB: except Exception: LOG.error("UnexpectedError", exc_info=True) raise + + async def get_workflow_cache_key_count( + self, + organization_id: str, + workflow_permanent_id: str, + cache_key: str, + filter: str | None = None, + ) -> int: + try: + async with self.Session() as session: + query = ( + select(func.count()) + .select_from(WorkflowScriptModel) + .filter_by(organization_id=organization_id) + .filter_by(workflow_permanent_id=workflow_permanent_id) + .filter_by(cache_key=cache_key) + .filter_by(deleted_at=None) + ) + + if filter: + query = query.filter(WorkflowScriptModel.cache_key_value.contains(filter)) + + return (await session.execute(query)).scalar_one() + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def get_workflow_cache_key_values( + self, + organization_id: str, + workflow_permanent_id: str, + cache_key: str, + page: int = 1, + page_size: int = 100, + filter: str | None = None, + ) -> list[str]: + try: + async with self.Session() as session: + query = ( + select(WorkflowScriptModel.cache_key_value) + .order_by(WorkflowScriptModel.cache_key_value.asc()) + .filter_by(organization_id=organization_id) + .filter_by(workflow_permanent_id=workflow_permanent_id) + .filter_by(cache_key=cache_key) + .filter_by(deleted_at=None) + .offset((page - 1) * page_size) + .limit(page_size) + ) + + if filter: + query = query.filter(WorkflowScriptModel.cache_key_value.contains(filter)) + + return (await session.scalars(query)).all() + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def create_workflow_cache_key_value( + self, + organization_id: str, + workflow_permanent_id: str, + cache_key: str, + cache_key_value: str, + script_id: str, + workflow_id: str | None = None, + workflow_run_id: str | None = None, + ) -> str: + """ + Insert a new cache key value for a workflow. + + Returns the workflow_script_id of the created record. + """ + try: + async with self.Session() as session: + workflow_script = WorkflowScriptModel( + script_id=script_id, + organization_id=organization_id, + workflow_permanent_id=workflow_permanent_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + cache_key=cache_key, + cache_key_value=cache_key_value, + ) + + session.add(workflow_script) + await session.commit() + await session.refresh(workflow_script) + + return workflow_script.workflow_script_id + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def delete_workflow_cache_key_value( + self, + organization_id: str, + workflow_permanent_id: str, + cache_key_value: str, + ) -> bool: + """ + Soft delete workflow cache key values by setting deleted_at timestamp. + + Returns True if any records were deleted, False otherwise. + """ + try: + async with self.Session() as session: + stmt = ( + update(WorkflowScriptModel) + .where( + and_( + WorkflowScriptModel.organization_id == organization_id, + WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id, + WorkflowScriptModel.cache_key_value == cache_key_value, + WorkflowScriptModel.deleted_at.is_(None), + ) + ) + .values(deleted_at=datetime.now(timezone.utc)) + ) + + result = await session.execute(stmt) + await session.commit() + + return result.rowcount > 0 + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise diff --git a/skyvern/forge/sdk/routes/scripts.py b/skyvern/forge/sdk/routes/scripts.py index 6c4ba018..c782a3e0 100644 --- a/skyvern/forge/sdk/routes/scripts.py +++ b/skyvern/forge/sdk/routes/scripts.py @@ -16,6 +16,7 @@ from skyvern.schemas.scripts import ( Script, ScriptBlocksRequest, ScriptBlocksResponse, + ScriptCacheKeyValuesResponse, ) from skyvern.services import script_service @@ -419,3 +420,102 @@ async def get_workflow_script_blocks( continue return ScriptBlocksResponse(blocks=result) + + +@base_router.get( + "/scripts/{workflow_permanent_id}/{cache_key}/values", + include_in_schema=False, + response_model=ScriptCacheKeyValuesResponse, +) +async def get_workflow_cache_key_values( + workflow_permanent_id: str, + cache_key: str, + current_org: Organization = Depends(org_auth_service.get_current_org), + page: int = Query( + 1, + ge=1, + description="Page number for pagination", + examples=[1], + ), + page_size: int = Query( + 100, + ge=1, + description="Number of items per page", + examples=[100], + ), + filter: str | None = Query( + None, + description="Filter values by a substring", + examples=["value1", "value2"], + ), +) -> ScriptCacheKeyValuesResponse: + # TODO(jdo): concurrent-ize + + values = await app.DATABASE.get_workflow_cache_key_values( + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + cache_key=cache_key, + page=page, + page_size=page_size, + filter=filter, + ) + + total_count = await app.DATABASE.get_workflow_cache_key_count( + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + cache_key=cache_key, + ) + + filtered_count = await app.DATABASE.get_workflow_cache_key_count( + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + cache_key=cache_key, + filter=filter, + ) + + return ScriptCacheKeyValuesResponse( + filtered_count=filtered_count, + page=page, + page_size=page_size, + total_count=total_count, + values=values, + ) + + +@base_router.delete( + "/scripts/{workflow_permanent_id}/value/{cache_key_value}", + include_in_schema=False, +) +async def delete_workflow_cache_key_value( + workflow_permanent_id: str, + cache_key_value: str, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> dict[str, str]: + """Delete a specific cache key value for a workflow.""" + LOG.info( + "Deleting workflow cache key value", + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + cache_key_value=cache_key_value, + ) + + # Verify workflow exists + workflow = await app.DATABASE.get_workflow_by_permanent_id( + workflow_permanent_id=workflow_permanent_id, + organization_id=current_org.organization_id, + ) + + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + + # Delete the cache key value + deleted = await app.DATABASE.delete_workflow_cache_key_value( + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + cache_key_value=cache_key_value, + ) + + if not deleted: + raise HTTPException(status_code=404, detail="Cache key value not found") + + return {"message": "Cache key value deleted successfully"} diff --git a/skyvern/schemas/scripts.py b/skyvern/schemas/scripts.py index afa75535..e7147c50 100644 --- a/skyvern/schemas/scripts.py +++ b/skyvern/schemas/scripts.py @@ -139,6 +139,14 @@ class ScriptBlock(BaseModel): deleted_at: datetime | None = None +class ScriptCacheKeyValuesResponse(BaseModel): + filtered_count: int + page: int + page_size: int + total_count: int + values: list[str] + + class ScriptBlocksResponse(BaseModel): blocks: dict[str, str]