diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index c072c897..389e949f 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -571,3 +571,8 @@ class InteractWithDropdownContainer(SkyvernException): class UrlGenerationFailure(SkyvernHTTPException): def __init__(self) -> None: super().__init__("Failed to generate the url for the prompt") + + +class ObserverCruiseNotFound(SkyvernHTTPException): + def __init__(self, observer_cruise_id: str) -> None: + super().__init__(f"Observer task {observer_cruise_id} not found") diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index c29c9fb9..dc87db1d 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -8,7 +8,7 @@ import httpx import structlog from sqlalchemy.exc import OperationalError -from skyvern.exceptions import FailedToSendWebhook, UrlGenerationFailure +from skyvern.exceptions import FailedToSendWebhook, ObserverCruiseNotFound, UrlGenerationFailure from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.artifact.models import ArtifactType @@ -199,10 +199,10 @@ async def run_observer_cruise( request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, -) -> None: +) -> ObserverTask: organization_id = organization.organization_id try: - observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + observer_task = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) except Exception: LOG.error( "Failed to get observer cruise", @@ -210,40 +210,37 @@ async def run_observer_cruise( organization_id=organization_id, exc_info=True, ) - await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id) - return None - if not observer_cruise: + return await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id) + if not observer_task: LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) - return None + raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id) workflow, workflow_run = None, None try: - workflow, workflow_run = await run_observer_cruise_helper( + workflow, workflow_run, observer_task = await run_observer_cruise_helper( organization=organization, - observer_cruise=observer_cruise, + observer_task=observer_task, request_id=request_id, max_iterations_override=max_iterations_override, browser_session_id=browser_session_id, ) except OperationalError: LOG.error("Database error when running observer cruise", exc_info=True) - await mark_observer_cruise_as_failed( + observer_task = await mark_observer_cruise_as_failed( observer_cruise_id, - workflow_run_id=observer_cruise.workflow_run_id, + workflow_run_id=observer_task.workflow_run_id, failure_reason="Database error when running task 2.0", organization_id=organization_id, ) - return except Exception as e: LOG.error("Failed to run observer cruise", exc_info=True) failure_reason = f"Failed to run task 2.0: {str(e)}" - await mark_observer_cruise_as_failed( + observer_task = await mark_observer_cruise_as_failed( observer_cruise_id, - workflow_run_id=observer_cruise.workflow_run_id, + workflow_run_id=observer_task.workflow_run_id, failure_reason=failure_reason, organization_id=organization_id, ) - return finally: if workflow and workflow_run: await app.WORKFLOW_SERVICE.clean_up_workflow( @@ -257,38 +254,40 @@ async def run_observer_cruise( skyvern_context.reset() + return observer_task + async def run_observer_cruise_helper( organization: Organization, - observer_cruise: ObserverTask, + observer_task: ObserverTask, request_id: str | None = None, max_iterations_override: str | int | None = None, browser_session_id: str | None = None, -) -> tuple[Workflow, WorkflowRun] | tuple[None, None]: +) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: organization_id = organization.organization_id - observer_cruise_id = observer_cruise.observer_cruise_id - if observer_cruise.status != ObserverTaskStatus.queued: + observer_cruise_id = observer_task.observer_cruise_id + if observer_task.status != ObserverTaskStatus.queued: LOG.error( "Observer cruise is not queued. Duplicate observer cruise", observer_cruise_id=observer_cruise_id, - status=observer_cruise.status, + status=observer_task.status, organization_id=organization_id, ) - return None, None - if not observer_cruise.url or not observer_cruise.prompt: + return None, None, observer_task + if not observer_task.url or not observer_task.prompt: LOG.error( "Observer cruise url or prompt not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id, ) - return None, None - if not observer_cruise.workflow_run_id: + return None, None, observer_task + if not observer_task.workflow_run_id: LOG.error( "Workflow run id not found in observer cruise", observer_cruise_id=observer_cruise_id, organization_id=organization_id, ) - return None, None + return None, None, observer_task int_max_iterations_override = None if max_iterations_override: @@ -301,24 +300,24 @@ async def run_observer_cruise_helper( max_iterations_override=max_iterations_override, ) - workflow_run_id = observer_cruise.workflow_run_id + workflow_run_id = observer_task.workflow_run_id workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id) if not workflow_run: LOG.error("Workflow run not found", workflow_run_id=workflow_run_id) - return None, None + return None, None, observer_task else: LOG.info("Workflow run found", workflow_run_id=workflow_run_id) if workflow_run.status != WorkflowRunStatus.queued: LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status) - return None, None + return None, None, observer_task workflow_id = workflow_run.workflow_id workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id) if not workflow: LOG.error("Workflow not found", workflow_id=workflow_id) - return None, None + return None, None, observer_task ###################### run observer ###################### @@ -332,14 +331,14 @@ async def run_observer_cruise_helper( ) ) - await app.DATABASE.update_observer_cruise( + observer_task = await app.DATABASE.update_observer_cruise( observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.running ) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await _set_up_workflow_context(workflow_id, workflow_run_id) - url = str(observer_cruise.url) - user_prompt = observer_cruise.prompt + url = str(observer_task.url) + user_prompt = observer_task.prompt task_history: list[dict] = [] yaml_blocks: list[BLOCK_YAML_TYPES] = [] yaml_parameters: list[PARAMETER_YAML_TYPES] = [] @@ -420,8 +419,8 @@ async def run_observer_cruise_helper( iteration=i, workflow_run_id=workflow_run_id, ) - await _summarize_observer_cruise( - observer_cruise=observer_cruise, + observer_task = await _summarize_observer_cruise( + observer_task=observer_task, task_history=task_history, context=context, screenshots=scraped_page.screenshots, @@ -445,7 +444,7 @@ async def run_observer_cruise_helper( task_history_record: dict[str, Any] = {} if task_type == "extract": block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( - observer_cruise=observer_cruise, + observer_cruise=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -464,14 +463,14 @@ async def run_observer_cruise_helper( workflow_run_id=workflow_run_id, original_url=original_url, navigation_goal=navigation_goal, - totp_verification_url=observer_cruise.totp_verification_url, - totp_identifier=observer_cruise.totp_identifier, + totp_verification_url=observer_task.totp_verification_url, + totp_identifier=observer_task.totp_identifier, ) task_history_record = {"type": task_type, "task": plan} elif task_type == "loop": try: block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( - observer_cruise=observer_cruise, + observer_cruise=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, @@ -528,7 +527,7 @@ async def run_observer_cruise_helper( workflow_create_request = WorkflowCreateYAMLRequest( title=workflow.title, description=workflow.description, - proxy_location=observer_cruise.proxy_location or ProxyLocation.RESIDENTIAL, + proxy_location=observer_task.proxy_location or ProxyLocation.RESIDENTIAL, workflow_definition=workflow_definition_yaml, status=workflow.status, ) @@ -611,8 +610,8 @@ async def run_observer_cruise_helper( workflow_run_id=workflow_run_id, completion_resp=completion_resp, ) - await _summarize_observer_cruise( - observer_cruise=observer_cruise, + observer_task = await _summarize_observer_cruise( + observer_task=observer_task, task_history=task_history, context=context, screenshots=completion_screenshots, @@ -624,7 +623,7 @@ async def run_observer_cruise_helper( max_iterations=max_iterations, workflow_run_id=workflow_run_id, ) - await mark_observer_cruise_as_failed( + observer_task = await mark_observer_cruise_as_failed( observer_cruise_id=observer_cruise_id, workflow_run_id=workflow_run_id, # TODO: add a better failure reason with LLM @@ -632,7 +631,7 @@ async def run_observer_cruise_helper( organization_id=organization_id, ) - return workflow, workflow_run + return workflow, workflow_run, observer_task async def handle_block_result( @@ -1093,17 +1092,18 @@ async def mark_observer_cruise_as_failed( workflow_run_id: str | None = None, failure_reason: str | None = None, organization_id: str | None = None, -) -> None: - await app.DATABASE.update_observer_cruise( - observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.failed +) -> ObserverTask: + observer_task = await app.DATABASE.update_observer_cruise( + observer_cruise_id, + organization_id=organization_id, + status=ObserverTaskStatus.failed, ) if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed" ) - observer_cruise = await get_observer_cruise(observer_cruise_id, organization_id=organization_id) - if observer_cruise: - await send_observer_cruise_webhook(observer_cruise) + await send_observer_cruise_webhook(observer_task) + return observer_task async def mark_observer_cruise_as_completed( @@ -1112,8 +1112,8 @@ async def mark_observer_cruise_as_completed( organization_id: str | None = None, summary: str | None = None, output: dict[str, Any] | None = None, -) -> None: - await app.DATABASE.update_observer_cruise( +) -> ObserverTask: + observer_task = await app.DATABASE.update_observer_cruise( observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.completed, @@ -1123,9 +1123,8 @@ async def mark_observer_cruise_as_completed( if workflow_run_id: await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) - observer_cruise = await get_observer_cruise(observer_cruise_id, organization_id=organization_id) - if observer_cruise: - await send_observer_cruise_webhook(observer_cruise) + await send_observer_cruise_webhook(observer_task) + return observer_task def _get_extracted_data_from_block_result( @@ -1198,24 +1197,24 @@ def _get_extracted_data_from_block_result( async def _summarize_observer_cruise( - observer_cruise: ObserverTask, + observer_task: ObserverTask, task_history: list[dict], context: SkyvernContext, screenshots: list[bytes] | None = None, -) -> None: +) -> ObserverTask: observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise.observer_cruise_id, - organization_id=observer_cruise.organization_id, - workflow_run_id=observer_cruise.workflow_run_id, - workflow_id=observer_cruise.workflow_id, - workflow_permanent_id=observer_cruise.workflow_permanent_id, + observer_cruise_id=observer_task.observer_cruise_id, + organization_id=observer_task.organization_id, + workflow_run_id=observer_task.workflow_run_id, + workflow_id=observer_task.workflow_id, + workflow_permanent_id=observer_task.workflow_permanent_id, observer_thought_type=ObserverThoughtType.user_goal_check, observer_thought_scenario=ObserverThoughtScenario.summarization, ) # summarize the observer cruise and format the output observer_summary_prompt = prompt_engine.load_prompt( "observer_summary", - user_goal=observer_cruise.prompt, + user_goal=observer_task.prompt, task_history=task_history, local_datetime=datetime.now(context.tz_info).isoformat(), ) @@ -1230,24 +1229,24 @@ async def _summarize_observer_cruise( summarized_output = observer_summary_resp.get("output") await app.DATABASE.update_observer_thought( observer_thought_id=observer_thought.observer_thought_id, - organization_id=observer_cruise.organization_id, + organization_id=observer_task.organization_id, thought=thought, output=observer_summary_resp, ) - await mark_observer_cruise_as_completed( - observer_cruise_id=observer_cruise.observer_cruise_id, - workflow_run_id=observer_cruise.workflow_run_id, - organization_id=observer_cruise.organization_id, + return await mark_observer_cruise_as_completed( + observer_cruise_id=observer_task.observer_cruise_id, + workflow_run_id=observer_task.workflow_run_id, + organization_id=observer_task.organization_id, summary=thought, output=summarized_output, ) -async def send_observer_cruise_webhook(observer_cruise: ObserverTask) -> None: - if not observer_cruise.webhook_callback_url: +async def send_observer_cruise_webhook(observer_task: ObserverTask) -> None: + if not observer_task.webhook_callback_url: return - organization_id = observer_cruise.organization_id + organization_id = observer_task.organization_id if not organization_id: return api_key = await app.DATABASE.get_valid_org_auth_token( @@ -1257,37 +1256,37 @@ async def send_observer_cruise_webhook(observer_cruise: ObserverTask) -> None: if not api_key: LOG.warning( "No valid API key found for the organization of observer cruise", - observer_cruise_id=observer_cruise.observer_cruise_id, + observer_cruise_id=observer_task.observer_cruise_id, ) return # build the observer cruise response - payload = observer_cruise.model_dump_json(by_alias=True) + payload = observer_task.model_dump_json(by_alias=True) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) LOG.info( "Sending observer cruise response to webhook callback url", - observer_cruise_id=observer_cruise.observer_cruise_id, - webhook_callback_url=observer_cruise.webhook_callback_url, + observer_cruise_id=observer_task.observer_cruise_id, + webhook_callback_url=observer_task.webhook_callback_url, payload=payload, headers=headers, ) try: resp = await httpx.AsyncClient().post( - observer_cruise.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) + observer_task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) ) if resp.status_code == 200: LOG.info( "Observer cruise webhook sent successfully", - observer_cruise_id=observer_cruise.observer_cruise_id, + observer_cruise_id=observer_task.observer_cruise_id, resp_code=resp.status_code, resp_text=resp.text, ) else: LOG.info( "Observer cruise webhook failed", - observer_cruise_id=observer_cruise.observer_cruise_id, + observer_cruise_id=observer_task.observer_cruise_id, resp=resp, resp_code=resp.status_code, resp_text=resp.text, ) except Exception as e: - raise FailedToSendWebhook(observer_cruise_id=observer_cruise.observer_cruise_id) from e + raise FailedToSendWebhook(observer_cruise_id=observer_task.observer_cruise_id) from e