From 14689b53e4f6f24df8268fdba429a1abdfbeb016 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 27 Feb 2025 20:19:02 -0800 Subject: [PATCH] task v2 refactor part 10: ObserverTask -> TaskV2 in backend code (#1839) --- evaluation/core/__init__.py | 8 +- .../script/create_webvoyager_task_v2.py | 16 +- skyvern/agent/local.py | 14 +- .../forge/sdk/api/llm/api_handler_factory.py | 66 +++-- skyvern/forge/sdk/api/llm/models.py | 6 +- skyvern/forge/sdk/artifact/manager.py | 36 +-- skyvern/forge/sdk/artifact/models.py | 2 +- skyvern/forge/sdk/artifact/storage/base.py | 8 +- skyvern/forge/sdk/artifact/storage/local.py | 10 +- skyvern/forge/sdk/artifact/storage/s3.py | 10 +- skyvern/forge/sdk/db/client.py | 146 ++++++----- skyvern/forge/sdk/db/id.py | 6 +- skyvern/forge/sdk/db/models.py | 12 +- skyvern/forge/sdk/executor/async_executor.py | 8 +- skyvern/forge/sdk/log_artifacts.py | 2 + skyvern/forge/sdk/routes/agent_protocol.py | 22 +- skyvern/forge/sdk/schemas/task_v2.py | 20 +- skyvern/forge/sdk/schemas/workflow_runs.py | 4 +- skyvern/forge/sdk/services/task_v2_service.py | 240 +++++++++--------- skyvern/forge/sdk/workflow/models/block.py | 5 +- skyvern/forge/sdk/workflow/models/workflow.py | 4 +- 21 files changed, 313 insertions(+), 332 deletions(-) diff --git a/evaluation/core/__init__.py b/evaluation/core/__init__.py index 0f7874b9..0a22717d 100644 --- a/evaluation/core/__init__.py +++ b/evaluation/core/__init__.py @@ -12,7 +12,7 @@ from pydantic import BaseModel from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import create_folder_if_not_exist -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverTaskRequest +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Request from skyvern.forge.sdk.schemas.tasks import ProxyLocation, TaskRequest, TaskResponse, TaskStatus from skyvern.forge.sdk.workflow.models.workflow import WorkflowRequestBody, WorkflowRunStatus, WorkflowRunStatusResponse @@ -55,12 +55,12 @@ class SkyvernClient: assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}" return response.json()["workflow_run_id"] - def create_task_v2(self, task_v2_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask: + def create_task_v2(self, task_v2_request: TaskV2Request, max_steps: int | None = None) -> TaskV2: url = f"{self.v2_base_url}/tasks" payload, headers = self.generate_curl_params(task_v2_request, max_steps=max_steps) response = requests.post(url, headers=headers, data=payload) assert "task_id" in response.json(), f"Failed to create task v2: {response.text}" - return ObserverTask.model_validate(response.json()) + return TaskV2.model_validate(response.json()) def get_task(self, task_id: str) -> TaskResponse: """Get a task by id.""" @@ -213,7 +213,7 @@ class Evaluator: assert workflow_run_id return workflow_run_id - def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask: + def queue_skyvern_task_v2(self, cruise_request: TaskV2Request, max_step: int | None = None) -> TaskV2: cruise = self.client.create_task_v2(task_v2_request=cruise_request, max_steps=max_step) self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode()) return cruise diff --git a/evaluation/script/create_webvoyager_task_v2.py b/evaluation/script/create_webvoyager_task_v2.py index 955c83e3..1f777dfb 100644 --- a/evaluation/script/create_webvoyager_task_v2.py +++ b/evaluation/script/create_webvoyager_task_v2.py @@ -10,7 +10,7 @@ from evaluation.core import Evaluator, SkyvernClient from evaluation.core.utils import load_webvoyager_case_from_json from skyvern.forge import app from skyvern.forge.prompts import prompt_engine -from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskRequest +from skyvern.forge.sdk.schemas.task_v2 import TaskV2Request load_dotenv() @@ -37,21 +37,21 @@ async def create_task_v2( case_data.question = tweaked_user_goal evaluator = Evaluator(client=client, artifact_folder=f"test/artifacts/{case_data.group_id}/{case_data.id}") - request_body = ObserverTaskRequest( + request_body = TaskV2Request( url=case_data.url, user_prompt=case_data.question, ) - cruise = evaluator.queue_skyvern_cruise(cruise_request=request_body, max_step=case_data.max_steps) + task_v2 = evaluator.queue_skyvern_task_v2(cruise_request=request_body, max_step=case_data.max_steps) dumped_data = case_data.model_dump() dumped_data.update( { - "task_v2_id": cruise.observer_cruise_id, - "workflow_run_id": cruise.workflow_run_id, - "workflow_permanent_id": cruise.workflow_permanent_id, - "cruise_url": str(cruise.url) if cruise.url else cruise.url, + "task_v2_id": task_v2.observer_cruise_id, + "workflow_run_id": task_v2.workflow_run_id, + "workflow_permanent_id": task_v2.workflow_permanent_id, + "cruise_url": str(task_v2.url) if task_v2.url else task_v2.url, } ) - print(f"Queued {cruise.observer_cruise_id} for {case_data.model_dump_json()}") + print(f"Queued {task_v2.observer_cruise_id} for {case_data.model_dump_json()}") f.write(json.dumps(dumped_data) + "\n") cnt += 1 diff --git a/skyvern/agent/local.py b/skyvern/agent/local.py index 7ff6598d..c9e067c6 100644 --- a/skyvern/agent/local.py +++ b/skyvern/agent/local.py @@ -7,7 +7,7 @@ from skyvern.forge.sdk.core import security, skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.organizations import Organization -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverTaskRequest, ObserverTaskStatus +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Request, TaskV2Status from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, Task, TaskRequest, TaskResponse, TaskStatus from skyvern.forge.sdk.services import task_v2_service from skyvern.forge.sdk.services.org_auth_token_service import API_KEY_LIFETIME @@ -66,11 +66,11 @@ class Agent: api_key=org_auth_token.token if org_auth_token else None, ) - async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None: + async def _run_task_v2(self, organization: Organization, task_v2: TaskV2) -> None: # mark task v2 as queued await app.DATABASE.update_task_v2( task_v2_id=task_v2.observer_cruise_id, - status=ObserverTaskStatus.queued, + status=TaskV2Status.queued, organization_id=organization.organization_id, ) @@ -153,7 +153,7 @@ class Agent: return task_response await asyncio.sleep(1) - async def observer_task_v_2(self, task_request: ObserverTaskRequest) -> ObserverTask: + async def observer_task_v_2(self, task_request: TaskV2Request) -> TaskV2: organization = await self._get_organization() task_v2 = await task_v2_service.initialize_task_v2( @@ -173,13 +173,11 @@ class Agent: asyncio.create_task(self._run_task_v2(organization, task_v2)) return task_v2 - async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None: + async def get_observer_task_v_2(self, task_id: str) -> TaskV2 | None: organization = await self._get_organization() return await app.DATABASE.get_task_v2(task_id, organization.organization_id) - async def run_observer_task_v_2( - self, task_request: ObserverTaskRequest, timeout_seconds: int = 600 - ) -> ObserverTask: + async def run_observer_task_v_2(self, task_request: TaskV2Request, timeout_seconds: int = 600) -> TaskV2: task_v2 = await self.observer_task_v_2(task_request) async with asyncio.timeout(timeout_seconds): diff --git a/skyvern/forge/sdk/api/llm/api_handler_factory.py b/skyvern/forge/sdk/api/llm/api_handler_factory.py index d2eef027..eb3781be 100644 --- a/skyvern/forge/sdk/api/llm/api_handler_factory.py +++ b/skyvern/forge/sdk/api/llm/api_handler_factory.py @@ -23,7 +23,7 @@ from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought LOG = structlog.get_logger() @@ -63,8 +63,8 @@ class LLMAPIHandlerFactory: prompt: str, prompt_name: str, step: Step | None = None, - task_v2: ObserverTask | None = None, - observer_thought: ObserverThought | None = None, + task_v2: TaskV2 | None = None, + thought: Thought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, parameters: dict[str, Any] | None = None, @@ -93,7 +93,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.HASHED_HREF_MAP, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -103,7 +103,7 @@ class LLMAPIHandlerFactory: screenshots=screenshots, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ) messages = await llm_messages_builder(prompt, screenshots, llm_config.add_assistant_prefix) @@ -118,7 +118,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_REQUEST, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) try: @@ -145,10 +145,10 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) - if step or observer_thought: + if step or thought: try: llm_cost = litellm.completion_cost(completion_response=response) except Exception as e: @@ -171,10 +171,10 @@ class LLMAPIHandlerFactory: incremental_input_tokens=prompt_tokens if prompt_tokens > 0 else None, incremental_output_tokens=completion_tokens if completion_tokens > 0 else None, ) - if observer_thought: - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, - organization_id=observer_thought.organization_id, + if thought: + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, + organization_id=thought.organization_id, input_token_count=prompt_tokens if prompt_tokens > 0 else None, output_token_count=completion_tokens if completion_tokens > 0 else None, thought_cost=llm_cost, @@ -185,7 +185,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE_PARSED, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -198,7 +198,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -211,10 +211,8 @@ class LLMAPIHandlerFactory: prompt_name=prompt_name, duration_seconds=duration_seconds, step_id=step.step_id if step else None, - observer_thought_id=observer_thought.observer_thought_id if observer_thought else None, - organization_id=step.organization_id - if step - else (observer_thought.organization_id if observer_thought else None), + thought_id=thought.observer_thought_id if thought else None, + organization_id=step.organization_id if step else (thought.organization_id if thought else None), ) return parsed_response @@ -234,8 +232,8 @@ class LLMAPIHandlerFactory: prompt: str, prompt_name: str, step: Step | None = None, - task_v2: ObserverTask | None = None, - observer_thought: ObserverThought | None = None, + task_v2: TaskV2 | None = None, + thought: Thought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, parameters: dict[str, Any] | None = None, @@ -256,7 +254,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.HASHED_HREF_MAP, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -266,7 +264,7 @@ class LLMAPIHandlerFactory: screenshots=screenshots, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -286,7 +284,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_REQUEST, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) t_llm_request = time.perf_counter() @@ -320,11 +318,11 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) - if step or observer_thought: + if step or thought: try: llm_cost = litellm.completion_cost(completion_response=response) except Exception as e: @@ -341,10 +339,10 @@ class LLMAPIHandlerFactory: incremental_input_tokens=prompt_tokens if prompt_tokens > 0 else None, incremental_output_tokens=completion_tokens if completion_tokens > 0 else None, ) - if observer_thought: - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, - organization_id=observer_thought.organization_id, + if thought: + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, + organization_id=thought.organization_id, input_token_count=prompt_tokens if prompt_tokens > 0 else None, output_token_count=completion_tokens if completion_tokens > 0 else None, thought_cost=llm_cost, @@ -355,7 +353,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE_PARSED, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -368,7 +366,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, step=step, task_v2=task_v2, - observer_thought=observer_thought, + thought=thought, ai_suggestion=ai_suggestion, ) @@ -381,10 +379,8 @@ class LLMAPIHandlerFactory: model=llm_config.model_name, duration_seconds=duration_seconds, step_id=step.step_id if step else None, - observer_thought_id=observer_thought.observer_thought_id if observer_thought else None, - organization_id=step.organization_id - if step - else (observer_thought.organization_id if observer_thought else None), + thought_id=thought.observer_thought_id if thought else None, + organization_id=step.organization_id if step else (thought.organization_id if thought else None), ) return parsed_response diff --git a/skyvern/forge/sdk/api/llm/models.py b/skyvern/forge/sdk/api/llm/models.py index 4d3a50f9..cf4b7d1f 100644 --- a/skyvern/forge/sdk/api/llm/models.py +++ b/skyvern/forge/sdk/api/llm/models.py @@ -5,7 +5,7 @@ from litellm import AllowedFailsPolicy from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought from skyvern.forge.sdk.settings_manager import SettingsManager @@ -85,8 +85,8 @@ class LLMAPIHandler(Protocol): prompt: str, prompt_name: str, step: Step | None = None, - task_v2: ObserverTask | None = None, - observer_thought: ObserverThought | None = None, + task_v2: TaskV2 | None = None, + thought: Thought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, parameters: dict[str, Any] | None = None, diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index af26798f..0832ce30 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -9,7 +9,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT from skyvern.forge.sdk.db.id import generate_artifact_id from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock LOG = structlog.get_logger(__name__) @@ -29,7 +29,7 @@ class ArtifactManager: task_id: str | None = None, workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, - observer_thought_id: str | None = None, + thought_id: str | None = None, task_v2_id: str | None = None, ai_suggestion_id: str | None = None, organization_id: str | None = None, @@ -48,7 +48,7 @@ class ArtifactManager: task_id=task_id, workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, - observer_thought_id=observer_thought_id, + thought_id=thought_id, task_v2_id=task_v2_id, organization_id=organization_id, ai_suggestion_id=ai_suggestion_id, @@ -114,30 +114,30 @@ class ArtifactManager: path=path, ) - async def create_observer_thought_artifact( + async def create_thought_artifact( self, - observer_thought: ObserverThought, + thought: Thought, artifact_type: ArtifactType, data: bytes | None = None, path: str | None = None, ) -> str: artifact_id = generate_artifact_id() - uri = app.STORAGE.build_observer_thought_uri(artifact_id, observer_thought, artifact_type) + uri = app.STORAGE.build_thought_uri(artifact_id, thought, artifact_type) return await self._create_artifact( - aio_task_primary_key=observer_thought.observer_cruise_id, + aio_task_primary_key=thought.observer_cruise_id, artifact_id=artifact_id, artifact_type=artifact_type, uri=uri, - observer_thought_id=observer_thought.observer_thought_id, - task_v2_id=observer_thought.observer_cruise_id, - organization_id=observer_thought.organization_id, + thought_id=thought.observer_thought_id, + task_v2_id=thought.observer_cruise_id, + organization_id=thought.organization_id, data=data, path=path, ) async def create_task_v2_artifact( self, - task_v2: ObserverTask, + task_v2: TaskV2, artifact_type: ArtifactType, data: bytes | None = None, path: str | None = None, @@ -202,8 +202,8 @@ class ArtifactManager: artifact_type: ArtifactType, screenshots: list[bytes] | None = None, step: Step | None = None, - observer_thought: ObserverThought | None = None, - task_v2: ObserverTask | None = None, + thought: Thought | None = None, + task_v2: TaskV2 | None = None, ai_suggestion: AISuggestion | None = None, ) -> None: if step: @@ -230,15 +230,15 @@ class ArtifactManager: artifact_type=ArtifactType.SCREENSHOT_LLM, data=screenshot, ) - elif observer_thought: - await self.create_observer_thought_artifact( - observer_thought=observer_thought, + elif thought: + await self.create_thought_artifact( + thought=thought, artifact_type=artifact_type, data=data, ) for screenshot in screenshots or []: - await self.create_observer_thought_artifact( - observer_thought=observer_thought, + await self.create_thought_artifact( + thought=thought, artifact_type=ArtifactType.SCREENSHOT_LLM, data=screenshot, ) diff --git a/skyvern/forge/sdk/artifact/models.py b/skyvern/forge/sdk/artifact/models.py index 5bdd6f87..2aa31677 100644 --- a/skyvern/forge/sdk/artifact/models.py +++ b/skyvern/forge/sdk/artifact/models.py @@ -88,4 +88,4 @@ class LogEntityType(StrEnum): TASK = "task" WORKFLOW_RUN = "workflow_run" WORKFLOW_RUN_BLOCK = "workflow_run_block" - OBSERVER = "observer" + TASK_V2 = "task_v2" diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py index 3709aee3..2a2554fb 100644 --- a/skyvern/forge/sdk/artifact/storage/base.py +++ b/skyvern/forge/sdk/artifact/storage/base.py @@ -4,7 +4,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.files import FileInfo -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock # TODO: This should be a part of the ArtifactType model @@ -50,13 +50,11 @@ class BaseStorage(ABC): pass @abstractmethod - def build_observer_thought_uri( - self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType - ) -> str: + def build_thought_uri(self, artifact_id: str, thought: Thought, artifact_type: ArtifactType) -> str: pass @abstractmethod - def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: TaskV2, artifact_type: ArtifactType) -> str: pass @abstractmethod diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index 434f2041..4069f262 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -17,7 +17,7 @@ from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseSto from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.files import FileInfo -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock LOG = structlog.get_logger() @@ -46,13 +46,11 @@ class LocalStorage(BaseStorage): file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"file://{self.artifact_path}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" - def build_observer_thought_uri( - self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType - ) -> str: + def build_thought_uri(self, artifact_id: str, thought: Thought, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] - return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + return f"file://{self.artifact_path}/{settings.ENV}/tasks/{thought.observer_cruise_id}/{thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" - def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: TaskV2, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"file://{self.artifact_path}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" diff --git a/skyvern/forge/sdk/artifact/storage/s3.py b/skyvern/forge/sdk/artifact/storage/s3.py index a395c84e..a3d0b434 100644 --- a/skyvern/forge/sdk/artifact/storage/s3.py +++ b/skyvern/forge/sdk/artifact/storage/s3.py @@ -20,7 +20,7 @@ from skyvern.forge.sdk.artifact.storage.base import FILE_EXTENTSION_MAP, BaseSto from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion from skyvern.forge.sdk.schemas.files import FileInfo -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock LOG = structlog.get_logger() @@ -46,13 +46,11 @@ class S3Storage(BaseStorage): file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"s3://{self.bucket}/{settings.ENV}/logs/{log_entity_type}/{log_entity_id}/{datetime.utcnow().isoformat()}_{artifact_type}.{file_ext}" - def build_observer_thought_uri( - self, artifact_id: str, observer_thought: ObserverThought, artifact_type: ArtifactType - ) -> str: + def build_thought_uri(self, artifact_id: str, thought: Thought, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] - return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + return f"s3://{self.bucket}/{settings.ENV}/observers/{thought.observer_cruise_id}/{thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" - def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: TaskV2, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"s3://{self.bucket}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 19116348..067a1c22 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -22,8 +22,6 @@ from skyvern.forge.sdk.db.models import ( BitwardenSensitiveInformationParameterModel, CredentialModel, CredentialParameterModel, - ObserverCruiseModel, - ObserverThoughtModel, OrganizationAuthTokenModel, OrganizationBitwardenCollectionModel, OrganizationModel, @@ -33,6 +31,8 @@ from skyvern.forge.sdk.db.models import ( TaskGenerationModel, TaskModel, TaskRunModel, + TaskV2Model, + ThoughtModel, TOTPCodeModel, WorkflowModel, WorkflowParameterModel, @@ -68,7 +68,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAu from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession from skyvern.forge.sdk.schemas.task_generations import TaskGeneration from skyvern.forge.sdk.schemas.task_runs import TaskRun, TaskRunType -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask, ObserverTaskStatus, ObserverThought, ObserverThoughtType +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Status, Thought, ThoughtType from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus from skyvern.forge.sdk.schemas.totp_codes import TOTPCode from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock @@ -211,7 +211,7 @@ class AgentDB: workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, task_v2_id: str | None = None, - observer_thought_id: str | None = None, + thought_id: str | None = None, ai_suggestion_id: str | None = None, organization_id: str | None = None, ) -> Artifact: @@ -226,7 +226,7 @@ class AgentDB: workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, observer_cruise_id=task_v2_id, - observer_thought_id=observer_thought_id, + observer_thought_id=thought_id, ai_suggestion_id=ai_suggestion_id, organization_id=organization_id, ) @@ -893,7 +893,7 @@ class AgentDB: step_id: str | None = None, workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, - observer_thought_id: str | None = None, + thought_id: str | None = None, task_v2_id: str | None = None, organization_id: str | None = None, ) -> list[Artifact]: @@ -911,8 +911,8 @@ class AgentDB: query = query.filter_by(workflow_run_id=workflow_run_id) if workflow_run_block_id is not None: query = query.filter_by(workflow_run_block_id=workflow_run_block_id) - if observer_thought_id is not None: - query = query.filter_by(observer_thought_id=observer_thought_id) + if thought_id is not None: + query = query.filter_by(observer_thought_id=thought_id) if task_v2_id is not None: query = query.filter_by(observer_cruise_id=task_v2_id) if organization_id is not None: @@ -937,7 +937,7 @@ class AgentDB: step_id: str | None = None, workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, - observer_thought_id: str | None = None, + thought_id: str | None = None, task_v2_id: str | None = None, organization_id: str | None = None, ) -> Artifact | None: @@ -947,7 +947,7 @@ class AgentDB: step_id=step_id, workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, - observer_thought_id=observer_thought_id, + thought_id=thought_id, task_v2_id=task_v2_id, organization_id=organization_id, ) @@ -2128,24 +2128,24 @@ class AgentDB: await session.execute(stmt) await session.commit() - async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None: + async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> TaskV2 | None: async with self.Session() as session: if task_v2 := ( await session.scalars( - select(ObserverCruiseModel) + select(TaskV2Model) .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) ) ).first(): - return ObserverTask.model_validate(task_v2) + return TaskV2.model_validate(task_v2) return None - async def delete_observer_thoughts_for_cruise(self, task_v2_id: str, organization_id: str | None = None) -> None: + async def delete_thoughts(self, task_v2_id: str, organization_id: str | None = None) -> None: async with self.Session() as session: - stmt = delete(ObserverThoughtModel).where( + stmt = delete(ThoughtModel).where( and_( - ObserverThoughtModel.observer_cruise_id == task_v2_id, - ObserverThoughtModel.organization_id == organization_id, + ThoughtModel.observer_cruise_id == task_v2_id, + ThoughtModel.organization_id == organization_id, ) ) await session.execute(stmt) @@ -2155,49 +2155,47 @@ class AgentDB: self, workflow_run_id: str, organization_id: str | None = None, - ) -> ObserverTask | None: + ) -> TaskV2 | None: async with self.Session() as session: if task_v2 := ( await session.scalars( - select(ObserverCruiseModel) + select(TaskV2Model) .filter_by(organization_id=organization_id) .filter_by(workflow_run_id=workflow_run_id) ) ).first(): - return ObserverTask.model_validate(task_v2) + return TaskV2.model_validate(task_v2) return None - async def get_observer_thought( - self, observer_thought_id: str, organization_id: str | None = None - ) -> ObserverThought | None: + async def get_thought(self, thought_id: str, organization_id: str | None = None) -> Thought | None: async with self.Session() as session: - if observer_thought := ( + if thought := ( await session.scalars( - select(ObserverThoughtModel) - .filter_by(observer_thought_id=observer_thought_id) + select(ThoughtModel) + .filter_by(observer_thought_id=thought_id) .filter_by(organization_id=organization_id) ) ).first(): - return ObserverThought.model_validate(observer_thought) + return Thought.model_validate(thought) return None - async def get_observer_thoughts( + async def get_thoughts( self, task_v2_id: str, - observer_thought_types: list[ObserverThoughtType] | None = None, + thought_types: list[ThoughtType] | None = None, organization_id: str | None = None, - ) -> list[ObserverThought]: + ) -> list[Thought]: async with self.Session() as session: query = ( - select(ObserverThoughtModel) + select(ThoughtModel) .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) - .order_by(ObserverThoughtModel.created_at) + .order_by(ThoughtModel.created_at) ) - if observer_thought_types: - query = query.filter(ObserverThoughtModel.observer_thought_type.in_(observer_thought_types)) - observer_thoughts = (await session.scalars(query)).all() - return [ObserverThought.model_validate(thought) for thought in observer_thoughts] + if thought_types: + query = query.filter(ThoughtModel.observer_thought_type.in_(thought_types)) + thoughts = (await session.scalars(query)).all() + return [Thought.model_validate(thought) for thought in thoughts] async def create_task_v2( self, @@ -2211,9 +2209,9 @@ class AgentDB: totp_identifier: str | None = None, totp_verification_url: str | None = None, webhook_callback_url: str | None = None, - ) -> ObserverTask: + ) -> TaskV2: async with self.Session() as session: - new_task_v2 = ObserverCruiseModel( + new_task_v2 = TaskV2Model( workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, @@ -2228,9 +2226,9 @@ class AgentDB: session.add(new_task_v2) await session.commit() await session.refresh(new_task_v2) - return ObserverTask.model_validate(new_task_v2) + return TaskV2.model_validate(new_task_v2) - async def create_observer_thought( + async def create_thought( self, task_v2_id: str, workflow_run_id: str | None = None, @@ -2241,16 +2239,16 @@ class AgentDB: observation: str | None = None, thought: str | None = None, answer: str | None = None, - observer_thought_scenario: str | None = None, - observer_thought_type: str = ObserverThoughtType.plan, + thought_scenario: str | None = None, + thought_type: str = ThoughtType.plan, output: dict[str, Any] | None = None, input_token_count: int | None = None, output_token_count: int | None = None, thought_cost: float | None = None, organization_id: str | None = None, - ) -> ObserverThought: + ) -> Thought: async with self.Session() as session: - new_observer_thought = ObserverThoughtModel( + new_thought = ThoughtModel( observer_cruise_id=task_v2_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, @@ -2260,22 +2258,22 @@ class AgentDB: observation=observation, thought=thought, answer=answer, - observer_thought_scenario=observer_thought_scenario, - observer_thought_type=observer_thought_type, + observer_thought_scenario=thought_scenario, + observer_thought_type=thought_type, output=output, input_token_count=input_token_count, output_token_count=output_token_count, thought_cost=thought_cost, organization_id=organization_id, ) - session.add(new_observer_thought) + session.add(new_thought) await session.commit() - await session.refresh(new_observer_thought) - return ObserverThought.model_validate(new_observer_thought) + await session.refresh(new_thought) + return Thought.model_validate(new_thought) - async def update_observer_thought( + async def update_thought( self, - observer_thought_id: str, + thought_id: str, workflow_run_block_id: str | None = None, workflow_run_id: str | None = None, workflow_id: str | None = None, @@ -2288,47 +2286,47 @@ class AgentDB: output_token_count: int | None = None, thought_cost: float | None = None, organization_id: str | None = None, - ) -> ObserverThought: + ) -> Thought: async with self.Session() as session: - observer_thought = ( + thought_obj = ( await session.scalars( - select(ObserverThoughtModel) - .filter_by(observer_thought_id=observer_thought_id) + select(ThoughtModel) + .filter_by(observer_thought_id=thought_id) .filter_by(organization_id=organization_id) ) ).first() - if observer_thought: + if thought_obj: if workflow_run_block_id: - observer_thought.workflow_run_block_id = workflow_run_block_id + thought_obj.workflow_run_block_id = workflow_run_block_id if workflow_run_id: - observer_thought.workflow_run_id = workflow_run_id + thought_obj.workflow_run_id = workflow_run_id if workflow_id: - observer_thought.workflow_id = workflow_id + thought_obj.workflow_id = workflow_id if workflow_permanent_id: - observer_thought.workflow_permanent_id = workflow_permanent_id + thought_obj.workflow_permanent_id = workflow_permanent_id if observation: - observer_thought.observation = observation + thought_obj.observation = observation if thought: - observer_thought.thought = thought + thought_obj.thought = thought if answer: - observer_thought.answer = answer + thought_obj.answer = answer if output: - observer_thought.output = output + thought_obj.output = output if input_token_count: - observer_thought.input_token_count = input_token_count + thought_obj.input_token_count = input_token_count if output_token_count: - observer_thought.output_token_count = output_token_count + thought_obj.output_token_count = output_token_count if thought_cost: - observer_thought.thought_cost = thought_cost + thought_obj.thought_cost = thought_cost await session.commit() - await session.refresh(observer_thought) - return ObserverThought.model_validate(observer_thought) - raise NotFoundError(f"ObserverThought {observer_thought_id}") + await session.refresh(thought_obj) + return Thought.model_validate(thought_obj) + raise NotFoundError(f"Thought {thought_id}") async def update_task_v2( self, task_v2_id: str, - status: ObserverTaskStatus | None = None, + status: TaskV2Status | None = None, workflow_run_id: str | None = None, workflow_id: str | None = None, workflow_permanent_id: str | None = None, @@ -2337,11 +2335,11 @@ class AgentDB: summary: str | None = None, output: dict[str, Any] | None = None, organization_id: str | None = None, - ) -> ObserverTask: + ) -> TaskV2: async with self.Session() as session: task_v2 = ( await session.scalars( - select(ObserverCruiseModel) + select(TaskV2Model) .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) ) @@ -2365,7 +2363,7 @@ class AgentDB: task_v2.output = output await session.commit() await session.refresh(task_v2) - return ObserverTask.model_validate(task_v2) + return TaskV2.model_validate(task_v2) raise NotFoundError(f"TaskV2 {task_v2_id} not found") async def create_workflow_run_block( diff --git a/skyvern/forge/sdk/db/id.py b/skyvern/forge/sdk/db/id.py index 42409a8d..f77238ce 100644 --- a/skyvern/forge/sdk/db/id.py +++ b/skyvern/forge/sdk/db/id.py @@ -38,7 +38,7 @@ CREDENTIAL_PARAMETER_PREFIX = "cp" CREDENTIAL_PREFIX = "cred" ORGANIZATION_BITWARDEN_COLLECTION_PREFIX = "obc" TASK_V2_ID = "oc" -OBSERVER_THOUGHT_ID = "ot" +THOUGHT_ID = "ot" ORGANIZATION_AUTH_TOKEN_PREFIX = "oat" ORG_PREFIX = "o" OUTPUT_PARAMETER_PREFIX = "op" @@ -161,9 +161,9 @@ def generate_task_v2_id() -> str: return f"{TASK_V2_ID}_{int_id}" -def generate_observer_thought_id() -> str: +def generate_thought_id() -> str: int_id = generate_id() - return f"{OBSERVER_THOUGHT_ID}_{int_id}" + return f"{THOUGHT_ID}_{int_id}" def generate_persistent_browser_session_id() -> str: diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index d980b253..1db65fa8 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -29,7 +29,6 @@ from skyvern.forge.sdk.db.id import ( generate_bitwarden_sensitive_information_parameter_id, generate_credential_id, generate_credential_parameter_id, - generate_observer_thought_id, generate_org_id, generate_organization_auth_token_id, generate_organization_bitwarden_collection_id, @@ -40,6 +39,7 @@ from skyvern.forge.sdk.db.id import ( generate_task_id, generate_task_run_id, generate_task_v2_id, + generate_thought_id, generate_totp_code_id, generate_workflow_id, generate_workflow_parameter_id, @@ -47,7 +47,7 @@ from skyvern.forge.sdk.db.id import ( generate_workflow_run_block_id, generate_workflow_run_id, ) -from skyvern.forge.sdk.schemas.task_v2 import ObserverThoughtType +from skyvern.forge.sdk.schemas.task_v2 import ThoughtType class Base(AsyncAttrs, DeclarativeBase): @@ -565,7 +565,7 @@ class WorkflowRunBlockModel(Base): modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) -class ObserverCruiseModel(Base): +class TaskV2Model(Base): __tablename__ = "observer_cruises" __table_args__ = (Index("oc_org_wfr_index", "organization_id", "workflow_run_id"),) @@ -589,11 +589,11 @@ class ObserverCruiseModel(Base): modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) -class ObserverThoughtModel(Base): +class ThoughtModel(Base): __tablename__ = "observer_thoughts" __table_args__ = (Index("observer_cruise_index", "organization_id", "observer_cruise_id"),) - observer_thought_id = Column(String, primary_key=True, default=generate_observer_thought_id) + observer_thought_id = Column(String, primary_key=True, default=generate_thought_id) organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True) observer_cruise_id = Column(String, ForeignKey("observer_cruises.observer_cruise_id"), nullable=False) workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=True) @@ -608,7 +608,7 @@ class ObserverThoughtModel(Base): output_token_count = Column(Integer, nullable=True) thought_cost = Column(Numeric, nullable=True) - observer_thought_type = Column(String, nullable=True, default=ObserverThoughtType.plan) + observer_thought_type = Column(String, nullable=True, default=ThoughtType.plan) observer_thought_scenario = Column(String, nullable=True) output = Column(JSON, nullable=True) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index bee051ff..0308f791 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -7,7 +7,7 @@ from skyvern.exceptions import OrganizationNotFound from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext -from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskStatus +from skyvern.forge.sdk.schemas.task_v2 import TaskV2Status from skyvern.forge.sdk.schemas.tasks import TaskStatus from skyvern.forge.sdk.services import task_v2_service from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus @@ -159,12 +159,12 @@ class BackgroundTaskExecutor(AsyncExecutor): task_v2 = await app.DATABASE.get_task_v2(task_v2_id=task_v2_id, organization_id=organization_id) if not task_v2 or not task_v2.workflow_run_id: - raise ValueError("No observer cruise or no workflow run associated with observer cruise") + raise ValueError("No task v2 or no workflow run associated with task v2") - # mark observer cruise as queued + # mark task v2 as queued await app.DATABASE.update_task_v2( task_v2_id=task_v2_id, - status=ObserverTaskStatus.queued, + status=TaskV2Status.queued, organization_id=organization_id, ) await app.DATABASE.update_workflow_run( diff --git a/skyvern/forge/sdk/log_artifacts.py b/skyvern/forge/sdk/log_artifacts.py index 928a23f6..b800115f 100644 --- a/skyvern/forge/sdk/log_artifacts.py +++ b/skyvern/forge/sdk/log_artifacts.py @@ -21,6 +21,8 @@ def primary_key_from_log_entity_type(log_entity_type: LogEntityType) -> str: return "workflow_run_id" elif log_entity_type == LogEntityType.WORKFLOW_RUN_BLOCK: return "workflow_run_block_id" + elif log_entity_type == LogEntityType.TASK_V2: + return "task_v2_id" else: raise ValueError(f"Invalid log entity type: {log_entity_type}") diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 41982fad..e6cc7d88 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -47,7 +47,7 @@ from skyvern.forge.sdk.schemas.organizations import ( ) from skyvern.forge.sdk.schemas.task_generations import GenerateTaskRequest, TaskGeneration, TaskGenerationBase from skyvern.forge.sdk.schemas.task_runs import TaskRunType -from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskRequest +from skyvern.forge.sdk.schemas.task_v2 import TaskV2Request from skyvern.forge.sdk.schemas.tasks import ( CreateTaskResponse, OrderBy, @@ -529,7 +529,7 @@ class EntityType(str, Enum): TASK = "task" WORKFLOW_RUN = "workflow_run" WORKFLOW_RUN_BLOCK = "workflow_run_block" - OBSERVER_THOUGHT = "observer_thought" + THOUGHT = "thought" entity_type_to_param = { @@ -537,7 +537,7 @@ entity_type_to_param = { EntityType.TASK: "task_id", EntityType.WORKFLOW_RUN: "workflow_run_id", EntityType.WORKFLOW_RUN_BLOCK: "workflow_run_block_id", - EntityType.OBSERVER_THOUGHT: "observer_thought_id", + EntityType.THOUGHT: "thought_id", } @@ -1226,12 +1226,12 @@ async def upload_file( async def create_task_v2( request: Request, background_tasks: BackgroundTasks, - data: ObserverTaskRequest, + data: TaskV2Request, organization: Organization = Depends(org_auth_service.get_current_org), x_max_iterations_override: Annotated[int | str | None, Header()] = None, ) -> dict[str, Any]: if x_max_iterations_override: - LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) + LOG.info("Overriding max iterations for task v2", max_iterations_override=x_max_iterations_override) try: task_v2 = await task_v2_service.initialize_task_v2( @@ -1246,11 +1246,11 @@ async def create_task_v2( create_task_run=True, ) except LLMProviderError: - LOG.error("LLM failure to initialize observer cruise", exc_info=True) + LOG.error("LLM failure to initialize task v2", exc_info=True) raise HTTPException( - status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later." + status_code=500, detail="Skyvern LLM failure to initialize task v2. Please try again later." ) - analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": task_v2.url}) + analytics.capture("skyvern-oss-agent-task-v2", data={"url": task_v2.url}) await AsyncExecutorFactory.get_executor().execute_task_v2( request=request, background_tasks=background_tasks, @@ -1373,7 +1373,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: Get the timeline workflow runs including the nested workflow runs in a flattened list """ - # get observer task by workflow run id + # get task v2 by workflow run id task_v2_obj = await app.DATABASE.get_task_v2_by_workflow_run_id( workflow_run_id=workflow_run_id, organization_id=organization_id, @@ -1408,10 +1408,10 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: final_workflow_run_block_timeline.extend(workflow_blocks) if task_v2_obj and task_v2_obj.observer_cruise_id: - observer_thought_timeline = await task_v2_service.get_observer_thought_timelines( + thought_timeline = await task_v2_service.get_thought_timelines( task_v2_id=task_v2_obj.observer_cruise_id, organization_id=organization_id, ) - final_workflow_run_block_timeline.extend(observer_thought_timeline) + final_workflow_run_block_timeline.extend(thought_timeline) final_workflow_run_block_timeline.sort(key=lambda x: x.created_at, reverse=True) return final_workflow_run_block_timeline diff --git a/skyvern/forge/sdk/schemas/task_v2.py b/skyvern/forge/sdk/schemas/task_v2.py index 623478cd..3839062f 100644 --- a/skyvern/forge/sdk/schemas/task_v2.py +++ b/skyvern/forge/sdk/schemas/task_v2.py @@ -10,7 +10,7 @@ from skyvern.forge.sdk.schemas.tasks import ProxyLocation DEFAULT_WORKFLOW_TITLE = "New Workflow" -class ObserverTaskStatus(StrEnum): +class TaskV2Status(StrEnum): created = "created" queued = "queued" running = "running" @@ -24,11 +24,11 @@ class ObserverTaskStatus(StrEnum): return self in [self.failed, self.terminated, self.canceled, self.timed_out, self.completed] -class ObserverTask(BaseModel): +class TaskV2(BaseModel): model_config = ConfigDict(from_attributes=True, populate_by_name=True) observer_cruise_id: str = Field(alias="task_id") - status: ObserverTaskStatus + status: TaskV2Status organization_id: str | None = None workflow_run_id: str | None = None workflow_id: str | None = None @@ -54,14 +54,14 @@ class ObserverTask(BaseModel): return validate_url(url) -class ObserverThoughtType(StrEnum): +class ThoughtType(StrEnum): plan = "plan" metadata = "metadata" user_goal_check = "user_goal_check" internal_plan = "internal_plan" -class ObserverThoughtScenario(StrEnum): +class ThoughtScenario(StrEnum): generate_plan = "generate_plan" user_goal_check = "user_goal_check" summarization = "summarization" @@ -71,7 +71,7 @@ class ObserverThoughtScenario(StrEnum): generate_task = "generate_general_task" -class ObserverThought(BaseModel): +class Thought(BaseModel): model_config = ConfigDict(from_attributes=True, populate_by_name=True) observer_thought_id: str = Field(alias="thought_id") @@ -85,8 +85,8 @@ class ObserverThought(BaseModel): observation: str | None = None thought: str | None = None answer: str | None = None - observer_thought_type: ObserverThoughtType | None = Field(alias="thought_type", default=ObserverThoughtType.plan) - observer_thought_scenario: ObserverThoughtScenario | None = Field(alias="thought_scenario", default=None) + observer_thought_type: ThoughtType | None = Field(alias="thought_type", default=ThoughtType.plan) + observer_thought_scenario: ThoughtScenario | None = Field(alias="thought_scenario", default=None) output: dict[str, Any] | None = None input_token_count: int | None = None output_token_count: int | None = None @@ -96,7 +96,7 @@ class ObserverThought(BaseModel): modified_at: datetime -class ObserverMetadata(BaseModel): +class TaskV2Metadata(BaseModel): url: str workflow_title: str = DEFAULT_WORKFLOW_TITLE @@ -108,7 +108,7 @@ class ObserverMetadata(BaseModel): return validate_url(v) -class ObserverTaskRequest(BaseModel): +class TaskV2Request(BaseModel): user_prompt: str url: str | None = None browser_session_id: str | None = None diff --git a/skyvern/forge/sdk/schemas/workflow_runs.py b/skyvern/forge/sdk/schemas/workflow_runs.py index 1122b68a..cd8fed84 100644 --- a/skyvern/forge/sdk/schemas/workflow_runs.py +++ b/skyvern/forge/sdk/schemas/workflow_runs.py @@ -6,7 +6,7 @@ from typing import Any from pydantic import BaseModel -from skyvern.forge.sdk.schemas.task_v2 import ObserverThought +from skyvern.forge.sdk.schemas.task_v2 import Thought from skyvern.forge.sdk.workflow.models.block import BlockType from skyvern.webeye.actions.actions import Action @@ -58,7 +58,7 @@ class WorkflowRunTimelineType(StrEnum): class WorkflowRunTimeline(BaseModel): type: WorkflowRunTimelineType block: WorkflowRunBlock | None = None - thought: ObserverThought | None = None + thought: Thought | None = None children: list[WorkflowRunTimeline] = [] created_at: datetime modified_at: datetime diff --git a/skyvern/forge/sdk/services/task_v2_service.py b/skyvern/forge/sdk/services/task_v2_service.py index 656b559c..da606bff 100644 --- a/skyvern/forge/sdk/services/task_v2_service.py +++ b/skyvern/forge/sdk/services/task_v2_service.py @@ -19,13 +19,7 @@ from skyvern.forge.sdk.core.skyvern_context import SkyvernContext from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.task_runs import TaskRunType -from skyvern.forge.sdk.schemas.task_v2 import ( - ObserverMetadata, - ObserverTask, - ObserverTaskStatus, - ObserverThoughtScenario, - ObserverThoughtType, -) +from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Metadata, TaskV2Status, ThoughtScenario, ThoughtType from skyvern.forge.sdk.schemas.tasks import ProxyLocation from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType from skyvern.forge.sdk.workflow.models.block import ( @@ -100,7 +94,7 @@ async def initialize_task_v2( publish_workflow: bool = False, parent_workflow_run_id: str | None = None, create_task_run: bool = False, -) -> ObserverTask: +) -> TaskV2: task_v2 = await app.DATABASE.create_task_v2( prompt=user_prompt, organization_id=organization.organization_id, @@ -109,31 +103,31 @@ async def initialize_task_v2( webhook_callback_url=webhook_callback_url, proxy_location=proxy_location, ) - # set observer cruise id in context + # set task_v2_id in context context = skyvern_context.current() if context: context.task_v2_id = task_v2.observer_cruise_id - observer_thought = await app.DATABASE.create_observer_thought( + thought = await app.DATABASE.create_thought( task_v2_id=task_v2.observer_cruise_id, organization_id=organization.organization_id, - observer_thought_type=ObserverThoughtType.metadata, - observer_thought_scenario=ObserverThoughtScenario.generate_metadata, + thought_type=ThoughtType.metadata, + thought_scenario=ThoughtScenario.generate_metadata, ) metadata_prompt = prompt_engine.load_prompt("task_v2_generate_metadata", user_goal=user_prompt, user_url=user_url) metadata_response = await app.LLM_API_HANDLER( prompt=metadata_prompt, - observer_thought=observer_thought, + thought=thought, prompt_name="task_v2_generate_metadata", ) # validate - LOG.info(f"Initialized observer initial response: {metadata_response}") + LOG.info(f"Initialized task v2 initial response: {metadata_response}") url: str = user_url or metadata_response.get("url", "") if not url: raise UrlGenerationFailure() title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE) - metadata = ObserverMetadata( + metadata = TaskV2Metadata( url=url, workflow_title=title, ) @@ -172,8 +166,8 @@ async def initialize_task_v2( raise try: - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, organization_id=organization.organization_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=new_workflow.workflow_id, @@ -182,7 +176,7 @@ async def initialize_task_v2( output=metadata.model_dump(), ) except Exception: - LOG.warning("Failed to update observer thought", exc_info=True) + LOG.warning("Failed to update thought", exc_info=True) # update oserver cruise try: @@ -223,13 +217,13 @@ async def run_task_v2( request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, -) -> ObserverTask: +) -> TaskV2: organization_id = organization.organization_id try: task_v2 = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) except Exception: LOG.error( - "Failed to get observer task", + "Failed to get task v2", task_v2_id=task_v2_id, organization_id=organization_id, exc_info=True, @@ -262,7 +256,7 @@ async def run_task_v2( LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message) return task_v2 except OperationalError: - LOG.error("Database error when running observer cruise", exc_info=True) + LOG.error("Database error when running task v2", exc_info=True) task_v2 = await mark_task_v2_as_failed( task_v2_id, workflow_run_id=task_v2.workflow_run_id, @@ -270,7 +264,7 @@ async def run_task_v2( organization_id=organization_id, ) except Exception as e: - LOG.error("Failed to run observer cruise", exc_info=True) + LOG.error("Failed to run task v2", exc_info=True) failure_reason = f"Failed to run task 2.0: {str(e)}" task_v2 = await mark_task_v2_as_failed( task_v2_id, @@ -296,14 +290,14 @@ async def run_task_v2( async def run_task_v2_helper( organization: Organization, - task_v2: ObserverTask, + task_v2: TaskV2, request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, -) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: +) -> tuple[Workflow, WorkflowRun, TaskV2] | tuple[None, None, TaskV2]: organization_id = organization.organization_id task_v2_id = task_v2.observer_cruise_id - if task_v2.status != ObserverTaskStatus.queued: + if task_v2.status != TaskV2Status.queued: LOG.error( "Task v2 is not queued. Duplicate task v2", task_v2_id=task_v2_id, @@ -356,7 +350,7 @@ async def run_task_v2_helper( LOG.error("Workflow not found", workflow_id=workflow_id) return None, None, task_v2 - ###################### run observer ###################### + ###################### run task v2 ###################### skyvern_context.set( SkyvernContext( @@ -369,7 +363,7 @@ async def run_task_v2_helper( ) task_v2 = await app.DATABASE.update_task_v2( - task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running + task_v2_id=task_v2_id, organization_id=organization_id, status=TaskV2Status.running ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await _set_up_workflow_context(workflow_id, workflow_run_id, organization) @@ -408,7 +402,7 @@ async def run_task_v2_helper( ) return workflow, workflow_run, task_v2 - LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) + LOG.info(f"Task v2 iteration i={i}", workflow_run_id=workflow_run_id, url=url) task_type = "" plan = "" block: BlockTypeVar | None = None @@ -441,14 +435,14 @@ async def run_task_v2_helper( page = await browser_state.get_working_page() except Exception: LOG.exception( - "Failed to get browser state or scrape website in observer iteration", iteration=i, url=url + "Failed to get browser state or scrape website in task v2 iteration", iteration=i, url=url ) continue current_url = str( await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url ) - observer_prompt = prompt_engine.load_prompt( + task_v2_prompt = prompt_engine.load_prompt( "task_v2", current_url=current_url, elements=element_tree_in_prompt, @@ -456,37 +450,37 @@ async def run_task_v2_helper( task_history=task_history, local_datetime=datetime.now(context.tz_info).isoformat(), ) - observer_thought = await app.DATABASE.create_observer_thought( + thought = await app.DATABASE.create_thought( task_v2_id=task_v2_id, organization_id=organization_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=workflow.workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.plan, - observer_thought_scenario=ObserverThoughtScenario.generate_plan, + thought_type=ThoughtType.plan, + thought_scenario=ThoughtScenario.generate_plan, ) - observer_response = await app.LLM_API_HANDLER( - prompt=observer_prompt, + task_v2_response = await app.LLM_API_HANDLER( + prompt=task_v2_prompt, screenshots=scraped_page.screenshots, - observer_thought=observer_thought, + thought=thought, prompt_name="task_v2", ) LOG.info( - "Observer response", - observer_response=observer_response, + "Task v2 response", + task_v2_response=task_v2_response, iteration=i, current_url=current_url, workflow_run_id=workflow_run_id, ) # see if the user goal has achieved or not - user_goal_achieved = observer_response.get("user_goal_achieved", False) - observation = observer_response.get("page_info", "") - thoughts: str = observer_response.get("thoughts", "") - plan = observer_response.get("plan", "") - task_type = observer_response.get("task_type", "") - # Create and save observer thought - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, + user_goal_achieved = task_v2_response.get("user_goal_achieved", False) + observation = task_v2_response.get("page_info", "") + thoughts: str = task_v2_response.get("thoughts", "") + plan = task_v2_response.get("plan", "") + task_type = task_v2_response.get("task_type", "") + # Create and save task thought + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, organization_id=organization_id, thought=thoughts, observation=observation, @@ -496,7 +490,7 @@ async def run_task_v2_helper( if user_goal_achieved is True: LOG.info( - "User goal achieved. Workflow run will complete. Observer is stopping", + "User goal achieved. Workflow run will complete. Task v2 is stopping", iteration=i, workflow_run_id=workflow_run_id, ) @@ -509,12 +503,12 @@ async def run_task_v2_helper( break if not plan: - LOG.warning("No plan found in observer response", observer_response=observer_response) + LOG.warning("No plan found in task v2 response", task_v2_response=task_v2_response) continue - # parse observer repsonse and run the next task + # parse task v2 response and run the next task if not task_type: - LOG.error("No task type found in observer response", observer_response=observer_response) + LOG.error("No task type found in task v2 response", task_v2_response=task_v2_response) await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, @@ -635,7 +629,7 @@ async def run_task_v2_helper( ) if workflow_run.status != WorkflowRunStatus.running: LOG.info( - "Workflow run is not running anymore, stopping the observer", + "Workflow run is not running anymore, stopping the task v2", workflow_run_id=workflow_run_id, status=workflow_run.status, ) @@ -656,48 +650,48 @@ async def run_task_v2_helper( ) completion_screenshots = scraped_page.screenshots except Exception: - LOG.warning("Failed to scrape the website for observer completion check") + LOG.warning("Failed to scrape the website for task v2 completion check") # validate completion only happens at the last iteration - observer_completion_prompt = prompt_engine.load_prompt( + task_v2_completion_prompt = prompt_engine.load_prompt( "task_v2_check_completion", user_goal=user_prompt, task_history=task_history, local_datetime=datetime.now(context.tz_info).isoformat(), ) - observer_thought = await app.DATABASE.create_observer_thought( + thought = await app.DATABASE.create_thought( task_v2_id=task_v2_id, organization_id=organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.user_goal_check, - observer_thought_scenario=ObserverThoughtScenario.user_goal_check, + thought_type=ThoughtType.user_goal_check, + thought_scenario=ThoughtScenario.user_goal_check, ) completion_resp = await app.LLM_API_HANDLER( - prompt=observer_completion_prompt, + prompt=task_v2_completion_prompt, screenshots=completion_screenshots, - observer_thought=observer_thought, - prompt_name="observer_check_completion", + thought=thought, + prompt_name="task_v2_check_completion", ) LOG.info( - "Observer completion check response", + "Task v2 completion check response", completion_resp=completion_resp, iteration=i, workflow_run_id=workflow_run_id, task_history=task_history, ) user_goal_achieved = completion_resp.get("user_goal_achieved", False) - thought = completion_resp.get("thoughts", "") - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, + thought_content = completion_resp.get("thoughts", "") + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, organization_id=organization_id, - thought=thought, + thought=thought_content, output={"user_goal_achieved": user_goal_achieved}, ) if user_goal_achieved: LOG.info( - "User goal achieved according to the observer completion check", + "User goal achieved according to the task v2 completion check", iteration=i, workflow_run_id=workflow_run_id, completion_resp=completion_resp, @@ -711,7 +705,7 @@ async def run_task_v2_helper( break else: LOG.info( - "Observer cruise failed - run out of iterations", + "Task v2 failed - run out of iterations", max_iterations=max_iterations, workflow_run_id=workflow_run_id, ) @@ -770,7 +764,7 @@ async def handle_block_result( block_type_var=block.block_type, block_label=block.label, ) - # observer will continue running the workflow + # task v2 will continue running the workflow elif block_result.status == BlockStatus.terminated: LOG.info( f"Block with type {block.block_type} was terminated for workflow run {workflow_run_id}", @@ -815,7 +809,7 @@ async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str, organ async def _generate_loop_task( - task_v2: ObserverTask, + task_v2: TaskV2, workflow_id: str, workflow_permanent_id: str, workflow_run_id: str, @@ -830,21 +824,21 @@ async def _generate_loop_task( plan=plan, ) data_extraction_thought = f"Going to generate a list of values to go through based on the plan: {plan}." - observer_thought = await app.DATABASE.create_observer_thought( + thought = await app.DATABASE.create_thought( task_v2_id=task_v2.observer_cruise_id, organization_id=task_v2.organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, - observer_thought_type=ObserverThoughtType.plan, - observer_thought_scenario=ObserverThoughtScenario.extract_loop_values, + thought_type=ThoughtType.plan, + thought_scenario=ThoughtScenario.extract_loop_values, thought=data_extraction_thought, ) - # generate screenshot artifact for the observer thought + # generate screenshot artifact for the thought if scraped_page.screenshots: for screenshot in scraped_page.screenshots: - await app.ARTIFACT_MANAGER.create_observer_thought_artifact( - observer_thought=observer_thought, + await app.ARTIFACT_MANAGER.create_thought_artifact( + thought=thought, artifact_type=ArtifactType.SCREENSHOT_LLM, data=screenshot, ) @@ -878,7 +872,7 @@ async def _generate_loop_task( "Failed to execute the extraction block for the loop task", extraction_block_result=extraction_block_result, ) - # wofklow run and observer task status update is handled in the upper caller layer + # wofklow run and task v2 status update is handled in the upper caller layer raise Exception("extraction_block failed") # validate output parameter try: @@ -898,9 +892,9 @@ async def _generate_loop_task( ) raise - # update the observer thought - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, + # update the thought + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, organization_id=task_v2.organization_id, output=output_value_obj, ) @@ -959,30 +953,30 @@ async def _generate_loop_task( is_link=is_loop_value_link, loop_values=loop_values, ) - observer_thought_task_in_loop = await app.DATABASE.create_observer_thought( + thought_task_in_loop = await app.DATABASE.create_thought( task_v2_id=task_v2.observer_cruise_id, organization_id=task_v2.organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, - observer_thought_type=ObserverThoughtType.internal_plan, - observer_thought_scenario=ObserverThoughtScenario.generate_task_in_loop, + thought_type=ThoughtType.internal_plan, + thought_scenario=ThoughtScenario.generate_task_in_loop, ) task_in_loop_metadata_response = await app.LLM_API_HANDLER( task_in_loop_metadata_prompt, screenshots=scraped_page.screenshots, - observer_thought=observer_thought_task_in_loop, + thought=thought_task_in_loop, prompt_name="task_v2_generate_task_block", ) LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response) navigation_goal = task_in_loop_metadata_response.get("navigation_goal") data_extraction_goal = task_in_loop_metadata_response.get("data_extraction_goal") data_extraction_schema = task_in_loop_metadata_response.get("data_schema") - thought = task_in_loop_metadata_response.get("thoughts") - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought_task_in_loop.observer_thought_id, + thought_content = task_in_loop_metadata_response.get("thoughts") + await app.DATABASE.update_thought( + thought_id=thought_task_in_loop.observer_thought_id, organization_id=task_v2.organization_id, - thought=thought, + thought=thought_content, output=task_in_loop_metadata_response, ) if data_extraction_goal and navigation_goal: @@ -1046,7 +1040,7 @@ async def _generate_loop_task( async def _generate_extraction_task( - task_v2: ObserverTask, + task_v2: TaskV2, workflow_id: str, workflow_permanent_id: str, workflow_run_id: str, @@ -1173,16 +1167,16 @@ def _generate_random_string(length: int = 5) -> str: return "".join(random.choices(RANDOM_STRING_POOL, k=length)) -async def get_observer_thought_timelines( +async def get_thought_timelines( task_v2_id: str, organization_id: str | None = None, ) -> list[WorkflowRunTimeline]: - observer_thoughts = await app.DATABASE.get_observer_thoughts( + thoughts = await app.DATABASE.get_thoughts( task_v2_id, organization_id=organization_id, - observer_thought_types=[ - ObserverThoughtType.plan, - ObserverThoughtType.user_goal_check, + thought_types=[ + ThoughtType.plan, + ThoughtType.user_goal_check, ], ) return [ @@ -1192,11 +1186,11 @@ async def get_observer_thought_timelines( created_at=thought.created_at, modified_at=thought.modified_at, ) - for thought in observer_thoughts + for thought in thoughts ] -async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None: +async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> TaskV2 | None: return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) @@ -1205,11 +1199,11 @@ async def mark_task_v2_as_failed( workflow_run_id: str | None = None, failure_reason: str | None = None, organization_id: str | None = None, -) -> ObserverTask: +) -> TaskV2: task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, - status=ObserverTaskStatus.failed, + status=TaskV2Status.failed, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( @@ -1225,25 +1219,25 @@ async def mark_task_v2_as_completed( organization_id: str | None = None, summary: str | None = None, output: dict[str, Any] | None = None, -) -> ObserverTask: +) -> TaskV2: task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, - status=ObserverTaskStatus.completed, + status=TaskV2Status.completed, summary=summary, output=output, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) - # Track observer cruise duration when completed + # Track task v2 duration when completed duration_seconds = (datetime.now(UTC) - task_v2.created_at.replace(tzinfo=UTC)).total_seconds() LOG.info( - "Observer task duration metrics", + "Task v2 duration metrics", task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, duration_seconds=duration_seconds, - task_v2_status=ObserverTaskStatus.completed, + task_v2_status=TaskV2Status.completed, organization_id=organization_id, ) @@ -1255,11 +1249,11 @@ async def mark_task_v2_as_canceled( task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, -) -> ObserverTask: +) -> TaskV2: task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, - status=ObserverTaskStatus.canceled, + status=TaskV2Status.canceled, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id) @@ -1272,11 +1266,11 @@ async def mark_task_v2_as_terminated( workflow_run_id: str | None = None, organization_id: str | None = None, failure_reason: str | None = None, -) -> ObserverTask: +) -> TaskV2: task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, - status=ObserverTaskStatus.terminated, + status=TaskV2Status.terminated, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason) @@ -1354,21 +1348,21 @@ def _get_extracted_data_from_block_result( async def _summarize_task_v2( - task_v2: ObserverTask, + task_v2: TaskV2, task_history: list[dict], context: SkyvernContext, screenshots: list[bytes] | None = None, -) -> ObserverTask: - observer_thought = await app.DATABASE.create_observer_thought( +) -> TaskV2: + thought = await app.DATABASE.create_thought( task_v2_id=task_v2.observer_cruise_id, organization_id=task_v2.organization_id, workflow_run_id=task_v2.workflow_run_id, workflow_id=task_v2.workflow_id, workflow_permanent_id=task_v2.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.user_goal_check, - observer_thought_scenario=ObserverThoughtScenario.summarization, + thought_type=ThoughtType.user_goal_check, + thought_scenario=ThoughtScenario.summarization, ) - # summarize the observer cruise and format the output + # summarize the task v2 and format the output task_v2_summary_prompt = prompt_engine.load_prompt( "task_v2_summary", user_goal=task_v2.prompt, @@ -1378,17 +1372,17 @@ async def _summarize_task_v2( task_v2_summary_resp = await app.LLM_API_HANDLER( prompt=task_v2_summary_prompt, screenshots=screenshots, - observer_thought=observer_thought, + thought=thought, prompt_name="task_v2_summary", ) LOG.info("Task v2 summary response", task_v2_summary_resp=task_v2_summary_resp) - thought = task_v2_summary_resp.get("description") + summary_description = task_v2_summary_resp.get("description") summarized_output = task_v2_summary_resp.get("output") - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, + await app.DATABASE.update_thought( + thought_id=thought.observer_thought_id, organization_id=task_v2.organization_id, - thought=thought, + thought=summary_description, output=task_v2_summary_resp, ) @@ -1396,12 +1390,12 @@ async def _summarize_task_v2( task_v2_id=task_v2.observer_cruise_id, workflow_run_id=task_v2.workflow_run_id, organization_id=task_v2.organization_id, - summary=thought, + summary=summary_description, output=summarized_output, ) -async def send_task_v2_webhook(task_v2: ObserverTask) -> None: +async def send_task_v2_webhook(task_v2: TaskV2) -> None: if not task_v2.webhook_callback_url: return organization_id = task_v2.organization_id @@ -1413,15 +1407,15 @@ async def send_task_v2_webhook(task_v2: ObserverTask) -> None: ) if not api_key: LOG.warning( - "No valid API key found for the organization of observer cruise", + "No valid API key found for the organization of task v2", task_v2_id=task_v2.observer_cruise_id, ) return - # build the observer cruise response + # build the task v2 response payload = task_v2.model_dump_json(by_alias=True) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( - "Sending observer cruise response to webhook callback url", + "Sending task v2 response to webhook callback url", task_v2_id=task_v2.observer_cruise_id, webhook_callback_url=task_v2.webhook_callback_url, payload=payload, @@ -1433,14 +1427,14 @@ async def send_task_v2_webhook(task_v2: ObserverTask) -> None: ) if resp.status_code == 200: LOG.info( - "Observer cruise webhook sent successfully", + "Task v2 webhook sent successfully", task_v2_id=task_v2.observer_cruise_id, resp_code=resp.status_code, resp_text=resp.text, ) else: LOG.info( - "Observer cruise webhook failed", + "Task v2 webhook failed", task_v2_id=task_v2.observer_cruise_id, resp=resp, resp_code=resp.status_code, diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e0266698..30efd8fe 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -50,7 +50,7 @@ from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core.validators import prepend_scheme_and_validate_url from skyvern.forge.sdk.db.enums import TaskType from skyvern.forge.sdk.schemas.files import FileInfo -from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskStatus +from skyvern.forge.sdk.schemas.task_v2 import TaskV2Status from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus from skyvern.forge.sdk.workflow.context_manager import BlockMetadata, WorkflowRunContext from skyvern.forge.sdk.workflow.exceptions import ( @@ -2116,7 +2116,6 @@ class UrlBlock(BaseTaskBlock): url: str -# observer block class TaskV2Block(Block): block_type: Literal[BlockType.TaskV2] = BlockType.TaskV2 prompt: str @@ -2159,7 +2158,7 @@ class TaskV2Block(Block): proxy_location=workflow_run.proxy_location, ) await app.DATABASE.update_task_v2( - task_v2.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id + task_v2.observer_cruise_id, status=TaskV2Status.queued, organization_id=organization_id ) if task_v2.workflow_run_id: await app.DATABASE.update_workflow_run( diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 53f30b79..6d61b4d9 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -6,7 +6,7 @@ from pydantic import BaseModel, field_validator from skyvern.forge.sdk.core.validators import validate_url from skyvern.forge.sdk.schemas.files import FileInfo -from skyvern.forge.sdk.schemas.task_v2 import ObserverTask +from skyvern.forge.sdk.schemas.task_v2 import TaskV2 from skyvern.forge.sdk.schemas.tasks import ProxyLocation from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateBlockLabels from skyvern.forge.sdk.workflow.models.block import BlockTypeVar @@ -149,5 +149,5 @@ class WorkflowRunStatusResponse(BaseModel): outputs: dict[str, Any] | None = None total_steps: int | None = None total_cost: float | None = None - task_v2: ObserverTask | None = None + task_v2: TaskV2 | None = None workflow_title: str | None = None