diff --git a/evaluation/core/__init__.py b/evaluation/core/__init__.py index 80c4b7a4..0f7874b9 100644 --- a/evaluation/core/__init__.py +++ b/evaluation/core/__init__.py @@ -260,7 +260,7 @@ class Evaluator: ) extracted_information: list | dict[str, Any] | str | None = None - if workflow_run_response.observer_task is None: + if workflow_run_response.task_v2 is None: assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, ( f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output" ) @@ -272,10 +272,10 @@ class Evaluator: # FIXME: improve this when the last block is loop block extracted_information = result else: - workflow_run_response.observer_task.summary - workflow_run_response.observer_task.output - summary = f"{('summary:' + workflow_run_response.observer_task.summary) if workflow_run_response.observer_task.summary else ''}" - output = f"{('output: ' + json.dumps(workflow_run_response.observer_task.output)) if workflow_run_response.observer_task.output else ''}" + workflow_run_response.task_v2.summary + workflow_run_response.task_v2.output + summary = f"{('summary:' + workflow_run_response.task_v2.summary) if workflow_run_response.task_v2.summary else ''}" + output = f"{('output: ' + json.dumps(workflow_run_response.task_v2.output)) if workflow_run_response.task_v2.output else ''}" extracted_information = "" if summary: extracted_information = summary diff --git a/evaluation/script/create_webvoyager_evaluation_result.py b/evaluation/script/create_webvoyager_evaluation_result.py index 76078ec7..506a52e0 100644 --- a/evaluation/script/create_webvoyager_evaluation_result.py +++ b/evaluation/script/create_webvoyager_evaluation_result.py @@ -51,8 +51,8 @@ def main( { "workflow_permanent_id": workflow_pid, "status": str(workflow_run_response.status), - "summary": workflow_run_response.observer_task.summary, - "output": workflow_run_response.observer_task.output, + "summary": workflow_run_response.task_v2.summary, + "output": workflow_run_response.task_v2.output, "assertion": workflow_run_response.status == WorkflowRunStatus.completed, "failure_reason": workflow_run_response.failure_reason or "", } diff --git a/evaluation/script/eval_webvoyager_task_v2.py b/evaluation/script/eval_webvoyager_task_v2.py index 00bbd7fb..f9f67e33 100644 --- a/evaluation/script/eval_webvoyager_task_v2.py +++ b/evaluation/script/eval_webvoyager_task_v2.py @@ -36,8 +36,8 @@ async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> d one_record.update( { "status": str(workflow_run_response.status), - "summary": workflow_run_response.observer_task.summary, - "output": workflow_run_response.observer_task.output, + "summary": workflow_run_response.task_v2.summary, + "output": workflow_run_response.task_v2.output, } ) if workflow_run_response.status != WorkflowRunStatus.completed: diff --git a/skyvern/agent/local.py b/skyvern/agent/local.py index ea4eb919..7ff6598d 100644 --- a/skyvern/agent/local.py +++ b/skyvern/agent/local.py @@ -67,7 +67,7 @@ class Agent: ) async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None: - # mark observer cruise as queued + # mark task v2 as queued await app.DATABASE.update_task_v2( task_v2_id=task_v2.observer_cruise_id, status=ObserverTaskStatus.queued, @@ -80,7 +80,7 @@ class Agent: status=WorkflowRunStatus.queued, ) - await task_v2_service.run_observer_task( + await task_v2_service.run_task_v2( organization=organization, task_v2_id=task_v2.observer_cruise_id, ) @@ -156,7 +156,7 @@ class Agent: async def observer_task_v_2(self, task_request: ObserverTaskRequest) -> ObserverTask: organization = await self._get_organization() - observer_task = await task_v2_service.initialize_observer_task( + task_v2 = await task_v2_service.initialize_task_v2( organization=organization, user_prompt=task_request.user_prompt, user_url=str(task_request.url) if task_request.url else None, @@ -167,11 +167,11 @@ class Agent: publish_workflow=task_request.publish_workflow, ) - if not observer_task.workflow_run_id: - raise Exception("Observer cruise missing workflow run id") + if not task_v2.workflow_run_id: + raise Exception("Task v2 missing workflow run id") - asyncio.create_task(self._run_task_v2(organization, observer_task)) - return observer_task + asyncio.create_task(self._run_task_v2(organization, task_v2)) + return task_v2 async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None: organization = await self._get_organization() @@ -180,12 +180,12 @@ class Agent: async def run_observer_task_v_2( self, task_request: ObserverTaskRequest, timeout_seconds: int = 600 ) -> ObserverTask: - observer_task = await self.observer_task_v_2(task_request) + task_v2 = await self.observer_task_v_2(task_request) async with asyncio.timeout(timeout_seconds): while True: - refreshed_observer_task = await self.get_observer_task_v_2(observer_task.observer_cruise_id) - assert refreshed_observer_task is not None - if refreshed_observer_task.status.is_final(): - return refreshed_observer_task + refreshed_task_v2 = await self.get_observer_task_v_2(task_v2.observer_cruise_id) + assert refreshed_task_v2 is not None + if refreshed_task_v2.status.is_final(): + return refreshed_task_v2 await asyncio.sleep(1) diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index 1fe0dbfd..bee051ff 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -174,7 +174,7 @@ class BackgroundTaskExecutor(AsyncExecutor): if background_tasks: background_tasks.add_task( - task_v2_service.run_observer_task, + task_v2_service.run_task_v2, organization=organization, task_v2_id=task_v2_id, max_iterations_override=max_iterations_override, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 0bd50574..00d3a789 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -804,7 +804,7 @@ async def get_workflow_run( organization_id=current_org.organization_id, ) if task_v2: - return_dict["observer_task"] = task_v2.model_dump(by_alias=True) + return_dict["task_v2"] = task_v2.model_dump(by_alias=True) return return_dict @@ -1223,7 +1223,7 @@ async def upload_file( @v2_router.post("/tasks") @v2_router.post("/tasks/", include_in_schema=False) -async def observer_task( +async def create_task_v2( request: Request, background_tasks: BackgroundTasks, data: ObserverTaskRequest, @@ -1234,7 +1234,7 @@ async def observer_task( LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) try: - observer_task = await task_v2_service.initialize_observer_task( + task_v2 = await task_v2_service.initialize_task_v2( organization=organization, user_prompt=data.user_prompt, user_url=str(data.url) if data.url else None, @@ -1250,21 +1250,21 @@ async def observer_task( raise HTTPException( status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later." ) - analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_task.url}) + analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": task_v2.url}) await AsyncExecutorFactory.get_executor().execute_task_v2( request=request, background_tasks=background_tasks, organization_id=organization.organization_id, - task_v2_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, max_iterations_override=x_max_iterations_override, browser_session_id=data.browser_session_id, ) - return observer_task.model_dump(by_alias=True) + return task_v2.model_dump(by_alias=True) @v2_router.get("/tasks/{task_id}") @v2_router.get("/tasks/{task_id}/", include_in_schema=False) -async def get_observer_task( +async def get_task_v2( task_id: str, organization: Organization = Depends(org_auth_service.get_current_org), ) -> dict[str, Any]: diff --git a/skyvern/forge/sdk/services/task_v2_service.py b/skyvern/forge/sdk/services/task_v2_service.py index 2e00344f..656b559c 100644 --- a/skyvern/forge/sdk/services/task_v2_service.py +++ b/skyvern/forge/sdk/services/task_v2_service.py @@ -89,7 +89,7 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict: } -async def initialize_observer_task( +async def initialize_task_v2( organization: Organization, user_prompt: str, user_url: str | None = None, @@ -163,7 +163,7 @@ async def initialize_observer_task( except Exception: LOG.error("Failed to setup cruise workflow run", exc_info=True) # fail the workflow run - await mark_observer_task_as_failed( + await mark_task_v2_as_failed( task_v2_id=task_v2.observer_cruise_id, workflow_run_id=task_v2.workflow_run_id, failure_reason="Skyvern failed to setup the workflow run", @@ -206,7 +206,7 @@ async def initialize_observer_task( except Exception: LOG.warning("Failed to update task 2.0", exc_info=True) # fail the workflow run - await mark_observer_task_as_failed( + await mark_task_v2_as_failed( task_v2_id=task_v2.observer_cruise_id, workflow_run_id=workflow_run.workflow_run_id, failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run", @@ -217,7 +217,7 @@ async def initialize_observer_task( return task_v2 -async def run_observer_task( +async def run_task_v2( organization: Organization, task_v2_id: str, request_id: str | None = None, @@ -226,7 +226,7 @@ async def run_observer_task( ) -> ObserverTask: organization_id = organization.organization_id try: - observer_task = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) + task_v2 = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) except Exception: LOG.error( "Failed to get observer task", @@ -234,47 +234,47 @@ async def run_observer_task( organization_id=organization_id, exc_info=True, ) - return await mark_observer_task_as_failed( + return await mark_task_v2_as_failed( task_v2_id, organization_id=organization_id, failure_reason="Failed to get task v2", ) - if not observer_task: + if not task_v2: LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id) raise TaskV2NotFound(task_v2_id=task_v2_id) workflow, workflow_run = None, None try: - workflow, workflow_run, observer_task = await run_observer_task_helper( + workflow, workflow_run, task_v2 = await run_task_v2_helper( organization=organization, - observer_task=observer_task, + task_v2=task_v2, request_id=request_id, max_iterations_override=max_iterations_override, browser_session_id=browser_session_id, ) except TaskTerminationError as e: - observer_task = await mark_observer_task_as_terminated( + task_v2 = await mark_task_v2_as_terminated( task_v2_id=task_v2_id, - workflow_run_id=observer_task.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, organization_id=organization_id, failure_reason=e.message, ) LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message) - return observer_task + return task_v2 except OperationalError: LOG.error("Database error when running observer cruise", exc_info=True) - observer_task = await mark_observer_task_as_failed( + task_v2 = await mark_task_v2_as_failed( task_v2_id, - workflow_run_id=observer_task.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, failure_reason="Database error when running task 2.0", organization_id=organization_id, ) except Exception as e: LOG.error("Failed to run observer cruise", exc_info=True) failure_reason = f"Failed to run task 2.0: {str(e)}" - observer_task = await mark_observer_task_as_failed( + task_v2 = await mark_task_v2_as_failed( task_v2_id, - workflow_run_id=observer_task.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, failure_reason=failure_reason, organization_id=organization_id, ) @@ -291,40 +291,40 @@ async def run_observer_task( skyvern_context.reset() - return observer_task + return task_v2 -async def run_observer_task_helper( +async def run_task_v2_helper( organization: Organization, - observer_task: ObserverTask, + task_v2: ObserverTask, request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, ) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: organization_id = organization.organization_id - task_v2_id = observer_task.observer_cruise_id - if observer_task.status != ObserverTaskStatus.queued: + task_v2_id = task_v2.observer_cruise_id + if task_v2.status != ObserverTaskStatus.queued: LOG.error( "Task v2 is not queued. Duplicate task v2", task_v2_id=task_v2_id, - status=observer_task.status, + status=task_v2.status, organization_id=organization_id, ) - return None, None, observer_task - if not observer_task.url or not observer_task.prompt: + return None, None, task_v2 + if not task_v2.url or not task_v2.prompt: LOG.error( "Task v2 url or prompt not found", task_v2_id=task_v2_id, organization_id=organization_id, ) - return None, None, observer_task - if not observer_task.workflow_run_id: + return None, None, task_v2 + if not task_v2.workflow_run_id: LOG.error( "Workflow run id not found in task v2", task_v2_id=task_v2_id, organization_id=organization_id, ) - return None, None, observer_task + return None, None, task_v2 int_max_iterations_override = None if max_iterations_override: @@ -337,24 +337,24 @@ async def run_observer_task_helper( max_iterations_override=max_iterations_override, ) - workflow_run_id = observer_task.workflow_run_id + workflow_run_id = task_v2.workflow_run_id workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id) if not workflow_run: LOG.error("Workflow run not found", workflow_run_id=workflow_run_id) - return None, None, observer_task + return None, None, task_v2 else: LOG.info("Workflow run found", workflow_run_id=workflow_run_id) if workflow_run.status != WorkflowRunStatus.queued: LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status) - return None, None, observer_task + return None, None, task_v2 workflow_id = workflow_run.workflow_id workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id) if not workflow: LOG.error("Workflow not found", workflow_id=workflow_id) - return None, None, observer_task + return None, None, task_v2 ###################### run observer ###################### @@ -368,14 +368,14 @@ async def run_observer_task_helper( ) ) - observer_task = await app.DATABASE.update_task_v2( + task_v2 = await app.DATABASE.update_task_v2( task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await _set_up_workflow_context(workflow_id, workflow_run_id, organization) - url = str(observer_task.url) - user_prompt = observer_task.prompt + url = str(task_v2.url) + user_prompt = task_v2.prompt task_history: list[dict] = [] yaml_blocks: list[BLOCK_YAML_TYPES] = [] yaml_parameters: list[PARAMETER_YAML_TYPES] = [] @@ -401,12 +401,12 @@ async def run_observer_task_helper( workflow_run_id=workflow_run_id, task_v2_id=task_v2_id, ) - await mark_observer_task_as_canceled( + await mark_task_v2_as_canceled( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, organization_id=organization_id, ) - return workflow, workflow_run, observer_task + return workflow, workflow_run, task_v2 LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) task_type = "" @@ -500,8 +500,8 @@ async def run_observer_task_helper( iteration=i, workflow_run_id=workflow_run_id, ) - observer_task = await _summarize_observer_task( - observer_task=observer_task, + task_v2 = await _summarize_task_v2( + task_v2=task_v2, task_history=task_history, context=context, screenshots=scraped_page.screenshots, @@ -515,7 +515,7 @@ async def run_observer_task_helper( # parse observer repsonse and run the next task if not task_type: LOG.error("No task type found in observer response", observer_response=observer_response) - await mark_observer_task_as_failed( + await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason="Skyvern failed to generate a task. Please try again later.", @@ -524,7 +524,7 @@ async def run_observer_task_helper( if task_type == "extract": block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( - task_v2=observer_task, + task_v2=task_v2, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -543,14 +543,14 @@ async def run_observer_task_helper( workflow_run_id=workflow_run_id, original_url=original_url, navigation_goal=navigation_goal, - totp_verification_url=observer_task.totp_verification_url, - totp_identifier=observer_task.totp_identifier, + totp_verification_url=task_v2.totp_verification_url, + totp_identifier=task_v2.totp_identifier, ) task_history_record = {"type": task_type, "task": plan} elif task_type == "loop": try: block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( - task_v2=observer_task, + task_v2=task_v2, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -567,7 +567,7 @@ async def run_observer_task_helper( } except Exception: LOG.exception("Failed to generate loop task") - await mark_observer_task_as_failed( + await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason="Failed to generate the loop.", @@ -575,7 +575,7 @@ async def run_observer_task_helper( break else: LOG.info("Unsupported task type", task_type=task_type) - await mark_observer_task_as_failed( + await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, failure_reason=f"Unsupported task block type gets generated: {task_type}", @@ -612,7 +612,7 @@ async def run_observer_task_helper( workflow_create_request = WorkflowCreateYAMLRequest( title=workflow.title, description=workflow.description, - proxy_location=observer_task.proxy_location or ProxyLocation.RESIDENTIAL, + proxy_location=task_v2.proxy_location or ProxyLocation.RESIDENTIAL, workflow_definition=workflow_definition_yaml, status=workflow.status, ) @@ -702,8 +702,8 @@ async def run_observer_task_helper( workflow_run_id=workflow_run_id, completion_resp=completion_resp, ) - observer_task = await _summarize_observer_task( - observer_task=observer_task, + task_v2 = await _summarize_task_v2( + task_v2=task_v2, task_history=task_history, context=context, screenshots=completion_screenshots, @@ -715,7 +715,7 @@ async def run_observer_task_helper( max_iterations=max_iterations, workflow_run_id=workflow_run_id, ) - observer_task = await mark_observer_task_as_failed( + task_v2 = await mark_task_v2_as_failed( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, # TODO: add a better failure reason with LLM @@ -723,7 +723,7 @@ async def run_observer_task_helper( organization_id=organization_id, ) - return workflow, workflow_run, observer_task + return workflow, workflow_run, task_v2 async def handle_block_result( @@ -745,7 +745,7 @@ async def handle_block_result( block_type_var=block.block_type, block_label=block.label, ) - await mark_observer_task_as_canceled( + await mark_task_v2_as_canceled( task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, organization_id=workflow_run.organization_id, @@ -1200,13 +1200,13 @@ async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> Ob return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) -async def mark_observer_task_as_failed( +async def mark_task_v2_as_failed( task_v2_id: str, workflow_run_id: str | None = None, failure_reason: str | None = None, organization_id: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_task_v2( + task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.failed, @@ -1215,18 +1215,18 @@ async def mark_observer_task_as_failed( await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed" ) - await send_observer_task_webhook(observer_task) - return observer_task + await send_task_v2_webhook(task_v2) + return task_v2 -async def mark_observer_task_as_completed( +async def mark_task_v2_as_completed( task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, summary: str | None = None, output: dict[str, Any] | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_task_v2( + task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.completed, @@ -1237,51 +1237,51 @@ async def mark_observer_task_as_completed( await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) # Track observer cruise duration when completed - duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds() + duration_seconds = (datetime.now(UTC) - task_v2.created_at.replace(tzinfo=UTC)).total_seconds() LOG.info( "Observer task duration metrics", task_v2_id=task_v2_id, workflow_run_id=workflow_run_id, duration_seconds=duration_seconds, - observer_task_status=ObserverTaskStatus.completed, + task_v2_status=ObserverTaskStatus.completed, organization_id=organization_id, ) - await send_observer_task_webhook(observer_task) - return observer_task + await send_task_v2_webhook(task_v2) + return task_v2 -async def mark_observer_task_as_canceled( +async def mark_task_v2_as_canceled( task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_task_v2( + task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.canceled, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id) - await send_observer_task_webhook(observer_task) - return observer_task + await send_task_v2_webhook(task_v2) + return task_v2 -async def mark_observer_task_as_terminated( +async def mark_task_v2_as_terminated( task_v2_id: str, workflow_run_id: str | None = None, organization_id: str | None = None, failure_reason: str | None = None, ) -> ObserverTask: - observer_task = await app.DATABASE.update_task_v2( + task_v2 = await app.DATABASE.update_task_v2( task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.terminated, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason) - await send_observer_task_webhook(observer_task) - return observer_task + await send_task_v2_webhook(task_v2) + return task_v2 def _get_extracted_data_from_block_result( @@ -1353,25 +1353,25 @@ def _get_extracted_data_from_block_result( return None -async def _summarize_observer_task( - observer_task: ObserverTask, +async def _summarize_task_v2( + task_v2: ObserverTask, task_history: list[dict], context: SkyvernContext, screenshots: list[bytes] | None = None, ) -> ObserverTask: observer_thought = await app.DATABASE.create_observer_thought( - task_v2_id=observer_task.observer_cruise_id, - organization_id=observer_task.organization_id, - workflow_run_id=observer_task.workflow_run_id, - workflow_id=observer_task.workflow_id, - workflow_permanent_id=observer_task.workflow_permanent_id, + task_v2_id=task_v2.observer_cruise_id, + organization_id=task_v2.organization_id, + workflow_run_id=task_v2.workflow_run_id, + workflow_id=task_v2.workflow_id, + workflow_permanent_id=task_v2.workflow_permanent_id, observer_thought_type=ObserverThoughtType.user_goal_check, observer_thought_scenario=ObserverThoughtScenario.summarization, ) # summarize the observer cruise and format the output task_v2_summary_prompt = prompt_engine.load_prompt( "task_v2_summary", - user_goal=observer_task.prompt, + user_goal=task_v2.prompt, task_history=task_history, local_datetime=datetime.now(context.tz_info).isoformat(), ) @@ -1387,24 +1387,24 @@ async def _summarize_observer_task( summarized_output = task_v2_summary_resp.get("output") await app.DATABASE.update_observer_thought( observer_thought_id=observer_thought.observer_thought_id, - organization_id=observer_task.organization_id, + organization_id=task_v2.organization_id, thought=thought, output=task_v2_summary_resp, ) - return await mark_observer_task_as_completed( - task_v2_id=observer_task.observer_cruise_id, - workflow_run_id=observer_task.workflow_run_id, - organization_id=observer_task.organization_id, + return await mark_task_v2_as_completed( + task_v2_id=task_v2.observer_cruise_id, + workflow_run_id=task_v2.workflow_run_id, + organization_id=task_v2.organization_id, summary=thought, output=summarized_output, ) -async def send_observer_task_webhook(observer_task: ObserverTask) -> None: - if not observer_task.webhook_callback_url: +async def send_task_v2_webhook(task_v2: ObserverTask) -> None: + if not task_v2.webhook_callback_url: return - organization_id = observer_task.organization_id + organization_id = task_v2.organization_id if not organization_id: return api_key = await app.DATABASE.get_valid_org_auth_token( @@ -1414,37 +1414,37 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None: if not api_key: LOG.warning( "No valid API key found for the organization of observer cruise", - task_v2_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, ) return # build the observer cruise response - payload = observer_task.model_dump_json(by_alias=True) + payload = task_v2.model_dump_json(by_alias=True) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( "Sending observer cruise response to webhook callback url", - task_v2_id=observer_task.observer_cruise_id, - webhook_callback_url=observer_task.webhook_callback_url, + task_v2_id=task_v2.observer_cruise_id, + webhook_callback_url=task_v2.webhook_callback_url, payload=payload, headers=headers, ) try: resp = await httpx.AsyncClient().post( - observer_task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) ) if resp.status_code == 200: LOG.info( "Observer cruise webhook sent successfully", - task_v2_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, resp_code=resp.status_code, resp_text=resp.text, ) else: LOG.info( "Observer cruise webhook failed", - task_v2_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, resp=resp, resp_code=resp.status_code, resp_text=resp.text, ) except Exception as e: - raise FailedToSendWebhook(task_v2_id=observer_task.observer_cruise_id) from e + raise FailedToSendWebhook(task_v2_id=task_v2.observer_cruise_id) from e diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 3611b32b..e49a2496 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -2150,7 +2150,7 @@ class TaskV2Block(Block): workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id, organization_id) if not workflow_run: raise ValueError(f"WorkflowRun not found {workflow_run_id} when running TaskV2Block") - observer_task = await task_v2_service.initialize_observer_task( + task_v2 = await task_v2_service.initialize_task_v2( organization, user_prompt=self.prompt, user_url=self.url, @@ -2158,29 +2158,29 @@ class TaskV2Block(Block): proxy_location=workflow_run.proxy_location, ) await app.DATABASE.update_task_v2( - observer_task.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id + task_v2.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id ) - if observer_task.workflow_run_id: + if task_v2.workflow_run_id: await app.DATABASE.update_workflow_run( - workflow_run_id=observer_task.workflow_run_id, + workflow_run_id=task_v2.workflow_run_id, status=WorkflowRunStatus.queued, ) await app.DATABASE.update_workflow_run_block( workflow_run_block_id=workflow_run_block_id, organization_id=organization_id, - block_workflow_run_id=observer_task.workflow_run_id, + block_workflow_run_id=task_v2.workflow_run_id, ) - observer_task = await task_v2_service.run_observer_task( + task_v2 = await task_v2_service.run_task_v2( organization=organization, - task_v2_id=observer_task.observer_cruise_id, + task_v2_id=task_v2.observer_cruise_id, request_id=None, max_iterations_override=self.max_iterations, browser_session_id=browser_session_id, ) result_dict = None - if observer_task: - result_dict = observer_task.output + if task_v2: + result_dict = task_v2.output return await self.build_block_result( success=True, diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index daaae59c..ec4126b8 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -147,5 +147,5 @@ class WorkflowRunStatusResponse(BaseModel): outputs: dict[str, Any] | None = None total_steps: int | None = None total_cost: float | None = None - observer_task: ObserverTask | None = None + task_v2: ObserverTask | None = None workflow_title: str | None = None