From f20bf94696adfcab5b84cd8ddf19f41dde92224f Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Wed, 12 Mar 2025 17:13:55 -0700 Subject: [PATCH] update api endpoint function names & deprecate endpoints that are not used (#1931) --- skyvern/forge/sdk/routes/agent_protocol.py | 235 ++++----------------- 1 file changed, 42 insertions(+), 193 deletions(-) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 472559ad..c42d91f8 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -25,7 +25,6 @@ from sqlalchemy.exc import OperationalError from skyvern import analytics from skyvern.config import settings -from skyvern.exceptions import StepNotFound from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import aws_client @@ -85,6 +84,27 @@ v2_router = APIRouter() LOG = structlog.get_logger() +class EntityType(str, Enum): + STEP = "step" + TASK = "task" + WORKFLOW_RUN = "workflow_run" + WORKFLOW_RUN_BLOCK = "workflow_run_block" + THOUGHT = "thought" + + +entity_type_to_param = { + EntityType.STEP: "step_id", + EntityType.TASK: "task_id", + EntityType.WORKFLOW_RUN: "workflow_run_id", + EntityType.WORKFLOW_RUN_BLOCK: "workflow_run_block_id", + EntityType.THOUGHT: "thought_id", +} + + +class AISuggestionType(str, Enum): + DATA_SCHEMA = "data_schema" + + @base_router.post("/webhook", tags=["server"]) @base_router.post("/webhook/", tags=["server"], include_in_schema=False) async def webhook( @@ -125,7 +145,7 @@ async def webhook( @base_router.get("/heartbeat", tags=["server"]) @base_router.get("/heartbeat/", tags=["server"], include_in_schema=False) -async def check_server_status() -> Response: +async def heartbeat() -> Response: """ Check if the server is running. """ @@ -139,7 +159,7 @@ async def check_server_status() -> Response: response_model=CreateTaskResponse, include_in_schema=False, ) -async def create_agent_task( +async def run_task( request: Request, background_tasks: BackgroundTasks, task: TaskRequest, @@ -179,89 +199,6 @@ async def create_agent_task( return CreateTaskResponse(task_id=created_task.task_id) -@base_router.post( - "/tasks/{task_id}/steps/{step_id}", - tags=["agent"], - response_model=Step, - summary="Executes a specific step", -) -@base_router.post( - "/tasks/{task_id}/steps/{step_id}/", - tags=["agent"], - response_model=Step, - summary="Executes a specific step", - include_in_schema=False, -) -@base_router.post( - "/tasks/{task_id}/steps", - tags=["agent"], - response_model=Step, - summary="Executes the next step", -) -@base_router.post( - "/tasks/{task_id}/steps/", - tags=["agent"], - response_model=Step, - summary="Executes the next step", - include_in_schema=False, -) -async def execute_agent_task_step( - task_id: str, - step_id: str | None = None, - current_org: Organization = Depends(org_auth_service.get_current_org), -) -> Response: - analytics.capture("skyvern-oss-agent-task-step-execute") - task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"No task found with id {task_id}", - ) - # An empty step request means that the agent should execute the next step for the task. - if not step_id: - step = await app.DATABASE.get_latest_step(task_id=task_id, organization_id=current_org.organization_id) - if not step: - raise StepNotFound(current_org.organization_id, task_id) - LOG.info( - "Executing latest step since no step_id was provided", - task_id=task_id, - step_id=step.step_id, - step_order=step.order, - step_retry=step.retry_index, - ) - if not step: - LOG.error( - "No steps found for task", - task_id=task_id, - ) - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"No steps found for task {task_id}", - ) - else: - step = await app.DATABASE.get_step(task_id, step_id, organization_id=current_org.organization_id) - if not step: - raise StepNotFound(current_org.organization_id, task_id, step_id) - LOG.info( - "Executing step", - task_id=task_id, - step_id=step.step_id, - step_order=step.order, - step_retry=step.retry_index, - ) - if not step: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"No step found with id {step_id}", - ) - step, _, _ = await app.agent.execute_step(current_org, task, step) - return Response( - content=step.model_dump_json(exclude_none=True) if step else "", - status_code=200, - media_type="application/json", - ) - - @base_router.get("/tasks/{task_id}", response_model=TaskResponse) @base_router.get("/tasks/{task_id}/", response_model=TaskResponse, include_in_schema=False) async def get_task( @@ -391,32 +328,9 @@ async def retry_webhook( return await app.agent.build_task_response(task=task_obj, last_step=latest_step) -@base_router.get("/internal/tasks/{task_id}", response_model=list[Task]) -@base_router.get("/internal/tasks/{task_id}/", response_model=list[Task], include_in_schema=False) -async def get_task_internal( - task_id: str, - current_org: Organization = Depends(org_auth_service.get_current_org), -) -> Response: - """ - Get all tasks. - :param page: Starting page, defaults to 1 - :param page_size: - :return: List of tasks with pagination without steps populated. Steps can be populated by calling the - get_agent_task endpoint. - """ - analytics.capture("skyvern-oss-agent-task-get-internal") - task = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id) - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Task not found {task_id}", - ) - return ORJSONResponse(task.model_dump()) - - @base_router.get("/tasks", tags=["agent"], response_model=list[Task]) @base_router.get("/tasks/", tags=["agent"], response_model=list[Task], include_in_schema=False) -async def get_agent_tasks( +async def get_tasks( page: int = Query(1, ge=1), page_size: int = Query(10, ge=1), task_status: Annotated[list[TaskStatus] | None, Query()] = None, @@ -477,32 +391,6 @@ async def get_runs( return ORJSONResponse([run.model_dump() for run in runs]) -@base_router.get("/internal/tasks", tags=["agent"], response_model=list[Task]) -@base_router.get( - "/internal/tasks/", - tags=["agent"], - response_model=list[Task], - include_in_schema=False, -) -async def get_agent_tasks_internal( - page: int = Query(1, ge=1), - page_size: int = Query(10, ge=1), - current_org: Organization = Depends(org_auth_service.get_current_org), -) -> Response: - """ - Get all tasks. - :param page: Starting page, defaults to 1 - :param page_size: Page size, defaults to 10 - :return: List of tasks with pagination without steps populated. Steps can be populated by calling the - get_agent_task endpoint. - """ - analytics.capture("skyvern-oss-agent-tasks-get-internal") - tasks = await app.DATABASE.get_tasks( - page, page_size, workflow_run_id=None, organization_id=current_org.organization_id - ) - return ORJSONResponse([task.model_dump() for task in tasks]) - - @base_router.get("/tasks/{task_id}/steps", tags=["agent"], response_model=list[Step]) @base_router.get( "/tasks/{task_id}/steps/", @@ -510,7 +398,7 @@ async def get_agent_tasks_internal( response_model=list[Step], include_in_schema=False, ) -async def get_agent_task_steps( +async def get_steps( task_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> Response: @@ -524,23 +412,6 @@ async def get_agent_task_steps( return ORJSONResponse([step.model_dump(exclude_none=True) for step in steps]) -class EntityType(str, Enum): - STEP = "step" - TASK = "task" - WORKFLOW_RUN = "workflow_run" - WORKFLOW_RUN_BLOCK = "workflow_run_block" - THOUGHT = "thought" - - -entity_type_to_param = { - EntityType.STEP: "step_id", - EntityType.TASK: "task_id", - EntityType.WORKFLOW_RUN: "workflow_run_id", - EntityType.WORKFLOW_RUN_BLOCK: "workflow_run_block_id", - EntityType.THOUGHT: "thought_id", -} - - @base_router.get( "/{entity_type}/{entity_id}/artifacts", tags=["agent"], @@ -552,7 +423,7 @@ entity_type_to_param = { response_model=list[Artifact], include_in_schema=False, ) -async def get_agent_entity_artifacts( +async def get_artifacts( entity_type: EntityType, entity_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), @@ -613,7 +484,7 @@ async def get_agent_entity_artifacts( response_model=list[Artifact], include_in_schema=False, ) -async def get_agent_task_step_artifacts( +async def get_step_artifacts( task_id: str, step_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), @@ -657,7 +528,7 @@ class ActionResultTmp(BaseModel): response_model=list[Action], include_in_schema=False, ) -async def get_task_actions( +async def get_actions( task_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> list[Action]: @@ -672,10 +543,10 @@ async def get_task_actions( response_model=RunWorkflowResponse, include_in_schema=False, ) -async def execute_workflow( +async def run_workflow( request: Request, background_tasks: BackgroundTasks, - workflow_id: str, # this is the workflow_permanent_id + workflow_id: str, # this is the workflow_permanent_id internally workflow_request: WorkflowRequestBody, version: int | None = None, current_org: Organization = Depends(org_auth_service.get_current_org), @@ -754,16 +625,16 @@ async def get_workflow_runs( @base_router.get( - "/workflows/{workflow_permanent_id}/runs", + "/workflows/{workflow_id}/runs", response_model=list[WorkflowRun], ) @base_router.get( - "/workflows/{workflow_permanent_id}/runs/", + "/workflows/{workflow_id}/runs/", response_model=list[WorkflowRun], include_in_schema=False, ) -async def get_workflow_runs_for_workflow_permanent_id( - workflow_permanent_id: str, +async def get_workflow_runs_by_id( + workflow_id: str, page: int = Query(1, ge=1), page_size: int = Query(10, ge=1), status: Annotated[list[WorkflowRunStatus] | None, Query()] = None, @@ -771,7 +642,7 @@ async def get_workflow_runs_for_workflow_permanent_id( ) -> list[WorkflowRun]: analytics.capture("skyvern-oss-agent-workflow-runs-get") return await app.WORKFLOW_SERVICE.get_workflow_runs_for_workflow_permanent_id( - workflow_permanent_id=workflow_permanent_id, + workflow_permanent_id=workflow_id, organization_id=current_org.organization_id, page=page, page_size=page_size, @@ -1032,13 +903,9 @@ async def get_workflow( ) -class AISuggestionType(str, Enum): - DATA_SCHEMA = "data_schema" - - @base_router.post("/suggest/{ai_suggestion_type}", include_in_schema=False) @base_router.post("/suggest/{ai_suggestion_type}/") -async def make_ai_suggestion( +async def suggest( ai_suggestion_type: AISuggestionType, data: AISuggestionRequest, current_org: Organization = Depends(org_auth_service.get_current_org), @@ -1148,8 +1015,8 @@ async def get_organizations( @base_router.get("/organizations/{organization_id}/apikeys/", include_in_schema=False) -@base_router.get("/organizations/{organization_id}/apikeys") -async def get_org_api_keys( +@base_router.get("/organizations/{organization_id}/apikeys", include_in_schema=False) +async def get_api_keys( organization_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> GetOrganizationAPIKeysResponse: @@ -1162,7 +1029,7 @@ async def get_org_api_keys( return GetOrganizationAPIKeysResponse(api_keys=api_keys) -async def validate_file_size(file: UploadFile) -> UploadFile: +async def _validate_file_size(file: UploadFile) -> UploadFile: try: file.file.seek(0, 2) # Move the pointer to the end of the file size = file.file.tell() # Get the current position of the pointer, which represents the file size @@ -1181,7 +1048,7 @@ async def validate_file_size(file: UploadFile) -> UploadFile: @base_router.post("/upload_file/", include_in_schema=False) @base_router.post("/upload_file") async def upload_file( - file: UploadFile = Depends(validate_file_size), + file: UploadFile = Depends(_validate_file_size), current_org: Organization = Depends(org_auth_service.get_current_org), ) -> Response: bucket = app.SETTINGS_MANAGER.AWS_S3_BUCKET_UPLOADS @@ -1223,7 +1090,7 @@ async def upload_file( @v2_router.post("/tasks") @v2_router.post("/tasks/", include_in_schema=False) -async def create_task_v2( +async def run_task_v2( request: Request, background_tasks: BackgroundTasks, data: TaskV2Request, @@ -1288,7 +1155,7 @@ async def get_task_v2( response_model=BrowserSessionResponse, include_in_schema=False, ) -async def get_browser_session_by_id( +async def get_browser_session( browser_session_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), ) -> BrowserSessionResponse: @@ -1336,24 +1203,6 @@ async def create_browser_session( return BrowserSessionResponse.from_browser_session(browser_session) -@base_router.post( - "/browser_sessions/close", -) -@base_router.post( - "/browser_sessions/close/", - include_in_schema=False, -) -async def close_browser_sessions( - current_org: Organization = Depends(org_auth_service.get_current_org), -) -> ORJSONResponse: - await app.PERSISTENT_SESSIONS_MANAGER.close_all_sessions(current_org.organization_id) - return ORJSONResponse( - content={"message": "All browser sessions closed"}, - status_code=200, - media_type="application/json", - ) - - @base_router.post( "/browser_sessions/{session_id}/close", )