From ffbc95e1b4b1294e9cbc9c753cf18ca866401955 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 23 Feb 2025 16:03:49 -0800 Subject: [PATCH] task v2 refactor part 6 - observer_cruise_id -> task_v2_id (#1817) --- evaluation/core/__init__.py | 21 +-- .../create_webvoyager_evaluation_result.py | 4 +- ...server.py => create_webvoyager_task_v2.py} | 6 +- evaluation/script/eval_webvoyager_cruise.py | 4 +- skyvern/agent/local.py | 16 +- skyvern/client/agent/client.py | 8 +- skyvern/exceptions.py | 12 +- .../forge/sdk/api/llm/api_handler_factory.py | 28 +-- skyvern/forge/sdk/api/llm/models.py | 2 +- skyvern/forge/sdk/artifact/manager.py | 30 ++-- skyvern/forge/sdk/artifact/storage/base.py | 4 +- skyvern/forge/sdk/artifact/storage/local.py | 6 +- skyvern/forge/sdk/artifact/storage/s3.py | 6 +- skyvern/forge/sdk/core/skyvern_context.py | 4 +- skyvern/forge/sdk/db/client.py | 98 +++++----- skyvern/forge/sdk/db/id.py | 6 +- skyvern/forge/sdk/db/models.py | 5 +- skyvern/forge/sdk/executor/async_executor.py | 24 ++- skyvern/forge/sdk/forge_log.py | 4 +- skyvern/forge/sdk/routes/agent_protocol.py | 26 +-- skyvern/forge/sdk/services/task_v2_service.py | 170 +++++++++--------- skyvern/forge/sdk/workflow/models/block.py | 4 +- 22 files changed, 238 insertions(+), 250 deletions(-) rename evaluation/script/{create_webvoyager_observer.py => create_webvoyager_task_v2.py} (93%) diff --git a/evaluation/core/__init__.py b/evaluation/core/__init__.py index 03b3a627..80c4b7a4 100644 --- a/evaluation/core/__init__.py +++ b/evaluation/core/__init__.py @@ -25,6 +25,7 @@ class TaskOutput(BaseModel): class SkyvernClient: def __init__(self, base_url: str, credentials: str): self.base_url = base_url + self.v2_base_url = base_url.replace("/api/v1", "/api/v2") self.credentials = credentials def generate_curl_params(self, request_body: BaseModel, max_steps: int | None = None) -> tuple[dict, dict]: @@ -54,11 +55,11 @@ class SkyvernClient: assert "workflow_run_id" in response.json(), f"Failed to create workflow run: {response.text}" return response.json()["workflow_run_id"] - def create_cruise(self, cruise_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask: - url = f"{self.base_url}/cruise" - payload, headers = self.generate_curl_params(cruise_request, max_steps=max_steps) + def create_task_v2(self, task_v2_request: ObserverTaskRequest, max_steps: int | None = None) -> ObserverTask: + url = f"{self.v2_base_url}/tasks" + payload, headers = self.generate_curl_params(task_v2_request, max_steps=max_steps) response = requests.post(url, headers=headers, data=payload) - assert "observer_cruise_id" in response.json(), f"Failed to create observer cruise: {response.text}" + assert "task_id" in response.json(), f"Failed to create task v2: {response.text}" return ObserverTask.model_validate(response.json()) def get_task(self, task_id: str) -> TaskResponse: @@ -213,7 +214,7 @@ class Evaluator: return workflow_run_id def queue_skyvern_cruise(self, cruise_request: ObserverTaskRequest, max_step: int | None = None) -> ObserverTask: - cruise = self.client.create_cruise(cruise_request=cruise_request, max_steps=max_step) + cruise = self.client.create_task_v2(task_v2_request=cruise_request, max_steps=max_step) self._save_artifact("cruise.json", cruise.model_dump_json(indent=2).encode()) return cruise @@ -259,7 +260,7 @@ class Evaluator: ) extracted_information: list | dict[str, Any] | str | None = None - if workflow_run_response.observer_cruise is None: + if workflow_run_response.observer_task is None: assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, ( f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output" ) @@ -271,10 +272,10 @@ class Evaluator: # FIXME: improve this when the last block is loop block extracted_information = result else: - workflow_run_response.observer_cruise.summary - workflow_run_response.observer_cruise.output - summary = f"{('summary:' + workflow_run_response.observer_cruise.summary) if workflow_run_response.observer_cruise.summary else ''}" - output = f"{('output: ' + json.dumps(workflow_run_response.observer_cruise.output)) if workflow_run_response.observer_cruise.output else ''}" + workflow_run_response.observer_task.summary + workflow_run_response.observer_task.output + summary = f"{('summary:' + workflow_run_response.observer_task.summary) if workflow_run_response.observer_task.summary else ''}" + output = f"{('output: ' + json.dumps(workflow_run_response.observer_task.output)) if workflow_run_response.observer_task.output else ''}" extracted_information = "" if summary: extracted_information = summary diff --git a/evaluation/script/create_webvoyager_evaluation_result.py b/evaluation/script/create_webvoyager_evaluation_result.py index 7f9e736c..b5134386 100644 --- a/evaluation/script/create_webvoyager_evaluation_result.py +++ b/evaluation/script/create_webvoyager_evaluation_result.py @@ -48,8 +48,8 @@ def main( { "workflow_permanent_id": workflow_pid, "status": str(workflow_run_response.status), - "summary": workflow_run_response.observer_cruise.summary, - "output": workflow_run_response.observer_cruise.output, + "summary": workflow_run_response.observer_task.summary, + "output": workflow_run_response.observer_task.output, "assertion": workflow_run_response.status == WorkflowRunStatus.completed, "failure_reason": workflow_run_response.failure_reason or "", } diff --git a/evaluation/script/create_webvoyager_observer.py b/evaluation/script/create_webvoyager_task_v2.py similarity index 93% rename from evaluation/script/create_webvoyager_observer.py rename to evaluation/script/create_webvoyager_task_v2.py index f17db51f..53d7345d 100644 --- a/evaluation/script/create_webvoyager_observer.py +++ b/evaluation/script/create_webvoyager_task_v2.py @@ -12,7 +12,7 @@ from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.schemas.task_v2 import ObserverTaskRequest -async def create_observer_cruise( +async def create_task_v2( base_url: str, cred: str, ) -> None: @@ -42,7 +42,7 @@ async def create_observer_cruise( dumped_data = case_data.model_dump() dumped_data.update( { - "observer_cruise_id": cruise.observer_cruise_id, + "task_v2_id": cruise.observer_cruise_id, "workflow_run_id": cruise.workflow_run_id, "workflow_permanent_id": cruise.workflow_permanent_id, "cruise_url": str(cruise.url) if cruise.url else cruise.url, @@ -59,7 +59,7 @@ def main( base_url: str = typer.Option(..., "--base-url", help="base url for Skyvern client"), cred: str = typer.Option(..., "--cred", help="credential for Skyvern organization"), ) -> None: - asyncio.run(create_observer_cruise(base_url=base_url, cred=cred)) + asyncio.run(create_task_v2(base_url=base_url, cred=cred)) if __name__ == "__main__": diff --git a/evaluation/script/eval_webvoyager_cruise.py b/evaluation/script/eval_webvoyager_cruise.py index e2060e18..f5084125 100644 --- a/evaluation/script/eval_webvoyager_cruise.py +++ b/evaluation/script/eval_webvoyager_cruise.py @@ -33,8 +33,8 @@ async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> d one_record.update( { "status": str(workflow_run_response.status), - "summary": workflow_run_response.observer_cruise.summary, - "output": workflow_run_response.observer_cruise.output, + "summary": workflow_run_response.observer_task.summary, + "output": workflow_run_response.observer_task.output, } ) if workflow_run_response.status != WorkflowRunStatus.completed: diff --git a/skyvern/agent/local.py b/skyvern/agent/local.py index dcb929d3..ea4eb919 100644 --- a/skyvern/agent/local.py +++ b/skyvern/agent/local.py @@ -66,23 +66,23 @@ class Agent: api_key=org_auth_token.token if org_auth_token else None, ) - async def _run_observer_task(self, organization: Organization, observer_task: ObserverTask) -> None: + async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None: # mark observer cruise as queued - await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_task.observer_cruise_id, + await app.DATABASE.update_task_v2( + task_v2_id=task_v2.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization.organization_id, ) - assert observer_task.workflow_run_id + assert task_v2.workflow_run_id await app.DATABASE.update_workflow_run( - workflow_run_id=observer_task.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, status=WorkflowRunStatus.queued, ) await task_v2_service.run_observer_task( organization=organization, - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, ) async def create_task( @@ -170,12 +170,12 @@ class Agent: if not observer_task.workflow_run_id: raise Exception("Observer cruise missing workflow run id") - asyncio.create_task(self._run_observer_task(organization, observer_task)) + asyncio.create_task(self._run_task_v2(organization, observer_task)) return observer_task async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None: organization = await self._get_organization() - return await app.DATABASE.get_observer_cruise(task_id, organization.organization_id) + return await app.DATABASE.get_task_v2(task_id, organization.organization_id) async def run_observer_task_v_2( self, task_request: ObserverTaskRequest, timeout_seconds: int = 600 diff --git a/skyvern/client/agent/client.py b/skyvern/client/agent/client.py index c56373c9..bac66354 100644 --- a/skyvern/client/agent/client.py +++ b/skyvern/client/agent/client.py @@ -3115,7 +3115,7 @@ class AgentClient: publish_workflow=publish_workflow, request_options=request_options, ) - observer_cruise_id = observer_task.get("task_id") + task_id = observer_task.get("task_id") start_time = time.time() while True: @@ -3123,7 +3123,7 @@ class AgentClient: raise TimeoutError(f"Task timed out after {timeout_seconds} seconds") task = self.get_observer_task_v_2( - str(observer_cruise_id), api_key=api_key, authorization=authorization, request_options=request_options + str(task_id), api_key=api_key, authorization=authorization, request_options=request_options ) if str(task.get("status")) in ["timed_out", "failed", "terminated", "completed", "canceled"]: return task @@ -7358,11 +7358,11 @@ class AsyncAgentClient: publish_workflow=publish_workflow, request_options=request_options, ) - observer_cruise_id = observer_task.get("task_id") + task_id = observer_task.get("task_id") async with asyncio.timeout(timeout_seconds): while True: task = await self.get_observer_task_v_2( - str(observer_cruise_id), + str(task_id), api_key=api_key, authorization=authorization, request_options=request_options, diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 8aa74efc..a009cd20 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -31,13 +31,13 @@ class FailedToSendWebhook(SkyvernException): task_id: str | None = None, workflow_run_id: str | None = None, workflow_id: str | None = None, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, ): workflow_run_str = f"workflow_run_id={workflow_run_id}" if workflow_run_id else "" workflow_str = f"workflow_id={workflow_id}" if workflow_id else "" task_str = f"task_id={task_id}" if task_id else "" - observer_cruise_str = f"observer_cruise_id={observer_cruise_id}" if observer_cruise_id else "" - super().__init__(f"Failed to send webhook. {workflow_run_str} {workflow_str} {task_str} {observer_cruise_str}") + task_v2_str = f"task_v2_id={task_v2_id}" if task_v2_id else "" + super().__init__(f"Failed to send webhook. {workflow_run_str} {workflow_str} {task_str} {task_v2_str}") class ProxyLocationNotSupportedError(SkyvernException): @@ -632,9 +632,9 @@ class UrlGenerationFailure(SkyvernHTTPException): super().__init__("Failed to generate the url for the prompt") -class ObserverCruiseNotFound(SkyvernHTTPException): - def __init__(self, observer_cruise_id: str) -> None: - super().__init__(f"Observer task {observer_cruise_id} not found") +class TaskV2NotFound(SkyvernHTTPException): + def __init__(self, task_v2_id: str) -> None: + super().__init__(f"Task v2 {task_v2_id} not found") class NoTOTPVerificationCodeFound(SkyvernHTTPException): diff --git a/skyvern/forge/sdk/api/llm/api_handler_factory.py b/skyvern/forge/sdk/api/llm/api_handler_factory.py index 8207e1c0..d2eef027 100644 --- a/skyvern/forge/sdk/api/llm/api_handler_factory.py +++ b/skyvern/forge/sdk/api/llm/api_handler_factory.py @@ -63,7 +63,7 @@ class LLMAPIHandlerFactory: prompt: str, prompt_name: str, step: Step | None = None, - observer_cruise: ObserverTask | None = None, + task_v2: ObserverTask | None = None, observer_thought: ObserverThought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, @@ -92,7 +92,7 @@ class LLMAPIHandlerFactory: data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"), artifact_type=ArtifactType.HASHED_HREF_MAP, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -102,7 +102,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_PROMPT, screenshots=screenshots, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ) messages = await llm_messages_builder(prompt, screenshots, llm_config.add_assistant_prefix) @@ -117,7 +117,7 @@ class LLMAPIHandlerFactory: ).encode("utf-8"), artifact_type=ArtifactType.LLM_REQUEST, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -144,7 +144,7 @@ class LLMAPIHandlerFactory: data=response.model_dump_json(indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -184,7 +184,7 @@ class LLMAPIHandlerFactory: data=json.dumps(parsed_response, indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE_PARSED, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -197,7 +197,7 @@ class LLMAPIHandlerFactory: data=json.dumps(parsed_response, indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -234,7 +234,7 @@ class LLMAPIHandlerFactory: prompt: str, prompt_name: str, step: Step | None = None, - observer_cruise: ObserverTask | None = None, + task_v2: ObserverTask | None = None, observer_thought: ObserverThought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, @@ -255,7 +255,7 @@ class LLMAPIHandlerFactory: data=json.dumps(context.hashed_href_map, indent=2).encode("utf-8"), artifact_type=ArtifactType.HASHED_HREF_MAP, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -265,7 +265,7 @@ class LLMAPIHandlerFactory: artifact_type=ArtifactType.LLM_PROMPT, screenshots=screenshots, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -285,7 +285,7 @@ class LLMAPIHandlerFactory: ).encode("utf-8"), artifact_type=ArtifactType.LLM_REQUEST, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -319,7 +319,7 @@ class LLMAPIHandlerFactory: data=response.model_dump_json(indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -354,7 +354,7 @@ class LLMAPIHandlerFactory: data=json.dumps(parsed_response, indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE_PARSED, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) @@ -367,7 +367,7 @@ class LLMAPIHandlerFactory: data=json.dumps(parsed_response, indent=2).encode("utf-8"), artifact_type=ArtifactType.LLM_RESPONSE_RENDERED, step=step, - observer_cruise=observer_cruise, + task_v2=task_v2, observer_thought=observer_thought, ai_suggestion=ai_suggestion, ) diff --git a/skyvern/forge/sdk/api/llm/models.py b/skyvern/forge/sdk/api/llm/models.py index ac407b10..4d3a50f9 100644 --- a/skyvern/forge/sdk/api/llm/models.py +++ b/skyvern/forge/sdk/api/llm/models.py @@ -85,7 +85,7 @@ class LLMAPIHandler(Protocol): prompt: str, prompt_name: str, step: Step | None = None, - observer_cruise: ObserverTask | None = None, + task_v2: ObserverTask | None = None, observer_thought: ObserverThought | None = None, ai_suggestion: AISuggestion | None = None, screenshots: list[bytes] | None = None, diff --git a/skyvern/forge/sdk/artifact/manager.py b/skyvern/forge/sdk/artifact/manager.py index 223b0335..af26798f 100644 --- a/skyvern/forge/sdk/artifact/manager.py +++ b/skyvern/forge/sdk/artifact/manager.py @@ -30,7 +30,7 @@ class ArtifactManager: workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, observer_thought_id: str | None = None, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, ai_suggestion_id: str | None = None, organization_id: str | None = None, data: bytes | None = None, @@ -49,7 +49,7 @@ class ArtifactManager: workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, observer_thought_id=observer_thought_id, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, organization_id=organization_id, ai_suggestion_id=ai_suggestion_id, ) @@ -129,28 +129,28 @@ class ArtifactManager: artifact_type=artifact_type, uri=uri, observer_thought_id=observer_thought.observer_thought_id, - observer_cruise_id=observer_thought.observer_cruise_id, + task_v2_id=observer_thought.observer_cruise_id, organization_id=observer_thought.organization_id, data=data, path=path, ) - async def create_observer_cruise_artifact( + async def create_task_v2_artifact( self, - observer_cruise: ObserverTask, + task_v2: ObserverTask, artifact_type: ArtifactType, data: bytes | None = None, path: str | None = None, ) -> str: artifact_id = generate_artifact_id() - uri = app.STORAGE.build_observer_cruise_uri(artifact_id, observer_cruise, artifact_type) + uri = app.STORAGE.build_task_v2_uri(artifact_id, task_v2, artifact_type) return await self._create_artifact( - aio_task_primary_key=observer_cruise.observer_cruise_id, + aio_task_primary_key=task_v2.observer_cruise_id, artifact_id=artifact_id, artifact_type=artifact_type, uri=uri, - observer_cruise_id=observer_cruise.observer_cruise_id, - organization_id=observer_cruise.organization_id, + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, data=data, path=path, ) @@ -203,7 +203,7 @@ class ArtifactManager: screenshots: list[bytes] | None = None, step: Step | None = None, observer_thought: ObserverThought | None = None, - observer_cruise: ObserverTask | None = None, + task_v2: ObserverTask | None = None, ai_suggestion: AISuggestion | None = None, ) -> None: if step: @@ -218,15 +218,15 @@ class ArtifactManager: artifact_type=ArtifactType.SCREENSHOT_LLM, data=screenshot, ) - elif observer_cruise: - await self.create_observer_cruise_artifact( - observer_cruise=observer_cruise, + elif task_v2: + await self.create_task_v2_artifact( + task_v2=task_v2, artifact_type=artifact_type, data=data, ) for screenshot in screenshots or []: - await self.create_observer_cruise_artifact( - observer_cruise=observer_cruise, + await self.create_task_v2_artifact( + task_v2=task_v2, artifact_type=ArtifactType.SCREENSHOT_LLM, data=screenshot, ) diff --git a/skyvern/forge/sdk/artifact/storage/base.py b/skyvern/forge/sdk/artifact/storage/base.py index 7c01cbe6..56131cc0 100644 --- a/skyvern/forge/sdk/artifact/storage/base.py +++ b/skyvern/forge/sdk/artifact/storage/base.py @@ -55,9 +55,7 @@ class BaseStorage(ABC): pass @abstractmethod - def build_observer_cruise_uri( - self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType - ) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: pass @abstractmethod diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index f9f0358d..ebb68e6c 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -46,11 +46,9 @@ class LocalStorage(BaseStorage): file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" - def build_observer_cruise_uri( - self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType - ) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] - return f"file://{self.artifact_path}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + return f"file://{self.artifact_path}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" def build_workflow_run_block_uri( self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType diff --git a/skyvern/forge/sdk/artifact/storage/s3.py b/skyvern/forge/sdk/artifact/storage/s3.py index 1a5987cf..ccf9a4a8 100644 --- a/skyvern/forge/sdk/artifact/storage/s3.py +++ b/skyvern/forge/sdk/artifact/storage/s3.py @@ -46,11 +46,9 @@ class S3Storage(BaseStorage): file_ext = FILE_EXTENTSION_MAP[artifact_type] return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_thought.observer_cruise_id}/{observer_thought.observer_thought_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" - def build_observer_cruise_uri( - self, artifact_id: str, observer_cruise: ObserverTask, artifact_type: ArtifactType - ) -> str: + def build_task_v2_uri(self, artifact_id: str, task_v2: ObserverTask, artifact_type: ArtifactType) -> str: file_ext = FILE_EXTENTSION_MAP[artifact_type] - return f"s3://{self.bucket}/{settings.ENV}/observers/{observer_cruise.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" + return f"s3://{self.bucket}/{settings.ENV}/observers/{task_v2.observer_cruise_id}/{datetime.utcnow().isoformat()}_{artifact_id}_{artifact_type}.{file_ext}" def build_workflow_run_block_uri( self, artifact_id: str, workflow_run_block: WorkflowRunBlock, artifact_type: ArtifactType diff --git a/skyvern/forge/sdk/core/skyvern_context.py b/skyvern/forge/sdk/core/skyvern_context.py index 02ca947e..4f6a9210 100644 --- a/skyvern/forge/sdk/core/skyvern_context.py +++ b/skyvern/forge/sdk/core/skyvern_context.py @@ -12,7 +12,7 @@ class SkyvernContext: task_id: str | None = None workflow_id: str | None = None workflow_run_id: str | None = None - observer_cruise_id: str | None = None + task_v2_id: str | None = None max_steps_override: int | None = None tz_info: ZoneInfo | None = None totp_codes: dict[str, str | None] = field(default_factory=dict) @@ -22,7 +22,7 @@ class SkyvernContext: frame_index_map: dict[Frame, int] = field(default_factory=dict) def __repr__(self) -> str: - return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, max_steps_override={self.max_steps_override})" + return f"SkyvernContext(request_id={self.request_id}, organization_id={self.organization_id}, task_id={self.task_id}, workflow_id={self.workflow_id}, workflow_run_id={self.workflow_run_id}, task_v2_id={self.task_v2_id}, max_steps_override={self.max_steps_override})" def __str__(self) -> str: return self.__repr__() diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 3128044c..19116348 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -210,7 +210,7 @@ class AgentDB: task_id: str | None = None, workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, observer_thought_id: str | None = None, ai_suggestion_id: str | None = None, organization_id: str | None = None, @@ -225,7 +225,7 @@ class AgentDB: step_id=step_id, workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, - observer_cruise_id=observer_cruise_id, + observer_cruise_id=task_v2_id, observer_thought_id=observer_thought_id, ai_suggestion_id=ai_suggestion_id, organization_id=organization_id, @@ -807,9 +807,9 @@ class AgentDB: return convert_to_organization_auth_token(auth_token) - async def get_artifacts_for_observer_cruise( + async def get_artifacts_for_task_v2( self, - observer_cruise_id: str, + task_v2_id: str, organization_id: str | None = None, artifact_types: list[ArtifactType] | None = None, ) -> list[Artifact]: @@ -817,7 +817,7 @@ class AgentDB: async with self.Session() as session: query = ( select(ArtifactModel) - .filter_by(observer_cruise_id=observer_cruise_id) + .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) ) if artifact_types: @@ -894,7 +894,7 @@ class AgentDB: workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, observer_thought_id: str | None = None, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, organization_id: str | None = None, ) -> list[Artifact]: try: @@ -913,8 +913,8 @@ class AgentDB: query = query.filter_by(workflow_run_block_id=workflow_run_block_id) if observer_thought_id is not None: query = query.filter_by(observer_thought_id=observer_thought_id) - if observer_cruise_id is not None: - query = query.filter_by(observer_cruise_id=observer_cruise_id) + if task_v2_id is not None: + query = query.filter_by(observer_cruise_id=task_v2_id) if organization_id is not None: query = query.filter_by(organization_id=organization_id) @@ -938,7 +938,7 @@ class AgentDB: workflow_run_id: str | None = None, workflow_run_block_id: str | None = None, observer_thought_id: str | None = None, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, organization_id: str | None = None, ) -> Artifact | None: artifacts = await self.get_artifacts_by_entity_id( @@ -948,7 +948,7 @@ class AgentDB: workflow_run_id=workflow_run_id, workflow_run_block_id=workflow_run_block_id, observer_thought_id=observer_thought_id, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, organization_id=organization_id, ) return artifacts[0] if artifacts else None @@ -1915,13 +1915,11 @@ class AgentDB: await session.execute(stmt) await session.commit() - async def delete_observer_cruise_artifacts( - self, observer_cruise_id: str, organization_id: str | None = None - ) -> None: + async def delete_task_v2_artifacts(self, task_v2_id: str, organization_id: str | None = None) -> None: async with self.Session() as session: stmt = delete(ArtifactModel).where( and_( - ArtifactModel.observer_cruise_id == observer_cruise_id, + ArtifactModel.observer_cruise_id == task_v2_id, ArtifactModel.organization_id == organization_id, ) ) @@ -2130,47 +2128,43 @@ class AgentDB: await session.execute(stmt) await session.commit() - async def get_observer_cruise( - self, observer_cruise_id: str, organization_id: str | None = None - ) -> ObserverTask | None: + async def get_task_v2(self, task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None: async with self.Session() as session: - if observer_cruise := ( + if task_v2 := ( await session.scalars( select(ObserverCruiseModel) - .filter_by(observer_cruise_id=observer_cruise_id) + .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) ) ).first(): - return ObserverTask.model_validate(observer_cruise) + return ObserverTask.model_validate(task_v2) return None - async def delete_observer_thoughts_for_cruise( - self, observer_cruise_id: str, organization_id: str | None = None - ) -> None: + async def delete_observer_thoughts_for_cruise(self, task_v2_id: str, organization_id: str | None = None) -> None: async with self.Session() as session: stmt = delete(ObserverThoughtModel).where( and_( - ObserverThoughtModel.observer_cruise_id == observer_cruise_id, + ObserverThoughtModel.observer_cruise_id == task_v2_id, ObserverThoughtModel.organization_id == organization_id, ) ) await session.execute(stmt) await session.commit() - async def get_observer_cruise_by_workflow_run_id( + async def get_task_v2_by_workflow_run_id( self, workflow_run_id: str, organization_id: str | None = None, ) -> ObserverTask | None: async with self.Session() as session: - if observer_cruise := ( + if task_v2 := ( await session.scalars( select(ObserverCruiseModel) .filter_by(organization_id=organization_id) .filter_by(workflow_run_id=workflow_run_id) ) ).first(): - return ObserverTask.model_validate(observer_cruise) + return ObserverTask.model_validate(task_v2) return None async def get_observer_thought( @@ -2189,14 +2183,14 @@ class AgentDB: async def get_observer_thoughts( self, - observer_cruise_id: str, + task_v2_id: str, observer_thought_types: list[ObserverThoughtType] | None = None, organization_id: str | None = None, ) -> list[ObserverThought]: async with self.Session() as session: query = ( select(ObserverThoughtModel) - .filter_by(observer_cruise_id=observer_cruise_id) + .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) .order_by(ObserverThoughtModel.created_at) ) @@ -2205,7 +2199,7 @@ class AgentDB: observer_thoughts = (await session.scalars(query)).all() return [ObserverThought.model_validate(thought) for thought in observer_thoughts] - async def create_observer_cruise( + async def create_task_v2( self, workflow_run_id: str | None = None, workflow_id: str | None = None, @@ -2219,7 +2213,7 @@ class AgentDB: webhook_callback_url: str | None = None, ) -> ObserverTask: async with self.Session() as session: - new_observer_cruise = ObserverCruiseModel( + new_task_v2 = ObserverCruiseModel( workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, @@ -2231,14 +2225,14 @@ class AgentDB: webhook_callback_url=webhook_callback_url, organization_id=organization_id, ) - session.add(new_observer_cruise) + session.add(new_task_v2) await session.commit() - await session.refresh(new_observer_cruise) - return ObserverTask.model_validate(new_observer_cruise) + await session.refresh(new_task_v2) + return ObserverTask.model_validate(new_task_v2) async def create_observer_thought( self, - observer_cruise_id: str, + task_v2_id: str, workflow_run_id: str | None = None, workflow_id: str | None = None, workflow_permanent_id: str | None = None, @@ -2257,7 +2251,7 @@ class AgentDB: ) -> ObserverThought: async with self.Session() as session: new_observer_thought = ObserverThoughtModel( - observer_cruise_id=observer_cruise_id, + observer_cruise_id=task_v2_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, @@ -2331,9 +2325,9 @@ class AgentDB: return ObserverThought.model_validate(observer_thought) raise NotFoundError(f"ObserverThought {observer_thought_id}") - async def update_observer_cruise( + async def update_task_v2( self, - observer_cruise_id: str, + task_v2_id: str, status: ObserverTaskStatus | None = None, workflow_run_id: str | None = None, workflow_id: str | None = None, @@ -2345,34 +2339,34 @@ class AgentDB: organization_id: str | None = None, ) -> ObserverTask: async with self.Session() as session: - observer_cruise = ( + task_v2 = ( await session.scalars( select(ObserverCruiseModel) - .filter_by(observer_cruise_id=observer_cruise_id) + .filter_by(observer_cruise_id=task_v2_id) .filter_by(organization_id=organization_id) ) ).first() - if observer_cruise: + if task_v2: if status: - observer_cruise.status = status + task_v2.status = status if workflow_run_id: - observer_cruise.workflow_run_id = workflow_run_id + task_v2.workflow_run_id = workflow_run_id if workflow_id: - observer_cruise.workflow_id = workflow_id + task_v2.workflow_id = workflow_id if workflow_permanent_id: - observer_cruise.workflow_permanent_id = workflow_permanent_id + task_v2.workflow_permanent_id = workflow_permanent_id if url: - observer_cruise.url = url + task_v2.url = url if prompt: - observer_cruise.prompt = prompt + task_v2.prompt = prompt if summary: - observer_cruise.summary = summary + task_v2.summary = summary if output: - observer_cruise.output = output + task_v2.output = output await session.commit() - await session.refresh(observer_cruise) - return ObserverTask.model_validate(observer_cruise) - raise NotFoundError(f"ObserverTask {observer_cruise_id} not found") + await session.refresh(task_v2) + return ObserverTask.model_validate(task_v2) + raise NotFoundError(f"TaskV2 {task_v2_id} not found") async def create_workflow_run_block( self, diff --git a/skyvern/forge/sdk/db/id.py b/skyvern/forge/sdk/db/id.py index 54864d45..42409a8d 100644 --- a/skyvern/forge/sdk/db/id.py +++ b/skyvern/forge/sdk/db/id.py @@ -37,7 +37,7 @@ BITWARDEN_SENSITIVE_INFORMATION_PARAMETER_PREFIX = "bsi" CREDENTIAL_PARAMETER_PREFIX = "cp" CREDENTIAL_PREFIX = "cred" ORGANIZATION_BITWARDEN_COLLECTION_PREFIX = "obc" -OBSERVER_CRUISE_ID = "oc" +TASK_V2_ID = "oc" OBSERVER_THOUGHT_ID = "ot" ORGANIZATION_AUTH_TOKEN_PREFIX = "oat" ORG_PREFIX = "o" @@ -156,9 +156,9 @@ def generate_action_id() -> str: return f"{ACTION_PREFIX}_{int_id}" -def generate_observer_cruise_id() -> str: +def generate_task_v2_id() -> str: int_id = generate_id() - return f"{OBSERVER_CRUISE_ID}_{int_id}" + return f"{TASK_V2_ID}_{int_id}" def generate_observer_thought_id() -> str: diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index f2b914be..d980b253 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -29,7 +29,6 @@ from skyvern.forge.sdk.db.id import ( generate_bitwarden_sensitive_information_parameter_id, generate_credential_id, generate_credential_parameter_id, - generate_observer_cruise_id, generate_observer_thought_id, generate_org_id, generate_organization_auth_token_id, @@ -40,6 +39,7 @@ from skyvern.forge.sdk.db.id import ( generate_task_generation_id, generate_task_id, generate_task_run_id, + generate_task_v2_id, generate_totp_code_id, generate_workflow_id, generate_workflow_parameter_id, @@ -569,7 +569,8 @@ class ObserverCruiseModel(Base): __tablename__ = "observer_cruises" __table_args__ = (Index("oc_org_wfr_index", "organization_id", "workflow_run_id"),) - observer_cruise_id = Column(String, primary_key=True, default=generate_observer_cruise_id) + # observer_cruise_id is the task_id for task v2 + observer_cruise_id = Column(String, primary_key=True, default=generate_task_v2_id) status = Column(String, nullable=False, default="created") organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=True) workflow_run_id = Column(String, ForeignKey("workflow_runs.workflow_run_id"), nullable=True) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index cc60d499..1fe0dbfd 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -46,12 +46,12 @@ class AsyncExecutor(abc.ABC): pass @abc.abstractmethod - async def execute_cruise( + async def execute_task_v2( self, request: Request | None, background_tasks: BackgroundTasks | None, organization_id: str, - observer_cruise_id: str, + task_v2_id: str, max_iterations_override: int | str | None, browser_session_id: str | None, **kwargs: dict, @@ -138,39 +138,37 @@ class BackgroundTaskExecutor(AsyncExecutor): browser_session_id=browser_session_id, ) - async def execute_cruise( + async def execute_task_v2( self, request: Request | None, background_tasks: BackgroundTasks | None, organization_id: str, - observer_cruise_id: str, + task_v2_id: str, max_iterations_override: int | str | None, browser_session_id: str | None, **kwargs: dict, ) -> None: LOG.info( "Executing cruise using background task executor", - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, ) organization = await app.DATABASE.get_organization(organization_id) if organization is None: raise OrganizationNotFound(organization_id) - observer_cruise = await app.DATABASE.get_observer_cruise( - observer_cruise_id=observer_cruise_id, organization_id=organization_id - ) - if not observer_cruise or not observer_cruise.workflow_run_id: + task_v2 = await app.DATABASE.get_task_v2(task_v2_id=task_v2_id, organization_id=organization_id) + if not task_v2 or not task_v2.workflow_run_id: raise ValueError("No observer cruise or no workflow run associated with observer cruise") # mark observer cruise as queued - await app.DATABASE.update_observer_cruise( - observer_cruise_id, + await app.DATABASE.update_task_v2( + task_v2_id=task_v2_id, status=ObserverTaskStatus.queued, organization_id=organization_id, ) await app.DATABASE.update_workflow_run( - workflow_run_id=observer_cruise.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, status=WorkflowRunStatus.queued, ) @@ -178,7 +176,7 @@ class BackgroundTaskExecutor(AsyncExecutor): background_tasks.add_task( task_v2_service.run_observer_task, organization=organization, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, max_iterations_override=max_iterations_override, browser_session_id=browser_session_id, ) diff --git a/skyvern/forge/sdk/forge_log.py b/skyvern/forge/sdk/forge_log.py index ac6e72a2..95ace7a8 100644 --- a/skyvern/forge/sdk/forge_log.py +++ b/skyvern/forge/sdk/forge_log.py @@ -32,8 +32,8 @@ def add_kv_pairs_to_msg(logger: logging.Logger, method_name: str, event_dict: Ev event_dict["workflow_id"] = context.workflow_id if context.workflow_run_id: event_dict["workflow_run_id"] = context.workflow_run_id - if context.observer_cruise_id: - event_dict["observer_cruise_id"] = context.observer_cruise_id + if context.task_v2_id: + event_dict["task_v2_id"] = context.task_v2_id # Add env to the log event_dict["env"] = settings.ENV diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index d9dce961..0bd50574 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -799,12 +799,12 @@ async def get_workflow_run( include_cost=True, ) return_dict = workflow_run_status_response.model_dump() - observer_cruise = await app.DATABASE.get_observer_cruise_by_workflow_run_id( + task_v2 = await app.DATABASE.get_task_v2_by_workflow_run_id( workflow_run_id=workflow_run_id, organization_id=current_org.organization_id, ) - if observer_cruise: - return_dict["observer_task"] = observer_cruise.model_dump(by_alias=True) + if task_v2: + return_dict["observer_task"] = task_v2.model_dump(by_alias=True) return return_dict @@ -1251,11 +1251,11 @@ async def observer_task( status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later." ) analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_task.url}) - await AsyncExecutorFactory.get_executor().execute_cruise( + await AsyncExecutorFactory.get_executor().execute_task_v2( request=request, background_tasks=background_tasks, organization_id=organization.organization_id, - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, max_iterations_override=x_max_iterations_override, browser_session_id=data.browser_session_id, ) @@ -1268,10 +1268,10 @@ async def get_observer_task( task_id: str, organization: Organization = Depends(org_auth_service.get_current_org), ) -> dict[str, Any]: - observer_task = await task_v2_service.get_observer_cruise(task_id, organization.organization_id) - if not observer_task: - raise HTTPException(status_code=404, detail=f"Observer task {task_id} not found") - return observer_task.model_dump(by_alias=True) + task_v2 = await task_v2_service.get_task_v2(task_id, organization.organization_id) + if not task_v2: + raise HTTPException(status_code=404, detail=f"Task v2 {task_id} not found") + return task_v2.model_dump(by_alias=True) @base_router.get( @@ -1374,7 +1374,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: """ # get observer task by workflow run id - observer_task_obj = await app.DATABASE.get_observer_cruise_by_workflow_run_id( + task_v2_obj = await app.DATABASE.get_task_v2_by_workflow_run_id( workflow_run_id=workflow_run_id, organization_id=organization_id, ) @@ -1397,7 +1397,7 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: "Block workflow run id is not set for task_v2 block", workflow_run_id=workflow_run_id, organization_id=organization_id, - observer_cruise_id=observer_task_obj.observer_cruise_id if observer_task_obj else None, + task_v2_id=task_v2_obj.observer_cruise_id if task_v2_obj else None, ) continue # in the future if we want to nested taskv2 shows up as a nested block, we should not flatten the timeline @@ -1407,9 +1407,9 @@ async def _flatten_workflow_run_timeline(organization_id: str, workflow_run_id: ) final_workflow_run_block_timeline.extend(workflow_blocks) - if observer_task_obj and observer_task_obj.observer_cruise_id: + if task_v2_obj and task_v2_obj.observer_cruise_id: observer_thought_timeline = await task_v2_service.get_observer_thought_timelines( - observer_cruise_id=observer_task_obj.observer_cruise_id, + task_v2_id=task_v2_obj.observer_cruise_id, organization_id=organization_id, ) final_workflow_run_block_timeline.extend(observer_thought_timeline) diff --git a/skyvern/forge/sdk/services/task_v2_service.py b/skyvern/forge/sdk/services/task_v2_service.py index cd534f31..2e00344f 100644 --- a/skyvern/forge/sdk/services/task_v2_service.py +++ b/skyvern/forge/sdk/services/task_v2_service.py @@ -8,7 +8,7 @@ import httpx import structlog from sqlalchemy.exc import OperationalError -from skyvern.exceptions import FailedToSendWebhook, ObserverCruiseNotFound, TaskTerminationError, UrlGenerationFailure +from skyvern.exceptions import FailedToSendWebhook, TaskTerminationError, TaskV2NotFound, UrlGenerationFailure from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.artifact.models import ArtifactType @@ -101,7 +101,7 @@ async def initialize_observer_task( parent_workflow_run_id: str | None = None, create_task_run: bool = False, ) -> ObserverTask: - observer_task = await app.DATABASE.create_observer_cruise( + task_v2 = await app.DATABASE.create_task_v2( prompt=user_prompt, organization_id=organization.organization_id, totp_verification_url=totp_verification_url, @@ -112,10 +112,10 @@ async def initialize_observer_task( # set observer cruise id in context context = skyvern_context.current() if context: - context.observer_cruise_id = observer_task.observer_cruise_id + context.task_v2_id = task_v2.observer_cruise_id observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, organization_id=organization.organization_id, observer_thought_type=ObserverThoughtType.metadata, observer_thought_scenario=ObserverThoughtScenario.generate_metadata, @@ -164,8 +164,8 @@ async def initialize_observer_task( LOG.error("Failed to setup cruise workflow run", exc_info=True) # fail the workflow run await mark_observer_task_as_failed( - observer_cruise_id=observer_task.observer_cruise_id, - workflow_run_id=observer_task.workflow_run_id, + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=task_v2.workflow_run_id, failure_reason="Skyvern failed to setup the workflow run", organization_id=organization.organization_id, ) @@ -186,8 +186,8 @@ async def initialize_observer_task( # update oserver cruise try: - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_task.observer_cruise_id, + task_v2 = await app.DATABASE.update_task_v2( + task_v2_id=task_v2.observer_cruise_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=new_workflow.workflow_id, workflow_permanent_id=new_workflow.workflow_permanent_id, @@ -198,7 +198,7 @@ async def initialize_observer_task( await app.DATABASE.create_task_run( task_run_type=TaskRunType.task_v2, organization_id=organization.organization_id, - run_id=observer_task.observer_cruise_id, + run_id=task_v2.observer_cruise_id, title=new_workflow.title, url=url, url_hash=generate_url_hash(url), @@ -207,41 +207,41 @@ async def initialize_observer_task( LOG.warning("Failed to update task 2.0", exc_info=True) # fail the workflow run await mark_observer_task_as_failed( - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, workflow_run_id=workflow_run.workflow_run_id, failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run", organization_id=organization.organization_id, ) raise - return observer_task + return task_v2 async def run_observer_task( organization: Organization, - observer_cruise_id: str, + task_v2_id: str, request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, ) -> ObserverTask: organization_id = organization.organization_id try: - observer_task = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + observer_task = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) except Exception: LOG.error( "Failed to get observer task", - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, organization_id=organization_id, exc_info=True, ) return await mark_observer_task_as_failed( - observer_cruise_id, + task_v2_id, organization_id=organization_id, failure_reason="Failed to get task v2", ) if not observer_task: - LOG.error("Task v2 not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) - raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id) + LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id) + raise TaskV2NotFound(task_v2_id=task_v2_id) workflow, workflow_run = None, None try: @@ -254,17 +254,17 @@ async def run_observer_task( ) except TaskTerminationError as e: observer_task = await mark_observer_task_as_terminated( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=observer_task.workflow_run_id, organization_id=organization_id, failure_reason=e.message, ) - LOG.info("Task v2 is terminated", observer_cruise_id=observer_cruise_id, failure_reason=e.message) + LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message) return observer_task except OperationalError: LOG.error("Database error when running observer cruise", exc_info=True) observer_task = await mark_observer_task_as_failed( - observer_cruise_id, + task_v2_id, workflow_run_id=observer_task.workflow_run_id, failure_reason="Database error when running task 2.0", organization_id=organization_id, @@ -273,7 +273,7 @@ async def run_observer_task( LOG.error("Failed to run observer cruise", exc_info=True) failure_reason = f"Failed to run task 2.0: {str(e)}" observer_task = await mark_observer_task_as_failed( - observer_cruise_id, + task_v2_id, workflow_run_id=observer_task.workflow_run_id, failure_reason=failure_reason, organization_id=organization_id, @@ -302,26 +302,26 @@ async def run_observer_task_helper( browser_session_id: str | None = None, ) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: organization_id = organization.organization_id - observer_cruise_id = observer_task.observer_cruise_id + task_v2_id = observer_task.observer_cruise_id if observer_task.status != ObserverTaskStatus.queued: LOG.error( - "Observer cruise is not queued. Duplicate observer cruise", - observer_cruise_id=observer_cruise_id, + "Task v2 is not queued. Duplicate task v2", + task_v2_id=task_v2_id, status=observer_task.status, organization_id=organization_id, ) return None, None, observer_task if not observer_task.url or not observer_task.prompt: LOG.error( - "Observer cruise url or prompt not found", - observer_cruise_id=observer_cruise_id, + "Task v2 url or prompt not found", + task_v2_id=task_v2_id, organization_id=organization_id, ) return None, None, observer_task if not observer_task.workflow_run_id: LOG.error( - "Workflow run id not found in observer cruise", - observer_cruise_id=observer_cruise_id, + "Workflow run id not found in task v2", + task_v2_id=task_v2_id, organization_id=organization_id, ) return None, None, observer_task @@ -364,12 +364,12 @@ async def run_observer_task_helper( workflow_id=workflow_id, workflow_run_id=workflow_run_id, request_id=request_id, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, ) ) - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.running + observer_task = await app.DATABASE.update_task_v2( + task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await _set_up_workflow_context(workflow_id, workflow_run_id, organization) @@ -385,7 +385,7 @@ async def run_observer_task_helper( # validate the task execution await app.AGENT_FUNCTION.validate_task_execution( organization_id=organization_id, - task_id=observer_cruise_id, + task_id=task_v2_id, task_version="v2", ) @@ -399,10 +399,10 @@ async def run_observer_task_helper( LOG.info( "Task v2 is canceled. Stopping task v2", workflow_run_id=workflow_run_id, - observer_task_id=observer_cruise_id, + task_v2_id=task_v2_id, ) await mark_observer_task_as_canceled( - observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, organization_id=organization_id, ) @@ -457,7 +457,7 @@ async def run_observer_task_helper( local_datetime=datetime.now(context.tz_info).isoformat(), ) observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, organization_id=organization_id, workflow_run_id=workflow_run.workflow_run_id, workflow_id=workflow.workflow_id, @@ -516,7 +516,7 @@ async def run_observer_task_helper( if not task_type: LOG.error("No task type found in observer response", observer_response=observer_response) await mark_observer_task_as_failed( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason="Skyvern failed to generate a task. Please try again later.", ) @@ -524,7 +524,7 @@ async def run_observer_task_helper( if task_type == "extract": block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( - observer_cruise=observer_task, + task_v2=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -550,7 +550,7 @@ async def run_observer_task_helper( elif task_type == "loop": try: block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( - observer_cruise=observer_task, + task_v2=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -568,7 +568,7 @@ async def run_observer_task_helper( except Exception: LOG.exception("Failed to generate loop task") await mark_observer_task_as_failed( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason="Failed to generate the loop.", ) @@ -576,7 +576,7 @@ async def run_observer_task_helper( else: LOG.info("Unsupported task type", task_type=task_type) await mark_observer_task_as_failed( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason=f"Unsupported task block type gets generated: {task_type}", ) @@ -594,7 +594,7 @@ async def run_observer_task_helper( extracted_data = _get_extracted_data_from_block_result( block_result, task_type, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, ) if extracted_data is not None: @@ -626,7 +626,7 @@ async def run_observer_task_helper( # execute the extraction task workflow_run = await handle_block_result( - observer_cruise_id, + task_v2_id, block, block_result, workflow, @@ -666,7 +666,7 @@ async def run_observer_task_helper( local_datetime=datetime.now(context.tz_info).isoformat(), ) observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, organization_id=organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, @@ -716,7 +716,7 @@ async def run_observer_task_helper( workflow_run_id=workflow_run_id, ) observer_task = await mark_observer_task_as_failed( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, # TODO: add a better failure reason with LLM failure_reason="Max iterations reached", @@ -727,7 +727,7 @@ async def run_observer_task_helper( async def handle_block_result( - observer_cruise_id: str, + task_v2_id: str, block: BlockTypeVar, block_result: BlockResult, workflow: Workflow, @@ -746,7 +746,7 @@ async def handle_block_result( block_label=block.label, ) await mark_observer_task_as_canceled( - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, organization_id=workflow_run.organization_id, ) @@ -815,7 +815,7 @@ async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str, organ async def _generate_loop_task( - observer_cruise: ObserverTask, + task_v2: ObserverTask, workflow_id: str, workflow_permanent_id: str, workflow_run_id: str, @@ -831,8 +831,8 @@ async def _generate_loop_task( ) data_extraction_thought = f"Going to generate a list of values to go through based on the plan: {plan}." observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise.observer_cruise_id, - organization_id=observer_cruise.organization_id, + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, @@ -870,7 +870,7 @@ async def _generate_loop_task( # execute the extraction block extraction_block_result = await extraction_block_for_loop.execute_safe( workflow_run_id=workflow_run_id, - organization_id=observer_cruise.organization_id, + organization_id=task_v2.organization_id, ) LOG.info("Extraction block result", extraction_block_result=extraction_block_result) if extraction_block_result.success is False: @@ -901,7 +901,7 @@ async def _generate_loop_task( # update the observer thought await app.DATABASE.update_observer_thought( observer_thought_id=observer_thought.observer_thought_id, - organization_id=observer_cruise.organization_id, + organization_id=task_v2.organization_id, output=output_value_obj, ) @@ -960,8 +960,8 @@ async def _generate_loop_task( loop_values=loop_values, ) observer_thought_task_in_loop = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise.observer_cruise_id, - organization_id=observer_cruise.organization_id, + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, workflow_run_id=workflow_run_id, workflow_id=workflow_id, workflow_permanent_id=workflow_permanent_id, @@ -981,7 +981,7 @@ async def _generate_loop_task( thought = task_in_loop_metadata_response.get("thoughts") await app.DATABASE.update_observer_thought( observer_thought_id=observer_thought_task_in_loop.observer_thought_id, - organization_id=observer_cruise.organization_id, + organization_id=task_v2.organization_id, thought=thought, output=task_in_loop_metadata_response, ) @@ -1046,7 +1046,7 @@ async def _generate_loop_task( async def _generate_extraction_task( - observer_cruise: ObserverTask, + task_v2: ObserverTask, workflow_id: str, workflow_permanent_id: str, workflow_run_id: str, @@ -1067,7 +1067,7 @@ async def _generate_extraction_task( ) generate_extraction_task_response = await app.LLM_API_HANDLER( generate_extraction_task_prompt, - observer_cruise=observer_cruise, + task_v2=task_v2, prompt_name="task_v2_generate_extraction_task", ) LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response) @@ -1174,11 +1174,11 @@ def _generate_random_string(length: int = 5) -> str: async def get_observer_thought_timelines( - observer_cruise_id: str, + task_v2_id: str, organization_id: str | None = None, ) -> list[WorkflowRunTimeline]: observer_thoughts = await app.DATABASE.get_observer_thoughts( - observer_cruise_id, + task_v2_id, organization_id=organization_id, observer_thought_types=[ ObserverThoughtType.plan, @@ -1196,18 +1196,18 @@ async def get_observer_thought_timelines( ] -async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverTask | None: - return await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) +async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> ObserverTask | None: + return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) async def mark_observer_task_as_failed( - observer_cruise_id: str, + task_v2_id: str, workflow_run_id: str | None = None, failure_reason: str | None = None, organization_id: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id, + observer_task = await app.DATABASE.update_task_v2( + task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.failed, ) @@ -1220,14 +1220,14 @@ async def mark_observer_task_as_failed( async def mark_observer_task_as_completed( - observer_cruise_id: str, + task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, summary: str | None = None, output: dict[str, Any] | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id, + observer_task = await app.DATABASE.update_task_v2( + task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.completed, summary=summary, @@ -1240,7 +1240,7 @@ async def mark_observer_task_as_completed( duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds() LOG.info( "Observer task duration metrics", - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, duration_seconds=duration_seconds, observer_task_status=ObserverTaskStatus.completed, @@ -1252,12 +1252,12 @@ async def mark_observer_task_as_completed( async def mark_observer_task_as_canceled( - observer_cruise_id: str, + task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id, + observer_task = await app.DATABASE.update_task_v2( + task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.canceled, ) @@ -1268,13 +1268,13 @@ async def mark_observer_task_as_canceled( async def mark_observer_task_as_terminated( - observer_cruise_id: str, + task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, failure_reason: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id, + observer_task = await app.DATABASE.update_task_v2( + task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.terminated, ) @@ -1287,7 +1287,7 @@ async def mark_observer_task_as_terminated( def _get_extracted_data_from_block_result( block_result: BlockResult, task_type: str, - observer_cruise_id: str | None = None, + task_v2_id: str | None = None, workflow_run_id: str | None = None, ) -> Any | None: """Extract data from block result based on task type. @@ -1295,7 +1295,7 @@ def _get_extracted_data_from_block_result( Args: block_result: The result from block execution task_type: Type of task ("extract" or "loop") - observer_cruise_id: Optional ID for logging + task_v2_id: Optional ID for logging workflow_run_id: Optional ID for logging Returns: @@ -1320,7 +1320,7 @@ def _get_extracted_data_from_block_result( LOG.warning( "Inner loop output is not a list", inner_loop_output=inner_loop_output, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, workflow_run_block_id=block_result.workflow_run_block_id, ) @@ -1330,7 +1330,7 @@ def _get_extracted_data_from_block_result( LOG.warning( "inner output is not a dict", inner_output=inner_output, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, workflow_run_block_id=block_result.workflow_run_block_id, ) @@ -1340,7 +1340,7 @@ def _get_extracted_data_from_block_result( LOG.warning( "output_value is not a dict", output_value=output_value, - observer_cruise_id=observer_cruise_id, + task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, workflow_run_block_id=block_result.workflow_run_block_id, ) @@ -1360,7 +1360,7 @@ async def _summarize_observer_task( screenshots: list[bytes] | None = None, ) -> ObserverTask: observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, organization_id=observer_task.organization_id, workflow_run_id=observer_task.workflow_run_id, workflow_id=observer_task.workflow_id, @@ -1393,7 +1393,7 @@ async def _summarize_observer_task( ) return await mark_observer_task_as_completed( - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, workflow_run_id=observer_task.workflow_run_id, organization_id=observer_task.organization_id, summary=thought, @@ -1414,7 +1414,7 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None: if not api_key: LOG.warning( "No valid API key found for the organization of observer cruise", - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, ) return # build the observer cruise response @@ -1422,7 +1422,7 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None: headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( "Sending observer cruise response to webhook callback url", - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, webhook_callback_url=observer_task.webhook_callback_url, payload=payload, headers=headers, @@ -1434,17 +1434,17 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None: if resp.status_code == 200: LOG.info( "Observer cruise webhook sent successfully", - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, resp_code=resp.status_code, resp_text=resp.text, ) else: LOG.info( "Observer cruise webhook failed", - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, resp=resp, resp_code=resp.status_code, resp_text=resp.text, ) except Exception as e: - raise FailedToSendWebhook(observer_cruise_id=observer_task.observer_cruise_id) from e + raise FailedToSendWebhook(task_v2_id=observer_task.observer_cruise_id) from e diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 356e6ce3..3611b32b 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -2157,7 +2157,7 @@ class TaskV2Block(Block): parent_workflow_run_id=workflow_run_id, proxy_location=workflow_run.proxy_location, ) - await app.DATABASE.update_observer_cruise( + await app.DATABASE.update_task_v2( observer_task.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id ) if observer_task.workflow_run_id: @@ -2173,7 +2173,7 @@ class TaskV2Block(Block): observer_task = await task_v2_service.run_observer_task( organization=organization, - observer_cruise_id=observer_task.observer_cruise_id, + task_v2_id=observer_task.observer_cruise_id, request_id=None, max_iterations_override=self.max_iterations, browser_session_id=browser_session_id,