From 030145c5857f32fbc081f6e26d0d5a4d8d600c0a Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Tue, 9 Jul 2024 11:37:03 -0700 Subject: [PATCH] Add workflow_run_id filter to get tasks endpoint (#571) --- skyvern/forge/sdk/db/client.py | 7 ++++++- skyvern/forge/sdk/routes/agent_protocol.py | 8 +++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 4f82a060..693b1fad 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -414,12 +414,15 @@ class AgentDB: page: int = 1, page_size: int = 10, task_status: list[TaskStatus] | None = None, + workflow_run_id: str | None = None, organization_id: str | None = None, ) -> list[Task]: """ Get all tasks. :param page: Starts at 1 :param page_size: + :param task_status: + :param workflow_run_id: :return: """ if page < 1: @@ -428,9 +431,11 @@ class AgentDB: try: async with self.Session() as session: db_page = page - 1 # offset logic is 0 based - query = select(TaskModel).filter_by(organization_id=organization_id) + query = select(TaskModel).filter(TaskModel.organization_id == organization_id) if task_status: query = query.filter(TaskModel.status.in_(task_status)) + if workflow_run_id: + query = query.filter(TaskModel.workflow_run_id == workflow_run_id) query = query.order_by(TaskModel.created_at.desc()).limit(page_size).offset(db_page * page_size) tasks = (await session.scalars(query)).all() return [convert_to_task(task, debug_enabled=self.debug_enabled) for task in tasks] diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 35cebd20..7851083b 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -376,12 +376,15 @@ async def get_agent_tasks( page: int = Query(1, ge=1), page_size: int = Query(10, ge=1), task_status: Annotated[list[TaskStatus] | None, Query()] = None, + workflow_run_id: Annotated[str | None, Query()] = None, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> Response: """ Get all tasks. :param page: Starting page, defaults to 1 :param page_size: Page size, defaults to 10 + :param task_status: Task status filter + :param workflow_run_id: Workflow run id filter :return: List of tasks with pagination without steps populated. Steps can be populated by calling the get_agent_task endpoint. """ @@ -390,6 +393,7 @@ async def get_agent_tasks( page, page_size, task_status=task_status, + workflow_run_id=workflow_run_id, organization_id=current_org.organization_id, ) return ORJSONResponse([task.to_task_response().model_dump() for task in tasks]) @@ -415,7 +419,9 @@ async def get_agent_tasks_internal( get_agent_task endpoint. """ analytics.capture("skyvern-oss-agent-tasks-get-internal") - tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id) + tasks = await app.DATABASE.get_tasks( + page, page_size, workflow_run_id=None, organization_id=current_org.organization_id + ) return ORJSONResponse([task.model_dump() for task in tasks])