feat(workflows, runs, api): parameter metadata search/filter/display across workflows and runs (#3718)

Co-authored-by: Jonathan Dobson <jon.m.dobson@gmail.com>
This commit is contained in:
Celal Zamanoglu
2025-10-16 16:04:53 +03:00
committed by GitHub
parent 427e674299
commit 5531367566
9 changed files with 700 additions and 18 deletions

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timedelta
from typing import Any, List, Literal, Sequence, overload
import structlog
from sqlalchemy import and_, asc, case, delete, distinct, func, or_, pool, select, tuple_, update
from sqlalchemy import and_, asc, case, delete, distinct, exists, func, or_, pool, select, tuple_, update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
@@ -1574,11 +1574,17 @@ class AgentDB:
page_size: int = 10,
only_saved_tasks: bool = False,
only_workflows: bool = False,
title: str = "",
search_key: str | None = None,
statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]:
"""
Get all workflows with the latest version for the organization.
Search semantics:
- If `search_key` is provided, its value is used as a unified search term for both
`workflows.title` and workflow parameter metadata (key, description, and default_value).
- If `search_key` is not provided, no search filtering is applied.
- Parameter metadata search excludes soft-deleted parameter rows across parameter tables.
"""
if page < 1:
raise ValueError(f"Page must be greater than 0, got {page}")
@@ -1609,10 +1615,133 @@ class AgentDB:
main_query = main_query.where(WorkflowModel.is_saved_task.is_(True))
elif only_workflows:
main_query = main_query.where(WorkflowModel.is_saved_task.is_(False))
if title:
main_query = main_query.where(WorkflowModel.title.ilike(f"%{title}%"))
if statuses:
main_query = main_query.where(WorkflowModel.status.in_(statuses))
if search_key:
search_like = f"%{search_key}%"
title_like = WorkflowModel.title.ilike(search_like)
parameter_filters = [
# WorkflowParameterModel
exists(
select(1)
.select_from(WorkflowParameterModel)
.where(WorkflowParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(WorkflowParameterModel.deleted_at.is_(None))
.where(
or_(
WorkflowParameterModel.key.ilike(search_like),
WorkflowParameterModel.description.ilike(search_like),
WorkflowParameterModel.default_value.ilike(search_like),
)
)
),
# OutputParameterModel
exists(
select(1)
.select_from(OutputParameterModel)
.where(OutputParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(OutputParameterModel.deleted_at.is_(None))
.where(
or_(
OutputParameterModel.key.ilike(search_like),
OutputParameterModel.description.ilike(search_like),
)
)
),
# AWSSecretParameterModel
exists(
select(1)
.select_from(AWSSecretParameterModel)
.where(AWSSecretParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(AWSSecretParameterModel.deleted_at.is_(None))
.where(
or_(
AWSSecretParameterModel.key.ilike(search_like),
AWSSecretParameterModel.description.ilike(search_like),
)
)
),
# BitwardenLoginCredentialParameterModel
exists(
select(1)
.select_from(BitwardenLoginCredentialParameterModel)
.where(BitwardenLoginCredentialParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(BitwardenLoginCredentialParameterModel.deleted_at.is_(None))
.where(
or_(
BitwardenLoginCredentialParameterModel.key.ilike(search_like),
BitwardenLoginCredentialParameterModel.description.ilike(search_like),
)
)
),
# BitwardenSensitiveInformationParameterModel
exists(
select(1)
.select_from(BitwardenSensitiveInformationParameterModel)
.where(BitwardenSensitiveInformationParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(BitwardenSensitiveInformationParameterModel.deleted_at.is_(None))
.where(
or_(
BitwardenSensitiveInformationParameterModel.key.ilike(search_like),
BitwardenSensitiveInformationParameterModel.description.ilike(search_like),
)
)
),
# BitwardenCreditCardDataParameterModel
exists(
select(1)
.select_from(BitwardenCreditCardDataParameterModel)
.where(BitwardenCreditCardDataParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(BitwardenCreditCardDataParameterModel.deleted_at.is_(None))
.where(
or_(
BitwardenCreditCardDataParameterModel.key.ilike(search_like),
BitwardenCreditCardDataParameterModel.description.ilike(search_like),
)
)
),
# OnePasswordCredentialParameterModel
exists(
select(1)
.select_from(OnePasswordCredentialParameterModel)
.where(OnePasswordCredentialParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(OnePasswordCredentialParameterModel.deleted_at.is_(None))
.where(
or_(
OnePasswordCredentialParameterModel.key.ilike(search_like),
OnePasswordCredentialParameterModel.description.ilike(search_like),
)
)
),
# AzureVaultCredentialParameterModel
exists(
select(1)
.select_from(AzureVaultCredentialParameterModel)
.where(AzureVaultCredentialParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(AzureVaultCredentialParameterModel.deleted_at.is_(None))
.where(
or_(
AzureVaultCredentialParameterModel.key.ilike(search_like),
AzureVaultCredentialParameterModel.description.ilike(search_like),
)
)
),
# CredentialParameterModel
exists(
select(1)
.select_from(CredentialParameterModel)
.where(CredentialParameterModel.workflow_id == WorkflowModel.workflow_id)
.where(CredentialParameterModel.deleted_at.is_(None))
.where(
or_(
CredentialParameterModel.key.ilike(search_like),
CredentialParameterModel.description.ilike(search_like),
)
)
),
]
main_query = main_query.where(or_(title_like, or_(*parameter_filters)))
main_query = (
main_query.order_by(WorkflowModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
)
@@ -1987,7 +2116,11 @@ class AgentDB:
page: int = 1,
page_size: int = 10,
status: list[WorkflowRunStatus] | None = None,
search_key: str | None = None,
) -> list[WorkflowRun]:
"""
Get runs for a workflow, with optional `search_key` on parameter key/description/value.
"""
try:
async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based
@@ -1997,6 +2130,29 @@ class AgentDB:
.filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id)
.filter(WorkflowRunModel.organization_id == organization_id)
)
if search_key:
key_like = f"%{search_key}%"
# Filter runs where any run parameter matches by key/description/value
# Use EXISTS to avoid duplicate rows and to keep pagination correct
param_exists = exists(
select(1)
.select_from(WorkflowRunParameterModel)
.join(
WorkflowParameterModel,
WorkflowParameterModel.workflow_parameter_id
== WorkflowRunParameterModel.workflow_parameter_id,
)
.where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
.where(WorkflowParameterModel.deleted_at.is_(None))
.where(
or_(
WorkflowParameterModel.key.ilike(key_like),
WorkflowParameterModel.description.ilike(key_like),
WorkflowRunParameterModel.value.ilike(key_like),
)
)
)
query = query.where(param_exists)
if status:
query = query.filter(WorkflowRunModel.status.in_(status))
query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)

View File

@@ -1679,8 +1679,15 @@ async def get_workflow_runs_by_id(
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1),
status: Annotated[list[WorkflowRunStatus] | None, Query()] = None,
search_key: str | None = Query(
None,
description="Search runs by parameter key, parameter description, or run parameter value.",
),
current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRun]:
"""
Get workflow runs for a specific workflow permanent id.
"""
analytics.capture("skyvern-oss-agent-workflow-runs-get")
return await app.WORKFLOW_SERVICE.get_workflow_runs_for_workflow_permanent_id(
workflow_permanent_id=workflow_id,
@@ -1688,6 +1695,7 @@ async def get_workflow_runs_by_id(
page=page,
page_size=page_size,
status=status,
search_key=search_key,
)
@@ -1800,15 +1808,29 @@ async def get_workflows(
page_size: int = Query(10, ge=1),
only_saved_tasks: bool = Query(False),
only_workflows: bool = Query(False),
title: str = Query(""),
search_key: str | None = Query(
None,
description="Unified search across workflow title and parameter metadata (key, description, default_value).",
),
title: str = Query("", deprecated=True, description="Deprecated: use search_key instead."),
current_org: Organization = Depends(org_auth_service.get_current_org),
template: bool = Query(False),
) -> list[Workflow]:
"""
Get all workflows with the latest version for the organization.
Search semantics:
- If `search_key` is provided, its value is used as a unified search term for both
`workflows.title` and workflow parameter metadata (key, description, and default_value for
`WorkflowParameterModel`).
- Falls back to deprecated `title` (title-only search) if `search_key` is not provided.
- Parameter metadata search excludes soft-deleted parameter rows across all parameter tables.
"""
analytics.capture("skyvern-oss-agent-workflows-get")
# Determine the effective search term: prioritize search_key, fallback to title
effective_search = search_key or (title if title else None)
if template:
global_workflows_permanent_ids = await app.STORAGE.retrieve_global_workflows()
if not global_workflows_permanent_ids:
@@ -1817,7 +1839,7 @@ async def get_workflows(
workflow_permanent_ids=global_workflows_permanent_ids,
page=page,
page_size=page_size,
title=title,
search_key=effective_search or "",
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
)
return workflows
@@ -1834,7 +1856,7 @@ async def get_workflows(
page_size=page_size,
only_saved_tasks=only_saved_tasks,
only_workflows=only_workflows,
title=title,
search_key=effective_search,
statuses=[WorkflowStatus.published, WorkflowStatus.draft],
)

View File

@@ -1137,7 +1137,7 @@ class WorkflowService:
organization_id: str | None = None,
page: int = 1,
page_size: int = 10,
title: str = "",
search_key: str = "",
statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]:
return await app.DATABASE.get_workflows_by_permanent_ids(
@@ -1145,7 +1145,7 @@ class WorkflowService:
organization_id=organization_id,
page=page,
page_size=page_size,
title=title,
title=search_key,
statuses=statuses,
)
@@ -1156,11 +1156,14 @@ class WorkflowService:
page_size: int = 10,
only_saved_tasks: bool = False,
only_workflows: bool = False,
title: str = "",
search_key: str | None = None,
statuses: list[WorkflowStatus] | None = None,
) -> list[Workflow]:
"""
Get all workflows with the latest version for the organization.
Args:
search_key: Unified search term for title and parameter metadata (replaces title/parameter).
"""
return await app.DATABASE.get_workflows_by_organization_id(
organization_id=organization_id,
@@ -1168,7 +1171,7 @@ class WorkflowService:
page_size=page_size,
only_saved_tasks=only_saved_tasks,
only_workflows=only_workflows,
title=title,
search_key=search_key,
statuses=statuses,
)
@@ -1327,6 +1330,7 @@ class WorkflowService:
page: int = 1,
page_size: int = 10,
status: list[WorkflowRunStatus] | None = None,
search_key: str | None = None,
) -> list[WorkflowRun]:
return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id(
workflow_permanent_id=workflow_permanent_id,
@@ -1334,6 +1338,7 @@ class WorkflowService:
page=page,
page_size=page_size,
status=status,
search_key=search_key,
)
async def create_workflow_run(