Optimize get_workflow_scripts_by_cache_key_value SQL (#4095)

This commit is contained in:
Stanislav Novosad
2025-11-25 12:15:06 -07:00
committed by GitHub
parent f00e82c1bb
commit 43be44cce5
4 changed files with 30 additions and 85 deletions

View File

@@ -4893,43 +4893,6 @@ class AgentDB:
LOG.error("SQLAlchemyError", exc_info=True)
raise
async def update_script(
    self,
    script_revision_id: str,
    organization_id: str,
    artifact_id: str | None = None,
    run_id: str | None = None,
    version: int | None = None,
) -> Script:
    """Apply the supplied field values to one script revision and persist them.

    The row is looked up by ``organization_id`` and ``script_revision_id``;
    the first match is used. Only arguments with a truthy value are written,
    so ``None`` (and other falsy values such as ``""`` or ``0``) leave the
    stored value untouched.

    Args:
        script_revision_id: Revision identifier of the script to update.
        organization_id: Organization the script must belong to.
        artifact_id: New artifact id, if it should be changed.
        run_id: New run id, if it should be changed.
        version: New version number, if it should be changed.

    Returns:
        The refreshed row converted with ``convert_to_script``.

    Raises:
        NotFoundError: No row matches the organization and revision id.
        SQLAlchemyError: The database layer failed; logged and re-raised.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        async with self.Session() as session:
            lookup = select(ScriptModel).filter_by(
                organization_id=organization_id,
                script_revision_id=script_revision_id,
            )
            script_row = (await session.scalars(lookup)).first()
            if not script_row:
                raise NotFoundError("Script not found")

            # Write only the fields the caller actually provided (truthy values).
            requested_changes = (
                ("artifact_id", artifact_id),
                ("run_id", run_id),
                ("version", version),
            )
            for column_name, new_value in requested_changes:
                if new_value:
                    setattr(script_row, column_name, new_value)

            await session.commit()
            # Reload so the converted result reflects the persisted state.
            await session.refresh(script_row)
            return convert_to_script(script_row)
    except SQLAlchemyError:
        LOG.error("SQLAlchemyError", exc_info=True)
        raise
    except NotFoundError:
        LOG.error("No script found to update", script_revision_id=script_revision_id)
        raise
    except Exception:
        LOG.error("UnexpectedError", exc_info=True)
        raise
async def get_scripts(
self,
organization_id: str,
@@ -5280,7 +5243,7 @@ class AgentDB:
workflow_script_model = (await session.scalars(query)).first()
return WorkflowScript.model_validate(workflow_script_model) if workflow_script_model else None
async def get_workflow_scripts_by_cache_key_value(
async def get_workflow_script_by_cache_key_value(
self,
*,
organization_id: str,
@@ -5289,51 +5252,35 @@ class AgentDB:
workflow_run_id: str | None = None,
cache_key: str | None = None,
statuses: list[ScriptStatus] | None = None,
) -> list[Script]:
"""Get latest script versions linked to a workflow by a specific cache_key_value."""
) -> Script | None:
"""Get latest script version linked to a workflow by a specific cache_key_value."""
try:
async with self.Session() as session:
# Subquery: script_ids associated with this workflow + cache_key_value
ws_script_ids_subquery = (
select(WorkflowScriptModel.script_id)
.where(WorkflowScriptModel.organization_id == organization_id)
.where(WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id)
.where(WorkflowScriptModel.cache_key_value == cache_key_value)
.where(WorkflowScriptModel.deleted_at.is_(None))
)
if workflow_run_id:
ws_script_ids_subquery = ws_script_ids_subquery.where(
WorkflowScriptModel.workflow_run_id == workflow_run_id
# Build the query: join workflow_scripts with scripts
query = (
select(ScriptModel)
.join(WorkflowScriptModel, ScriptModel.script_id == WorkflowScriptModel.script_id)
.where(
WorkflowScriptModel.organization_id == organization_id,
WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id,
WorkflowScriptModel.cache_key_value == cache_key_value,
WorkflowScriptModel.deleted_at.is_(None),
)
)
if workflow_run_id:
query = query.where(WorkflowScriptModel.workflow_run_id == workflow_run_id)
if cache_key is not None:
ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.cache_key == cache_key)
query = query.where(WorkflowScriptModel.cache_key == cache_key)
if statuses is not None and len(statuses) > 0:
ws_script_ids_subquery = ws_script_ids_subquery.where(WorkflowScriptModel.status.in_(statuses))
query = query.where(WorkflowScriptModel.status.in_(statuses))
# Latest version per script_id within the org and not deleted
latest_versions_subquery = (
select(
ScriptModel.script_id,
func.max(ScriptModel.version).label("latest_version"),
)
.where(ScriptModel.organization_id == organization_id)
.where(ScriptModel.deleted_at.is_(None))
.where(ScriptModel.script_id.in_(ws_script_ids_subquery))
.group_by(ScriptModel.script_id)
.subquery()
)
query = query.order_by(ScriptModel.created_at.desc(), ScriptModel.version.desc()).limit(1)
query = select(ScriptModel).join(
latest_versions_subquery,
(ScriptModel.script_id == latest_versions_subquery.c.script_id)
& (ScriptModel.version == latest_versions_subquery.c.latest_version),
)
query = query.order_by(ScriptModel.created_at.desc())
scripts = (await session.scalars(query)).all()
return [convert_to_script(script) for script in scripts]
script = (await session.scalars(query)).first()
return convert_to_script(script) if script else None
except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True)
raise

View File

@@ -441,7 +441,7 @@ async def get_workflow_script_blocks(
cache_key = block_script_request.cache_key or workflow.cache_key or ""
status = block_script_request.status
scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
script = await app.DATABASE.get_workflow_script_by_cache_key_value(
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
workflow_run_id=block_script_request.workflow_run_id,
@@ -450,7 +450,7 @@ async def get_workflow_script_blocks(
statuses=[status] if status else None,
)
if not scripts:
if not script:
LOG.info(
"No scripts found for workflow",
workflow_permanent_id=workflow_permanent_id,
@@ -460,9 +460,8 @@ async def get_workflow_script_blocks(
)
return empty
first_script = scripts[0]
return await get_script_blocks_response(
script_revision_id=first_script.script_revision_id,
script_revision_id=script.script_revision_id,
organization_id=current_org.organization_id,
workflow_permanent_id=workflow_permanent_id,
)

View File

@@ -945,7 +945,7 @@ async def _regenerate_script_block_after_ai_fallback(
if not cache_key_value:
cache_key_value = cache_key # Fallback
existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
existing_script = await app.DATABASE.get_workflow_script_by_cache_key_value(
organization_id=organization_id,
workflow_permanent_id=workflow.workflow_permanent_id,
cache_key_value=cache_key_value,
@@ -953,11 +953,11 @@ async def _regenerate_script_block_after_ai_fallback(
statuses=[ScriptStatus.published],
)
if not existing_scripts:
if not existing_script:
LOG.error("No existing script found to regenerate", cache_key=cache_key, cache_key_value=cache_key_value)
return
current_script = existing_scripts[0]
current_script = existing_script
LOG.info(
"Regenerating script block after AI fallback",
script_id=current_script.script_id,

View File

@@ -76,22 +76,21 @@ async def get_workflow_script(
return None, rendered_cache_key_value
# Check if there are existing cached scripts for this workflow + cache_key_value
existing_scripts = await app.DATABASE.get_workflow_scripts_by_cache_key_value(
existing_script = await app.DATABASE.get_workflow_script_by_cache_key_value(
organization_id=workflow.organization_id,
workflow_permanent_id=workflow.workflow_permanent_id,
cache_key_value=rendered_cache_key_value,
statuses=[status],
)
if existing_scripts:
if existing_script:
LOG.info(
"Found cached script for workflow",
workflow_id=workflow.workflow_id,
cache_key_value=rendered_cache_key_value,
workflow_run_id=workflow_run.workflow_run_id,
script_count=len(existing_scripts),
)
return existing_scripts[0], rendered_cache_key_value
return existing_script, rendered_cache_key_value
return None, rendered_cache_key_value