Add Cache Key Value API Endpoints and Db Methods (#3265)

This commit is contained in:
Jonathan Dobson
2025-08-21 19:01:59 -04:00
committed by GitHub
parent 2a62dc08aa
commit 00db70cdc8
3 changed files with 246 additions and 0 deletions

View File

@@ -3961,3 +3961,141 @@ class AgentDB:
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def get_workflow_cache_key_count(
self,
organization_id: str,
workflow_permanent_id: str,
cache_key: str,
filter: str | None = None,
) -> int:
try:
async with self.Session() as session:
query = (
select(func.count())
.select_from(WorkflowScriptModel)
.filter_by(organization_id=organization_id)
.filter_by(workflow_permanent_id=workflow_permanent_id)
.filter_by(cache_key=cache_key)
.filter_by(deleted_at=None)
)
if filter:
query = query.filter(WorkflowScriptModel.cache_key_value.contains(filter))
return (await session.execute(query)).scalar_one()
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def get_workflow_cache_key_values(
self,
organization_id: str,
workflow_permanent_id: str,
cache_key: str,
page: int = 1,
page_size: int = 100,
filter: str | None = None,
) -> list[str]:
try:
async with self.Session() as session:
query = (
select(WorkflowScriptModel.cache_key_value)
.order_by(WorkflowScriptModel.cache_key_value.asc())
.filter_by(organization_id=organization_id)
.filter_by(workflow_permanent_id=workflow_permanent_id)
.filter_by(cache_key=cache_key)
.filter_by(deleted_at=None)
.offset((page - 1) * page_size)
.limit(page_size)
)
if filter:
query = query.filter(WorkflowScriptModel.cache_key_value.contains(filter))
return (await session.scalars(query)).all()
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def create_workflow_cache_key_value(
self,
organization_id: str,
workflow_permanent_id: str,
cache_key: str,
cache_key_value: str,
script_id: str,
workflow_id: str | None = None,
workflow_run_id: str | None = None,
) -> str:
"""
Insert a new cache key value for a workflow.
Returns the workflow_script_id of the created record.
"""
try:
async with self.Session() as session:
workflow_script = WorkflowScriptModel(
script_id=script_id,
organization_id=organization_id,
workflow_permanent_id=workflow_permanent_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
cache_key=cache_key,
cache_key_value=cache_key_value,
)
session.add(workflow_script)
await session.commit()
await session.refresh(workflow_script)
return workflow_script.workflow_script_id
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise
async def delete_workflow_cache_key_value(
self,
organization_id: str,
workflow_permanent_id: str,
cache_key_value: str,
) -> bool:
"""
Soft delete workflow cache key values by setting deleted_at timestamp.
Returns True if any records were deleted, False otherwise.
"""
try:
async with self.Session() as session:
stmt = (
update(WorkflowScriptModel)
.where(
and_(
WorkflowScriptModel.organization_id == organization_id,
WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id,
WorkflowScriptModel.cache_key_value == cache_key_value,
WorkflowScriptModel.deleted_at.is_(None),
)
)
.values(deleted_at=datetime.now(timezone.utc))
)
result = await session.execute(stmt)
await session.commit()
return result.rowcount > 0
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise
except Exception:
LOG.error("UnexpectedError", exc_info=True)
raise

View File

@@ -16,6 +16,7 @@ from skyvern.schemas.scripts import (
Script,
ScriptBlocksRequest,
ScriptBlocksResponse,
ScriptCacheKeyValuesResponse,
)
from skyvern.services import script_service
@@ -419,3 +420,102 @@ async def get_workflow_script_blocks(
continue
return ScriptBlocksResponse(blocks=result)
@base_router.get(
"/scripts/{workflow_permanent_id}/{cache_key}/values",
include_in_schema=False,
response_model=ScriptCacheKeyValuesResponse,
)
async def get_workflow_cache_key_values(
workflow_permanent_id: str,
cache_key: str,
current_org: Organization = Depends(org_auth_service.get_current_org),
page: int = Query(
1,
ge=1,
description="Page number for pagination",
examples=[1],
),
page_size: int = Query(
100,
ge=1,
description="Number of items per page",
examples=[100],
),
filter: str | None = Query(
None,
description="Filter values by a substring",
examples=["value1", "value2"],
),
) -> ScriptCacheKeyValuesResponse:
# TODO(jdo): concurrent-ize
values = await app.DATABASE.get_workflow_cache_key_values(
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
cache_key=cache_key,
page=page,
page_size=page_size,
filter=filter,
)
total_count = await app.DATABASE.get_workflow_cache_key_count(
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
cache_key=cache_key,
)
filtered_count = await app.DATABASE.get_workflow_cache_key_count(
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
cache_key=cache_key,
filter=filter,
)
return ScriptCacheKeyValuesResponse(
filtered_count=filtered_count,
page=page,
page_size=page_size,
total_count=total_count,
values=values,
)
@base_router.delete(
"/scripts/{workflow_permanent_id}/value/{cache_key_value}",
include_in_schema=False,
)
async def delete_workflow_cache_key_value(
workflow_permanent_id: str,
cache_key_value: str,
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> dict[str, str]:
"""Delete a specific cache key value for a workflow."""
LOG.info(
"Deleting workflow cache key value",
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
cache_key_value=cache_key_value,
)
# Verify workflow exists
workflow = await app.DATABASE.get_workflow_by_permanent_id(
workflow_permanent_id=workflow_permanent_id,
organization_id=current_org.organization_id,
)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
# Delete the cache key value
deleted = await app.DATABASE.delete_workflow_cache_key_value(
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
cache_key_value=cache_key_value,
)
if not deleted:
raise HTTPException(status_code=404, detail="Cache key value not found")
return {"message": "Cache key value deleted successfully"}

View File

@@ -139,6 +139,14 @@ class ScriptBlock(BaseModel):
deleted_at: datetime | None = None
class ScriptCacheKeyValuesResponse(BaseModel):
filtered_count: int
page: int
page_size: int
total_count: int
values: list[str]
class ScriptBlocksResponse(BaseModel):
blocks: dict[str, str]