From b1758dd3b5dca9bfc43be98a9de47f269ea95fe3 Mon Sep 17 00:00:00 2001 From: Suchintan Date: Wed, 11 Feb 2026 00:42:11 -0500 Subject: [PATCH] Add error_code and search_key filters to workflow runs API (#SKY-7884) (#4694) Co-authored-by: Suchintan Singh --- skyvern/forge/sdk/db/agent_db.py | 114 ++++++++++++++------- skyvern/forge/sdk/routes/agent_protocol.py | 45 ++++++++ skyvern/forge/sdk/workflow/service.py | 6 ++ 3 files changed, 128 insertions(+), 37 deletions(-) diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 70e4e442..a87f6246 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -3,7 +3,24 @@ from datetime import datetime, timedelta from typing import Any, List, Literal, Sequence, overload import structlog -from sqlalchemy import Text, and_, asc, case, delete, distinct, exists, func, or_, pool, select, tuple_, update +from sqlalchemy import ( + Text, + and_, + asc, + case, + cast, + delete, + distinct, + exists, + func, + literal, + or_, + pool, + select, + tuple_, + update, +) +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.exc import ( SQLAlchemyError, ) @@ -3011,6 +3028,57 @@ class AgentDB(BaseAlchemyDB): LOG.error("SQLAlchemyError", exc_info=True) raise + @staticmethod + def _apply_search_key_filter(query, search_key: str | None): # type: ignore[no-untyped-def] + if not search_key: + return query + key_like = f"%{search_key}%" + # Match workflow_run_id directly + id_matches = WorkflowRunModel.workflow_run_id.ilike(key_like) + # Match parameter key or description (only for non-deleted parameter definitions) + # Use EXISTS to avoid duplicate rows and to keep pagination correct + param_key_desc_exists = exists( + select(1) + .select_from(WorkflowRunParameterModel) + .join( + WorkflowParameterModel, + WorkflowParameterModel.workflow_parameter_id == WorkflowRunParameterModel.workflow_parameter_id, + ) + .where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id) + .where(WorkflowParameterModel.deleted_at.is_(None)) + .where( + or_( + WorkflowParameterModel.key.ilike(key_like), + WorkflowParameterModel.description.ilike(key_like), + ) + ) + ) + # Match run parameter value directly (searches all values regardless of parameter definition status) + param_value_exists = exists( + select(1) + .select_from(WorkflowRunParameterModel) + .where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id) + .where(WorkflowRunParameterModel.value.ilike(key_like)) + ) + # Match extra HTTP headers (cast JSON to text for search, skip NULLs) + extra_headers_match = and_( + WorkflowRunModel.extra_http_headers.isnot(None), + func.cast(WorkflowRunModel.extra_http_headers, Text()).ilike(key_like), + ) + return query.where(or_(id_matches, param_key_desc_exists, param_value_exists, extra_headers_match)) + + @staticmethod + def _apply_error_code_filter(query, error_code: str | None): # type: ignore[no-untyped-def] + if not error_code: + return query + error_code_exists = exists( + select(1) + .select_from(TaskModel) + .where(TaskModel.workflow_run_id == WorkflowRunModel.workflow_run_id) + .where(cast(TaskModel.errors, JSONB).contains(literal([{"error_code": error_code}], type_=JSONB))) + ) + return query.where(error_code_exists) + async def get_workflow_runs( self, organization_id: str, @@ -3018,6 +3086,8 @@ class AgentDB(BaseAlchemyDB): page_size: int = 10, status: list[WorkflowRunStatus] | None = None, ordering: tuple[str, str] | None = None, + search_key: str | None = None, + error_code: str | None = None, ) -> list[WorkflowRun]: try: async with self.Session() as session: @@ -3030,6 +3100,9 @@ class AgentDB(BaseAlchemyDB): .filter(WorkflowRunModel.parent_workflow_run_id.is_(None)) ) + query = self._apply_search_key_filter(query, search_key) + query = self._apply_error_code_filter(query, error_code) + if status: query = query.filter(WorkflowRunModel.status.in_(status)) @@ -3092,6 +3165,7 @@ class AgentDB(BaseAlchemyDB): page_size: int = 10, status: list[WorkflowRunStatus] | None = None, search_key: str | None = None, + error_code: str | None = None, ) -> list[WorkflowRun]: """ Get runs for a workflow, with optional `search_key` on run ID, parameter key/description/value, @@ -3106,42 +3180,8 @@ class AgentDB(BaseAlchemyDB): .filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id) .filter(WorkflowRunModel.organization_id == organization_id) ) - if search_key: - key_like = f"%{search_key}%" - # Match workflow_run_id directly - id_matches = WorkflowRunModel.workflow_run_id.ilike(key_like) - # Match parameter key or description (only for non-deleted parameter definitions) - # Use EXISTS to avoid duplicate rows and to keep pagination correct - param_key_desc_exists = exists( - select(1) - .select_from(WorkflowRunParameterModel) - .join( - WorkflowParameterModel, - WorkflowParameterModel.workflow_parameter_id - == WorkflowRunParameterModel.workflow_parameter_id, - ) - .where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id) - .where(WorkflowParameterModel.deleted_at.is_(None)) - .where( - or_( - WorkflowParameterModel.key.ilike(key_like), - WorkflowParameterModel.description.ilike(key_like), - ) - ) - ) - # Match run parameter value directly (searches all values regardless of parameter definition status) - param_value_exists = exists( - select(1) - .select_from(WorkflowRunParameterModel) - .where(WorkflowRunParameterModel.workflow_run_id == WorkflowRunModel.workflow_run_id) - .where(WorkflowRunParameterModel.value.ilike(key_like)) - ) - # Match extra HTTP headers (cast JSON to text for search, skip NULLs) - extra_headers_match = and_( - WorkflowRunModel.extra_http_headers.isnot(None), - func.cast(WorkflowRunModel.extra_http_headers, Text()).ilike(key_like), - ) - query = query.where(or_(id_matches, param_key_desc_exists, param_value_exists, extra_headers_match)) + query = self._apply_search_key_filter(query, search_key) + query = self._apply_error_code_filter(query, error_code) if status: query = query.filter(WorkflowRunModel.status.in_(status)) query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6d6de069..894ded34 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -2217,14 +2217,41 @@ async def get_workflow_runs( page: int = Query(1, ge=1), page_size: int = Query(10, ge=1), status: Annotated[list[WorkflowRunStatus] | None, Query()] = None, + search_key: str | None = Query( + None, + max_length=500, + description="Search runs by run ID, parameter key, parameter description, run parameter value, or extra HTTP headers.", + examples=["login_url", "credential_value"], + ), + error_code: str | None = Query( + None, + max_length=500, + description="Filter runs by user-defined error code stored in task errors. " + "Matches against the error_code field in the task errors JSON array.", + examples=["INVALID_CREDENTIALS", "LOGIN_FAILED", "CAPTCHA_DETECTED"], + ), current_org: Organization = Depends(org_auth_service.get_current_org), ) -> list[WorkflowRun]: + """ + Get all workflow runs for the current organization. + + Supports filtering by status, parameter search, and error code. All filters + are combined with AND logic. + + **Examples:** + - All failed runs: `?status=failed` + - Failed runs with a specific error: `?status=failed&error_code=INVALID_CREDENTIALS` + - Runs matching a parameter value: `?search_key=https://example.com` + - Combined: `?status=failed&error_code=LOGIN_FAILED&search_key=my_credential` + """ analytics.capture("skyvern-oss-agent-workflow-runs-get") return await app.WORKFLOW_SERVICE.get_workflow_runs( organization_id=current_org.organization_id, page=page, page_size=page_size, status=status, + search_key=search_key, + error_code=error_code, ) @@ -2248,12 +2275,29 @@ async def get_workflow_runs_by_id( status: Annotated[list[WorkflowRunStatus] | None, Query()] = None, search_key: str | None = Query( None, + max_length=500, description="Search runs by run ID, parameter key, parameter description, run parameter value, or extra HTTP headers.", ), + error_code: str | None = Query( + None, + max_length=500, + description="Filter runs by user-defined error code stored in task errors. " + "Matches against the error_code field in the task errors JSON array.", + examples=["INVALID_CREDENTIALS", "LOGIN_FAILED", "CAPTCHA_DETECTED"], + ), current_org: Organization = Depends(org_auth_service.get_current_org), ) -> list[WorkflowRun]: """ Get workflow runs for a specific workflow permanent id. + + Supports filtering by status, parameter search, and error code. All filters + are combined with AND logic. + + **Examples:** + - All failed runs: `?status=failed` + - Failed runs with a specific error: `?status=failed&error_code=INVALID_CREDENTIALS` + - Runs matching a parameter value: `?search_key=https://example.com` + - Combined: `?status=failed&error_code=LOGIN_FAILED&search_key=my_credential` """ analytics.capture("skyvern-oss-agent-workflow-runs-get") return await app.WORKFLOW_SERVICE.get_workflow_runs_for_workflow_permanent_id( @@ -2263,6 +2307,7 @@ async def get_workflow_runs_by_id( page_size=page_size, status=status, search_key=search_key, + error_code=error_code, ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index b2b6973e..7b7961f2 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -2340,6 +2340,8 @@ class WorkflowService: page_size: int = 10, status: list[WorkflowRunStatus] | None = None, ordering: tuple[str, str] | None = None, + search_key: str | None = None, + error_code: str | None = None, ) -> list[WorkflowRun]: return await app.DATABASE.get_workflow_runs( organization_id=organization_id, @@ -2347,6 +2349,8 @@ class WorkflowService: page_size=page_size, status=status, ordering=ordering, + search_key=search_key, + error_code=error_code, ) async def get_workflow_runs_count( @@ -2367,6 +2371,7 @@ class WorkflowService: page_size: int = 10, status: list[WorkflowRunStatus] | None = None, search_key: str | None = None, + error_code: str | None = None, ) -> list[WorkflowRun]: return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id( workflow_permanent_id=workflow_permanent_id, @@ -2375,6 +2380,7 @@ class WorkflowService: page_size=page_size, status=status, search_key=search_key, + error_code=error_code, ) async def create_workflow_run(