Add status filter to workflow runs endpoints (#1637)

This commit is contained in:
Shuchang Zheng
2025-01-24 23:31:26 +08:00
committed by GitHub
parent 296abc7aa5
commit 5c37ebbb9e
3 changed files with 41 additions and 24 deletions

View File

@@ -1340,40 +1340,42 @@ class AgentDB:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)
raise raise
async def get_workflow_runs(self, organization_id: str, page: int = 1, page_size: int = 10) -> list[WorkflowRun]: async def get_workflow_runs(
self, organization_id: str, page: int = 1, page_size: int = 10, status: list[WorkflowRunStatus] | None = None
) -> list[WorkflowRun]:
try: try:
async with self.Session() as session: async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based db_page = page - 1 # offset logic is 0 based
workflow_runs = ( query = select(WorkflowRunModel).filter(WorkflowRunModel.organization_id == organization_id)
await session.scalars( if status:
select(WorkflowRunModel) query = query.filter(WorkflowRunModel.status.in_(status))
.filter(WorkflowRunModel.organization_id == organization_id) query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
.order_by(WorkflowRunModel.created_at.desc()) workflow_runs = (await session.scalars(query)).all()
.limit(page_size)
.offset(db_page * page_size)
)
).all()
return [convert_to_workflow_run(run) for run in workflow_runs] return [convert_to_workflow_run(run) for run in workflow_runs]
except SQLAlchemyError: except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)
raise raise
async def get_workflow_runs_for_workflow_permanent_id( async def get_workflow_runs_for_workflow_permanent_id(
self, workflow_permanent_id: str, organization_id: str, page: int = 1, page_size: int = 10 self,
workflow_permanent_id: str,
organization_id: str,
page: int = 1,
page_size: int = 10,
status: list[WorkflowRunStatus] | None = None,
) -> list[WorkflowRun]: ) -> list[WorkflowRun]:
try: try:
async with self.Session() as session: async with self.Session() as session:
db_page = page - 1 # offset logic is 0 based db_page = page - 1 # offset logic is 0 based
workflow_runs = ( query = (
await session.scalars( select(WorkflowRunModel)
select(WorkflowRunModel) .filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id)
.filter(WorkflowRunModel.workflow_permanent_id == workflow_permanent_id) .filter(WorkflowRunModel.organization_id == organization_id)
.filter(WorkflowRunModel.organization_id == organization_id) )
.order_by(WorkflowRunModel.created_at.desc()) if status:
.limit(page_size) query = query.filter(WorkflowRunModel.status.in_(status))
.offset(db_page * page_size) query = query.order_by(WorkflowRunModel.created_at.desc()).limit(page_size).offset(db_page * page_size)
) workflow_runs = (await session.scalars(query)).all()
).all()
return [convert_to_workflow_run(run) for run in workflow_runs] return [convert_to_workflow_run(run) for run in workflow_runs]
except SQLAlchemyError: except SQLAlchemyError:
LOG.error("SQLAlchemyError", exc_info=True) LOG.error("SQLAlchemyError", exc_info=True)

View File

@@ -67,6 +67,7 @@ from skyvern.forge.sdk.workflow.models.workflow import (
Workflow, Workflow,
WorkflowRequestBody, WorkflowRequestBody,
WorkflowRun, WorkflowRun,
WorkflowRunStatus,
WorkflowRunStatusResponse, WorkflowRunStatusResponse,
) )
from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest
@@ -677,6 +678,7 @@ async def execute_workflow(
async def get_workflow_runs( async def get_workflow_runs(
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1), page_size: int = Query(10, ge=1),
status: Annotated[list[WorkflowRunStatus] | None, Query()] = None,
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRun]: ) -> list[WorkflowRun]:
analytics.capture("skyvern-oss-agent-workflow-runs-get") analytics.capture("skyvern-oss-agent-workflow-runs-get")
@@ -684,6 +686,7 @@ async def get_workflow_runs(
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
page=page, page=page,
page_size=page_size, page_size=page_size,
status=status,
) )
@@ -700,6 +703,7 @@ async def get_workflow_runs_for_workflow_permanent_id(
workflow_permanent_id: str, workflow_permanent_id: str,
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1), page_size: int = Query(10, ge=1),
status: Annotated[list[WorkflowRunStatus] | None, Query()] = None,
current_org: Organization = Depends(org_auth_service.get_current_org), current_org: Organization = Depends(org_auth_service.get_current_org),
) -> list[WorkflowRun]: ) -> list[WorkflowRun]:
analytics.capture("skyvern-oss-agent-workflow-runs-get") analytics.capture("skyvern-oss-agent-workflow-runs-get")
@@ -708,6 +712,7 @@ async def get_workflow_runs_for_workflow_permanent_id(
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
page=page, page=page,
page_size=page_size, page_size=page_size,
status=status,
) )

View File

@@ -575,17 +575,27 @@ class WorkflowService:
organization_id=organization_id, organization_id=organization_id,
) )
async def get_workflow_runs(self, organization_id: str, page: int = 1, page_size: int = 10) -> list[WorkflowRun]: async def get_workflow_runs(
return await app.DATABASE.get_workflow_runs(organization_id=organization_id, page=page, page_size=page_size) self, organization_id: str, page: int = 1, page_size: int = 10, status: list[WorkflowRunStatus] | None = None
) -> list[WorkflowRun]:
return await app.DATABASE.get_workflow_runs(
organization_id=organization_id, page=page, page_size=page_size, status=status
)
async def get_workflow_runs_for_workflow_permanent_id( async def get_workflow_runs_for_workflow_permanent_id(
self, workflow_permanent_id: str, organization_id: str, page: int = 1, page_size: int = 10 self,
workflow_permanent_id: str,
organization_id: str,
page: int = 1,
page_size: int = 10,
status: list[WorkflowRunStatus] | None = None,
) -> list[WorkflowRun]: ) -> list[WorkflowRun]:
return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id( return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id(
workflow_permanent_id=workflow_permanent_id, workflow_permanent_id=workflow_permanent_id,
organization_id=organization_id, organization_id=organization_id,
page=page, page=page,
page_size=page_size, page_size=page_size,
status=status,
) )
async def create_workflow_run( async def create_workflow_run(