From 8be94d7928996d661c587d3e408fc2b6ca89ee75 Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Fri, 5 Jul 2024 16:39:42 -0700 Subject: [PATCH] Implement get workflow runs endpoint (#558) --- skyvern/forge/sdk/db/client.py | 34 +++++++++++++++- skyvern/forge/sdk/routes/agent_protocol.py | 47 ++++++++++++++++++++++ skyvern/forge/sdk/workflow/service.py | 16 ++++++-- 3 files changed, 92 insertions(+), 5 deletions(-) diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 60d8e0ec..6f9d2a64 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1019,11 +1019,41 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise - async def get_workflow_runs(self, workflow_id: str) -> list[WorkflowRun]: + async def get_workflow_runs(self, organization_id: str, page: int = 1, page_size: int = 10) -> list[WorkflowRun]: try: async with self.Session() as session: + db_page = page - 1 # offset logic is 0 based workflow_runs = ( - await session.scalars(select(WorkflowRunModel).filter_by(workflow_id=workflow_id)) + await session.scalars( + select(WorkflowRunModel) + .join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id) + .filter(WorkflowModel.organization_id == organization_id) + .order_by(WorkflowRunModel.created_at.desc()) + .limit(page_size) + .offset(db_page * page_size) + ) + ).all() + return [convert_to_workflow_run(run) for run in workflow_runs] + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + + async def get_workflow_runs_for_workflow_permanent_id( + self, workflow_permanent_id: str, organization_id: str, page: int = 1, page_size: int = 10 + ) -> list[WorkflowRun]: + try: + async with self.Session() as session: + db_page = page - 1 # offset logic is 0 based + workflow_runs = ( + await session.scalars( + select(WorkflowRunModel) + .join(WorkflowModel, WorkflowModel.workflow_id == WorkflowRunModel.workflow_id) + .filter(WorkflowModel.workflow_permanent_id == workflow_permanent_id) + .filter(WorkflowModel.organization_id == organization_id) + .order_by(WorkflowRunModel.created_at.desc()) + .limit(page_size) + .offset(db_page * page_size) + ) ).all() return [convert_to_workflow_run(run) for run in workflow_runs] except SQLAlchemyError: diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 427bddda..35cebd20 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -47,6 +47,7 @@ from skyvern.forge.sdk.workflow.models.workflow import ( RunWorkflowResponse, Workflow, WorkflowRequestBody, + WorkflowRun, WorkflowRunStatusResponse, ) from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest @@ -551,6 +552,52 @@ async def execute_workflow( ) +@base_router.get( + "/workflows/runs", + response_model=list[WorkflowRun], +) +@base_router.get( + "/workflows/runs/", + response_model=list[WorkflowRun], + include_in_schema=False, +) +async def get_workflow_runs( + page: int = Query(1, ge=1), + page_size: int = Query(10, ge=1), + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> list[WorkflowRun]: + analytics.capture("skyvern-oss-agent-workflow-runs-get") + return await app.WORKFLOW_SERVICE.get_workflow_runs( + organization_id=current_org.organization_id, + page=page, + page_size=page_size, + ) + + +@base_router.get( + "/workflows/{workflow_permanent_id}/runs", + response_model=list[WorkflowRun], +) +@base_router.get( + "/workflows/{workflow_permanent_id}/runs/", + response_model=list[WorkflowRun], + include_in_schema=False, +) +async def get_workflow_runs_for_workflow_permanent_id( + workflow_permanent_id: str, + page: int = Query(1, ge=1), + page_size: int = Query(10, ge=1), + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> list[WorkflowRun]: + analytics.capture("skyvern-oss-agent-workflow-runs-get") + return await app.WORKFLOW_SERVICE.get_workflow_runs_for_workflow_permanent_id( + workflow_permanent_id=workflow_permanent_id, + organization_id=current_org.organization_id, + page=page, + page_size=page_size, + ) + + @base_router.get( "/workflows/{workflow_id}/runs/{workflow_run_id}", response_model=WorkflowRunStatusResponse, diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 87d145c2..e6f0a01c 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -364,6 +364,19 @@ class WorkflowService: organization_id=organization_id, ) + async def get_workflow_runs(self, organization_id: str, page: int = 1, page_size: int = 10) -> list[WorkflowRun]: + return await app.DATABASE.get_workflow_runs(organization_id=organization_id, page=page, page_size=page_size) + + async def get_workflow_runs_for_workflow_permanent_id( + self, workflow_permanent_id: str, organization_id: str, page: int = 1, page_size: int = 10 + ) -> list[WorkflowRun]: + return await app.DATABASE.get_workflow_runs_for_workflow_permanent_id( + workflow_permanent_id=workflow_permanent_id, + organization_id=organization_id, + page=page, + page_size=page_size, + ) + async def create_workflow_run(self, workflow_request: WorkflowRequestBody, workflow_id: str) -> WorkflowRun: return await app.DATABASE.create_workflow_run( workflow_id=workflow_id, @@ -415,9 +428,6 @@ class WorkflowService: status=WorkflowRunStatus.terminated, ) - async def get_workflow_runs(self, workflow_id: str) -> list[WorkflowRun]: - return await app.DATABASE.get_workflow_runs(workflow_id=workflow_id) - async def get_workflow_run(self, workflow_run_id: str) -> WorkflowRun: workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id) if not workflow_run: