From 1e7318d0048a8559399dfe88c6d58216cb06aba8 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sat, 22 Feb 2025 01:36:35 -0800 Subject: [PATCH] task_v2 refactor Part 1 - observer_service -> task_v2_service (#1812) --- skyvern/agent/local.py | 6 +++--- skyvern/forge/sdk/executor/async_executor.py | 4 ++-- skyvern/forge/sdk/routes/agent_protocol.py | 8 ++++---- .../services/{observer_service.py => task_v2_service.py} | 0 skyvern/forge/sdk/workflow/models/block.py | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) rename skyvern/forge/sdk/services/{observer_service.py => task_v2_service.py} (100%) diff --git a/skyvern/agent/local.py b/skyvern/agent/local.py index e62f311c..badb87b8 100644 --- a/skyvern/agent/local.py +++ b/skyvern/agent/local.py @@ -9,7 +9,7 @@ from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.observers import ObserverTask, ObserverTaskRequest, ObserverTaskStatus from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, Task, TaskRequest, TaskResponse, TaskStatus -from skyvern.forge.sdk.services import observer_service +from skyvern.forge.sdk.services import task_v2_service from skyvern.forge.sdk.services.org_auth_token_service import API_KEY_LIFETIME from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus from skyvern.utils import migrate_db @@ -80,7 +80,7 @@ class Agent: status=WorkflowRunStatus.queued, ) - await observer_service.run_observer_task( + await task_v2_service.run_observer_task( organization=organization, observer_cruise_id=observer_task.observer_cruise_id, ) @@ -156,7 +156,7 @@ class Agent: async def observer_task_v_2(self, task_request: ObserverTaskRequest) -> ObserverTask: organization = await self._get_organization() - observer_task = await observer_service.initialize_observer_task( + observer_task = await task_v2_service.initialize_observer_task( organization=organization, user_prompt=task_request.user_prompt, user_url=str(task_request.url) if task_request.url else None, diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index ccd67448..4d848a80 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -9,7 +9,7 @@ from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.schemas.observers import ObserverTaskStatus from skyvern.forge.sdk.schemas.tasks import TaskStatus -from skyvern.forge.sdk.services import observer_service +from skyvern.forge.sdk.services import task_v2_service from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus LOG = structlog.get_logger() @@ -176,7 +176,7 @@ class BackgroundTaskExecutor(AsyncExecutor): if background_tasks: background_tasks.add_task( - observer_service.run_observer_task, + task_v2_service.run_observer_task, organization=organization, observer_cruise_id=observer_cruise_id, max_iterations_override=max_iterations_override, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 24355638..23cc5321 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -58,7 +58,7 @@ from skyvern.forge.sdk.schemas.tasks import ( TaskStatus, ) from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline -from skyvern.forge.sdk.services import observer_service, org_auth_service +from skyvern.forge.sdk.services import org_auth_service, task_v2_service from skyvern.forge.sdk.workflow.exceptions import ( FailedToCreateWorkflow, FailedToUpdateWorkflow, @@ -1234,7 +1234,7 @@ async def observer_task( LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) try: - observer_task = await observer_service.initialize_observer_task( + observer_task = await task_v2_service.initialize_observer_task( organization=organization, user_prompt=data.user_prompt, user_url=str(data.url) if data.url else None, @@ -1268,7 +1268,7 @@ async def get_observer_task( task_id: str, organization: Organization = Depends(org_auth_service.get_current_org), ) -> dict[str, Any]: - observer_task = await observer_service.get_observer_cruise(task_id, organization.organization_id) + observer_task = await task_v2_service.get_observer_cruise(task_id, organization.organization_id) if not observer_task: raise HTTPException(status_code=404, detail=f"Observer task {task_id} not found") return observer_task.model_dump(by_alias=True) @@ -1408,7 +1408,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: final_workflow_run_block_timeline.extend(workflow_blocks) if observer_task_obj and observer_task_obj.observer_cruise_id: - observer_thought_timeline = await observer_service.get_observer_thought_timelines( + observer_thought_timeline = await task_v2_service.get_observer_thought_timelines( observer_cruise_id=observer_task_obj.observer_cruise_id, organization_id=organization_id, ) diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/task_v2_service.py similarity index 100% rename from skyvern/forge/sdk/services/observer_service.py rename to skyvern/forge/sdk/services/task_v2_service.py diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e8c81ef7..7ddb4589 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -2138,7 +2138,7 @@ class TaskV2Block(Block): browser_session_id: str | None = None, **kwargs: dict, ) -> BlockResult: - from skyvern.forge.sdk.services import observer_service + from skyvern.forge.sdk.services import task_v2_service from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus if not organization_id: @@ -2150,7 +2150,7 @@ class TaskV2Block(Block): workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id, organization_id) if not workflow_run: raise ValueError(f"WorkflowRun not found {workflow_run_id} when running TaskV2Block") - observer_task = await observer_service.initialize_observer_task( + observer_task = await task_v2_service.initialize_observer_task( organization, user_prompt=self.prompt, user_url=self.url, @@ -2171,7 +2171,7 @@ class TaskV2Block(Block): block_workflow_run_id=observer_task.workflow_run_id, ) - observer_task = await observer_service.run_observer_task( + observer_task = await task_v2_service.run_observer_task( organization=organization, observer_cruise_id=observer_task.observer_cruise_id, request_id=None,