Add error_code and search_key filters to workflow runs API (#SKY-7884) (#4694)
Co-authored-by: Suchintan Singh <suchintan@skyvern.com>
This commit is contained in:
@@ -3,7 +3,24 @@ from datetime import datetime, timedelta
|
||||
from typing import Any, List, Literal, Sequence, overload
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import Text, and_, asc, case, delete, distinct, exists, func, or_, pool, select, tuple_, update
|
||||
from sqlalchemy import (
|
||||
Text,
|
||||
and_,
|
||||
asc,
|
||||
case,
|
||||
cast,
|
||||
delete,
|
||||
distinct,
|
||||
exists,
|
||||
func,
|
||||
literal,
|
||||
or_,
|
||||
pool,
|
||||
select,
|
||||
tuple_,
|
||||
update,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.exc import (
|
||||
SQLAlchemyError,
|
||||
)
|
||||
@@ -3011,6 +3028,57 @@ class AgentDB(BaseAlchemyDB):
|
||||
LOG.error("SQLAlchemyError", exc_info=True)
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def _apply_search_key_filter(query, search_key: str | None): # type: ignore[no-untyped-def]
|
||||
if not search_key:
|
||||
return query
|
||||
key_like = f"%{search_key}%"
|
||||
# Match workflow_run_id directly
|
||||
id_matches = WorkflowRunModel.workflow_run_id.ilike(key_like)
|
||||
# Match parameter key or description (only for non-deleted parameter definitions)
|
||||
# Use EXISTS to avoid duplicate rows and to keep pagination correct
|
||||
param_key_desc_exists = exists(
|
||||
select(1)
|
||||
.select_from(WorkflowRunParameterModel)
|
||||
.join(
|
||||
WorkflowParameterModel,
|
||||
WorkflowParameterModel.workflow_parameter_id == WorkflowRunParameterModel.workflow_parameter_id,
|
||||
)
|
||||
.where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
|
||||
.where(WorkflowParameterModel.deleted_at.is_(None))
|
||||
.where(
|
||||
or_(
|
||||
WorkflowParameterModel.key.ilike(key_like),
|
||||
WorkflowParameterModel.description.ilike(key_like),
|
||||
)
|
||||
)
|
||||
)
|
||||
# Match run parameter value directly (searches all values regardless of parameter definition status)
|
||||
param_value_exists = exists(
|
||||
select(1)
|
||||
.select_from(WorkflowRunParameterModel)
|
||||
.where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
|
||||
.where(WorkflowRunParameterModel.value.ilike(key_like))
|
||||
)
|
||||
# Match extra HTTP headers (cast JSON to text for search, skip NULLs)
|
||||
extra_headers_match = and_(
|
||||
WorkflowRunModel.extra_http_headers.isnot(None),
|
||||
func.cast(WorkflowRunModel.extra_http_headers, Text()).ilike(key_like),
|
||||
)
|
||||
return query.where(or_(id_matches, param_key_desc_exists, param_value_exists, extra_headers_match))
|
||||
|
||||
@staticmethod
|
||||
def _apply_error_code_filter(query, error_code: str | None): # type: ignore[no-untyped-def]
|
||||
if not error_code:
|
||||
return query
|
||||
error_code_exists = exists(
|
||||
select(1)
|
||||
.select_from(TaskModel)
|
||||
.where(TaskModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
|
||||
.where(cast(TaskModel.errors, JSONB).contains(literal([{"error_code": error_code}], type_=JSONB)))
|
||||
)
|
||||
return query.where(error_code_exists)
|
||||
|
||||
async def get_workflow_runs(
|
||||
self,
|
||||
organization_id: str,
|
||||
@@ -3018,6 +3086,8 @@ class AgentDB(BaseAlchemyDB):
|
||||
page_size: int = 10,
|
||||
status: list[WorkflowRunStatus] | None = None,
|
||||
ordering: tuple[str, str] | None = None,
|
||||
search_key: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[WorkflowRun]:
|
||||
try:
|
||||
async with self.Session() as session:
|
||||
@@ -3030,6 +3100,9 @@ class AgentDB(BaseAlchemyDB):
|
||||
.filter(WorkflowRunModel.parent_workflow_run_id.is_(None))
|
||||
)
|
||||
|
||||
query = self._apply_search_key_filter(query, search_key)
|
||||
query = self._apply_error_code_filter(query, error_code)
|
||||
|
||||
if status:
|
||||
query = query.filter(WorkflowRunModel.status.in_(status))
|
||||
|
||||
@@ -3092,6 +3165,7 @@ class AgentDB(BaseAlchemyDB):
|
||||
page_size: int = 10,
|
||||
status: list[WorkflowRunStatus] | None = None,
|
||||
search_key: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[WorkflowRun]:
|
||||
"""
|
||||
Get runs for a workflow, with optional `search_key` on run ID, parameter key/description/value,
|
||||
@@ -3106,42 +3180,8 @@ class AgentDB(BaseAlchemyDB):
|
||||
.filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id)
|
||||
.filter(WorkflowRunModel.organization_id == organization_id)
|
||||
)
|
||||
if search_key:
|
||||
key_like = f"%{search_key}%"
|
||||
# Match workflow_run_id directly
|
||||
id_matches = WorkflowRunModel.workflow_run_id.ilike(key_like)
|
||||
# Match parameter key or description (only for non-deleted parameter definitions)
|
||||
# Use EXISTS to avoid duplicate rows and to keep pagination correct
|
||||
param_key_desc_exists = exists(
|
||||
select(1)
|
||||
.select_from(WorkflowRunParameterModel)
|
||||
.join(
|
||||
WorkflowParameterModel,
|
||||
WorkflowParameterModel.workflow_parameter_id
|
||||
== WorkflowRunParameterModel.workflow_parameter_id,
|
||||
)
|
||||
.where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
|
||||
.where(WorkflowParameterModel.deleted_at.is_(None))
|
||||
.where(
|
||||
or_(
|
||||
WorkflowParameterModel.key.ilike(key_like),
|
||||
WorkflowParameterModel.description.ilike(key_like),
|
||||
)
|
||||
)
|
||||
)
|
||||
# Match run parameter value directly (searches all values regardless of parameter definition status)
|
||||
param_value_exists = exists(
|
||||
select(1)
|
||||
.select_from(WorkflowRunParameterModel)
|
||||
.where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id)
|
||||
.where(WorkflowRunParameterModel.value.ilike(key_like))
|
||||
)
|
||||
# Match extra HTTP headers (cast JSON to text for search, skip NULLs)
|
||||
extra_headers_match = and_(
|
||||
WorkflowRunModel.extra_http_headers.isnot(None),
|
||||
func.cast(WorkflowRunModel.extra_http_headers, Text()).ilike(key_like),
|
||||
)
|
||||
query = query.where(or_(id_matches, param_key_desc_exists, param_value_exists, extra_headers_match))
|
||||
query = self._apply_search_key_filter(query, search_key)
|
||||
query = self._apply_error_code_filter(query, error_code)
|
||||
if status:
|
||||
query = query.filter(WorkflowRunModel.status.in_(status))
|
||||
query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
|
||||
|
||||
@@ -2217,14 +2217,41 @@ async def get_workflow_runs(
|
||||
page: int = Query(1, ge=1),
|
||||
page_size: int = Query(10, ge=1),
|
||||
status: Annotated[list[WorkflowRunStatus] | None, Query()] = None,
|
||||
search_key: str | None = Query(
|
||||
None,
|
||||
max_length=500,
|
||||
description="Search runs by run ID, parameter key, parameter description, run parameter value, or extra HTTP headers.",
|
||||
examples=["login_url", "credential_value"],
|
||||
),
|
||||
error_code: str | None = Query(
|
||||
None,
|
||||
max_length=500,
|
||||
description="Filter runs by user-defined error code stored in task errors. "
|
||||
"Matches against the error_code field in the task errors JSON array.",
|
||||
examples=["INVALID_CREDENTIALS", "LOGIN_FAILED", "CAPTCHA_DETECTED"],
|
||||
),
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> list[WorkflowRun]:
|
||||
"""
|
||||
Get all workflow runs for the current organization.
|
||||
|
||||
Supports filtering by status, parameter search, and error code. All filters
|
||||
are combined with AND logic.
|
||||
|
||||
**Examples:**
|
||||
- All failed runs: `?status=failed`
|
||||
- Failed runs with a specific error: `?status=failed&error_code=INVALID_CREDENTIALS`
|
||||
- Runs matching a parameter value: `?search_key=https://example.com`
|
||||
- Combined: `?status=failed&error_code=LOGIN_FAILED&search_key=my_credential`
|
||||
"""
|
||||
analytics.capture("skyvern-oss-agent-workflow-runs-get")
|
||||
return await app.WORKFLOW_SERVICE.get_workflow_runs(
|
||||
organization_id=current_org.organization_id,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
status=status,
|
||||
search_key=search_key,
|
||||
error_code=error_code,
|
||||
)
|
||||
|
||||
|
||||
@@ -2248,12 +2275,29 @@ async def get_workflow_runs_by_id(
|
||||
status: Annotated[list[WorkflowRunStatus] | None, Query()] = None,
|
||||
search_key: str | None = Query(
|
||||
None,
|
||||
max_length=500,
|
||||
description="Search runs by run ID, parameter key, parameter description, run parameter value, or extra HTTP headers.",
|
||||
),
|
||||
error_code: str | None = Query(
|
||||
None,
|
||||
max_length=500,
|
||||
description="Filter runs by user-defined error code stored in task errors. "
|
||||
"Matches against the error_code field in the task errors JSON array.",
|
||||
examples=["INVALID_CREDENTIALS", "LOGIN_FAILED", "CAPTCHA_DETECTED"],
|
||||
),
|
||||
current_org: Organization = Depends(org_auth_service.get_current_org),
|
||||
) -> list[WorkflowRun]:
|
||||
"""
|
||||
Get workflow runs for a specific workflow permanent id.
|
||||
|
||||
Supports filtering by status, parameter search, and error code. All filters
|
||||
are combined with AND logic.
|
||||
|
||||
**Examples:**
|
||||
- All failed runs: `?status=failed`
|
||||
- Failed runs with a specific error: `?status=failed&error_code=INVALID_CREDENTIALS`
|
||||
- Runs matching a parameter value: `?search_key=https://example.com`
|
||||
- Combined: `?status=failed&error_code=LOGIN_FAILED&search_key=my_credential`
|
||||
"""
|
||||
analytics.capture("skyvern-oss-agent-workflow-runs-get")
|
||||
return await app.WORKFLOW_SERVICE.get_workflow_runs_for_workflow_permanent_id(
|
||||
@@ -2263,6 +2307,7 @@ async def get_workflow_runs_by_id(
|
||||
page_size=page_size,
|
||||
status=status,
|
||||
search_key=search_key,
|
||||
error_code=error_code,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -2340,6 +2340,8 @@ class WorkflowService:
|
||||
page_size: int = 10,
|
||||
status: list[WorkflowRunStatus] | None = None,
|
||||
ordering: tuple[str, str] | None = None,
|
||||
search_key: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[WorkflowRun]:
|
||||
return await app.DATABASE.get_workflow_runs(
|
||||
organization_id=organization_id,
|
||||
@@ -2347,6 +2349,8 @@ class WorkflowService:
|
||||
page_size=page_size,
|
||||
status=status,
|
||||
ordering=ordering,
|
||||
search_key=search_key,
|
||||
error_code=error_code,
|
||||
)
|
||||
|
||||
async def get_workflow_runs_count(
|
||||
@@ -2367,6 +2371,7 @@ class WorkflowService:
|
||||
page_size: int = 10,
|
||||
status: list[WorkflowRunStatus] | None = None,
|
||||
search_key: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[WorkflowRun]:
|
||||
return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id(
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
@@ -2375,6 +2380,7 @@ class WorkflowService:
|
||||
page_size=page_size,
|
||||
status=status,
|
||||
search_key=search_key,
|
||||
error_code=error_code,
|
||||
)
|
||||
|
||||
async def create_workflow_run(
|
||||
|
||||
Reference in New Issue
Block a user