Add workflow_run_id filter to get tasks endpoint (#571)

This commit is contained in:
Kerem Yilmaz
2024-07-09 11:37:03 -07:00
committed by GitHub
parent 4ff330bb50
commit 030145c585
2 changed files with 13 additions and 2 deletions

View File

@@ -414,12 +414,15 @@ class AgentDB:
page: int = 1, page: int = 1,
page_size: int = 10, page_size: int = 10,
task_status: list[TaskStatus] | None = None, task_status: list[TaskStatus] | None = None,
workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> list[Task]: ) -> list[Task]:
""" """
Get all tasks. Get all tasks.
:param page: Starts at 1 :param page: Starts at 1
:param page_size: :param page_size:
:param task_status:
:param workflow_run_id:
:return: :return:
""" """
if page < 1: if page < 1:
@@ -428,9 +431,11 @@ class AgentDB:
try: try:
async with self.Session() as session: async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based db_page = page - 1 # offset logic is 0 based
query = select(TaskModel).filter_by(organization_id=organization_id) query = select(TaskModel).filter(TaskModel.organization_id == organization_id)
if task_status: if task_status:
query = query.filter(TaskModel.status.in_(task_status)) query = query.filter(TaskModel.status.in_(task_status))
if workflow_run_id:
query = query.filter(TaskModel.workflow_run_id == workflow_run_id)
query = query.order_by(TaskModel.created_at.desc()).limit(page_size).offset(db_page * page_size) query = query.order_by(TaskModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
tasks = (await session.scalars(query)).all() tasks = (await session.scalars(query)).all()
return [convert_to_task(task, debug_enabled=self.debug_enabled) for task in tasks] return [convert_to_task(task, debug_enabled=self.debug_enabled) for task in tasks]

View File

@@ -376,12 +376,15 @@ async def get_agent_tasks(
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1), page_size: int = Query(10, ge=1),
task_status: Annotated[list[TaskStatus] | None, Query()] = None, task_status: Annotated[list[TaskStatus] | None, Query()] = None,
workflow_run_id: Annotated[str | None, Query()] = None,
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
) -> Response: ) -> Response:
""" """
Get all tasks. Get all tasks.
:param page: Starting page, defaults to 1 :param page: Starting page, defaults to 1
:param page_size: Page size, defaults to 10 :param page_size: Page size, defaults to 10
:param task_status: Task status filter
:param workflow_run_id: Workflow run id filter
:return: List of tasks with pagination without steps populated. Steps can be populated by calling the :return: List of tasks with pagination without steps populated. Steps can be populated by calling the
get_agent_task endpoint. get_agent_task endpoint.
""" """
@@ -390,6 +393,7 @@ async def get_agent_tasks(
page, page,
page_size, page_size,
task_status=task_status, task_status=task_status,
workflow_run_id=workflow_run_id,
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
) )
return ORJSONResponse([task.to_task_response().model_dump() for task in tasks]) return ORJSONResponse([task.to_task_response().model_dump() for task in tasks])
@@ -415,7 +419,9 @@ async def get_agent_tasks_internal(
get_agent_task endpoint. get_agent_task endpoint.
""" """
analytics.capture("skyvern-oss-agent-tasks-get-internal") analytics.capture("skyvern-oss-agent-tasks-get-internal")
tasks = await app.DATABASE.get_tasks(page, page_size, organization_id=current_org.organization_id) tasks = await app.DATABASE.get_tasks(
page, page_size, workflow_run_id=None, organization_id=current_org.organization_id
)
return ORJSONResponse([task.model_dump() for task in tasks]) return ORJSONResponse([task.model_dump() for task in tasks])