get rid of observer_cruise in observer_service code (#1651)

This commit is contained in:
Shuchang Zheng
2025-01-27 12:53:23 +08:00
committed by GitHub
parent 4b2ea40947
commit 6215085a4e
2 changed files with 82 additions and 78 deletions

View File

@@ -571,3 +571,8 @@ class InteractWithDropdownContainer(SkyvernException):
class UrlGenerationFailure(SkyvernHTTPException):
def __init__(self) -> None:
super().__init__("Failed to generate the url for the prompt")
class ObserverCruiseNotFound(SkyvernHTTPException):
def __init__(self, observer_cruise_id: str) -> None:
super().__init__(f"Observer task {observer_cruise_id} not found")

View File

@@ -8,7 +8,7 @@ import httpx
import structlog
from sqlalchemy.exc import OperationalError
from skyvern.exceptions import FailedToSendWebhook, UrlGenerationFailure
from skyvern.exceptions import FailedToSendWebhook, ObserverCruiseNotFound, UrlGenerationFailure
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.artifact.models import ArtifactType
@@ -199,10 +199,10 @@ async def run_observer_cruise(
request_id: str | None = None,
max_iterations_override: str | int | None = None,
browser_session_id: str | None = None,
) -> None:
) -> ObserverTask:
organization_id = organization.organization_id
try:
observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
observer_task = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id)
except Exception:
LOG.error(
"Failed to get observer cruise",
@@ -210,40 +210,37 @@ async def run_observer_cruise(
organization_id=organization_id,
exc_info=True,
)
await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id)
return None
if not observer_cruise:
return await mark_observer_cruise_as_failed(observer_cruise_id, organization_id=organization_id)
if not observer_task:
LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id)
return None
raise ObserverCruiseNotFound(observer_cruise_id=observer_cruise_id)
workflow, workflow_run = None, None
try:
workflow, workflow_run = await run_observer_cruise_helper(
workflow, workflow_run, observer_task = await run_observer_cruise_helper(
organization=organization,
observer_cruise=observer_cruise,
observer_task=observer_task,
request_id=request_id,
max_iterations_override=max_iterations_override,
browser_session_id=browser_session_id,
)
except OperationalError:
LOG.error("Database error when running observer cruise", exc_info=True)
await mark_observer_cruise_as_failed(
observer_task = await mark_observer_cruise_as_failed(
observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
workflow_run_id=observer_task.workflow_run_id,
failure_reason="Database error when running task 2.0",
organization_id=organization_id,
)
return
except Exception as e:
LOG.error("Failed to run observer cruise", exc_info=True)
failure_reason = f"Failed to run task 2.0: {str(e)}"
await mark_observer_cruise_as_failed(
observer_task = await mark_observer_cruise_as_failed(
observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
workflow_run_id=observer_task.workflow_run_id,
failure_reason=failure_reason,
organization_id=organization_id,
)
return
finally:
if workflow and workflow_run:
await app.WORKFLOW_SERVICE.clean_up_workflow(
@@ -257,38 +254,40 @@ async def run_observer_cruise(
skyvern_context.reset()
return observer_task
async def run_observer_cruise_helper(
organization: Organization,
observer_cruise: ObserverTask,
observer_task: ObserverTask,
request_id: str | None = None,
max_iterations_override: str | int | None = None,
browser_session_id: str | None = None,
) -> tuple[Workflow, WorkflowRun] | tuple[None, None]:
) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]:
organization_id = organization.organization_id
observer_cruise_id = observer_cruise.observer_cruise_id
if observer_cruise.status != ObserverTaskStatus.queued:
observer_cruise_id = observer_task.observer_cruise_id
if observer_task.status != ObserverTaskStatus.queued:
LOG.error(
"Observer cruise is not queued. Duplicate observer cruise",
observer_cruise_id=observer_cruise_id,
status=observer_cruise.status,
status=observer_task.status,
organization_id=organization_id,
)
return None, None
if not observer_cruise.url or not observer_cruise.prompt:
return None, None, observer_task
if not observer_task.url or not observer_task.prompt:
LOG.error(
"Observer cruise url or prompt not found",
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
)
return None, None
if not observer_cruise.workflow_run_id:
return None, None, observer_task
if not observer_task.workflow_run_id:
LOG.error(
"Workflow run id not found in observer cruise",
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
)
return None, None
return None, None, observer_task
int_max_iterations_override = None
if max_iterations_override:
@@ -301,24 +300,24 @@ async def run_observer_cruise_helper(
max_iterations_override=max_iterations_override,
)
workflow_run_id = observer_cruise.workflow_run_id
workflow_run_id = observer_task.workflow_run_id
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id)
if not workflow_run:
LOG.error("Workflow run not found", workflow_run_id=workflow_run_id)
return None, None
return None, None, observer_task
else:
LOG.info("Workflow run found", workflow_run_id=workflow_run_id)
if workflow_run.status != WorkflowRunStatus.queued:
LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status)
return None, None
return None, None, observer_task
workflow_id = workflow_run.workflow_id
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id)
if not workflow:
LOG.error("Workflow not found", workflow_id=workflow_id)
return None, None
return None, None, observer_task
###################### run observer ######################
@@ -332,14 +331,14 @@ async def run_observer_cruise_helper(
)
)
await app.DATABASE.update_observer_cruise(
observer_task = await app.DATABASE.update_observer_cruise(
observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.running
)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
await _set_up_workflow_context(workflow_id, workflow_run_id)
url = str(observer_cruise.url)
user_prompt = observer_cruise.prompt
url = str(observer_task.url)
user_prompt = observer_task.prompt
task_history: list[dict] = []
yaml_blocks: list[BLOCK_YAML_TYPES] = []
yaml_parameters: list[PARAMETER_YAML_TYPES] = []
@@ -420,8 +419,8 @@ async def run_observer_cruise_helper(
iteration=i,
workflow_run_id=workflow_run_id,
)
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
observer_task = await _summarize_observer_cruise(
observer_task=observer_task,
task_history=task_history,
context=context,
screenshots=scraped_page.screenshots,
@@ -445,7 +444,7 @@ async def run_observer_cruise_helper(
task_history_record: dict[str, Any] = {}
if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
observer_cruise=observer_cruise,
observer_cruise=observer_task,
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
@@ -464,14 +463,14 @@ async def run_observer_cruise_helper(
workflow_run_id=workflow_run_id,
original_url=original_url,
navigation_goal=navigation_goal,
totp_verification_url=observer_cruise.totp_verification_url,
totp_identifier=observer_cruise.totp_identifier,
totp_verification_url=observer_task.totp_verification_url,
totp_identifier=observer_task.totp_identifier,
)
task_history_record = {"type": task_type, "task": plan}
elif task_type == "loop":
try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
observer_cruise=observer_cruise,
observer_cruise=observer_task,
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
@@ -528,7 +527,7 @@ async def run_observer_cruise_helper(
workflow_create_request = WorkflowCreateYAMLRequest(
title=workflow.title,
description=workflow.description,
proxy_location=observer_cruise.proxy_location or ProxyLocation.RESIDENTIAL,
proxy_location=observer_task.proxy_location or ProxyLocation.RESIDENTIAL,
workflow_definition=workflow_definition_yaml,
status=workflow.status,
)
@@ -611,8 +610,8 @@ async def run_observer_cruise_helper(
workflow_run_id=workflow_run_id,
completion_resp=completion_resp,
)
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
observer_task = await _summarize_observer_cruise(
observer_task=observer_task,
task_history=task_history,
context=context,
screenshots=completion_screenshots,
@@ -624,7 +623,7 @@ async def run_observer_cruise_helper(
max_iterations=max_iterations,
workflow_run_id=workflow_run_id,
)
await mark_observer_cruise_as_failed(
observer_task = await mark_observer_cruise_as_failed(
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
# TODO: add a better failure reason with LLM
@@ -632,7 +631,7 @@ async def run_observer_cruise_helper(
organization_id=organization_id,
)
return workflow, workflow_run
return workflow, workflow_run, observer_task
async def handle_block_result(
@@ -1093,17 +1092,18 @@ async def mark_observer_cruise_as_failed(
workflow_run_id: str | None = None,
failure_reason: str | None = None,
organization_id: str | None = None,
) -> None:
await app.DATABASE.update_observer_cruise(
observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.failed
) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise(
observer_cruise_id,
organization_id=organization_id,
status=ObserverTaskStatus.failed,
)
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
)
observer_cruise = await get_observer_cruise(observer_cruise_id, organization_id=organization_id)
if observer_cruise:
await send_observer_cruise_webhook(observer_cruise)
await send_observer_cruise_webhook(observer_task)
return observer_task
async def mark_observer_cruise_as_completed(
@@ -1112,8 +1112,8 @@ async def mark_observer_cruise_as_completed(
organization_id: str | None = None,
summary: str | None = None,
output: dict[str, Any] | None = None,
) -> None:
await app.DATABASE.update_observer_cruise(
) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise(
observer_cruise_id,
organization_id=organization_id,
status=ObserverTaskStatus.completed,
@@ -1123,9 +1123,8 @@ async def mark_observer_cruise_as_completed(
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
observer_cruise = await get_observer_cruise(observer_cruise_id, organization_id=organization_id)
if observer_cruise:
await send_observer_cruise_webhook(observer_cruise)
await send_observer_cruise_webhook(observer_task)
return observer_task
def _get_extracted_data_from_block_result(
@@ -1198,24 +1197,24 @@ def _get_extracted_data_from_block_result(
async def _summarize_observer_cruise(
observer_cruise: ObserverTask,
observer_task: ObserverTask,
task_history: list[dict],
context: SkyvernContext,
screenshots: list[bytes] | None = None,
) -> None:
) -> ObserverTask:
observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise.observer_cruise_id,
organization_id=observer_cruise.organization_id,
workflow_run_id=observer_cruise.workflow_run_id,
workflow_id=observer_cruise.workflow_id,
workflow_permanent_id=observer_cruise.workflow_permanent_id,
observer_cruise_id=observer_task.observer_cruise_id,
organization_id=observer_task.organization_id,
workflow_run_id=observer_task.workflow_run_id,
workflow_id=observer_task.workflow_id,
workflow_permanent_id=observer_task.workflow_permanent_id,
observer_thought_type=ObserverThoughtType.user_goal_check,
observer_thought_scenario=ObserverThoughtScenario.summarization,
)
# summarize the observer cruise and format the output
observer_summary_prompt = prompt_engine.load_prompt(
"observer_summary",
user_goal=observer_cruise.prompt,
user_goal=observer_task.prompt,
task_history=task_history,
local_datetime=datetime.now(context.tz_info).isoformat(),
)
@@ -1230,24 +1229,24 @@ async def _summarize_observer_cruise(
summarized_output = observer_summary_resp.get("output")
await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought.observer_thought_id,
organization_id=observer_cruise.organization_id,
organization_id=observer_task.organization_id,
thought=thought,
output=observer_summary_resp,
)
await mark_observer_cruise_as_completed(
observer_cruise_id=observer_cruise.observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
organization_id=observer_cruise.organization_id,
return await mark_observer_cruise_as_completed(
observer_cruise_id=observer_task.observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id,
organization_id=observer_task.organization_id,
summary=thought,
output=summarized_output,
)
async def send_observer_cruise_webhook(observer_cruise: ObserverTask) -> None:
if not observer_cruise.webhook_callback_url:
async def send_observer_cruise_webhook(observer_task: ObserverTask) -> None:
if not observer_task.webhook_callback_url:
return
organization_id = observer_cruise.organization_id
organization_id = observer_task.organization_id
if not organization_id:
return
api_key = await app.DATABASE.get_valid_org_auth_token(
@@ -1257,37 +1256,37 @@ async def send_observer_cruise_webhook(observer_cruise: ObserverTask) -> None:
if not api_key:
LOG.warning(
"No valid API key found for the organization of observer cruise",
observer_cruise_id=observer_cruise.observer_cruise_id,
observer_cruise_id=observer_task.observer_cruise_id,
)
return
# build the observer cruise response
payload = observer_cruise.model_dump_json(by_alias=True)
payload = observer_task.model_dump_json(by_alias=True)
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token)
LOG.info(
"Sending observer cruise response to webhook callback url",
observer_cruise_id=observer_cruise.observer_cruise_id,
webhook_callback_url=observer_cruise.webhook_callback_url,
observer_cruise_id=observer_task.observer_cruise_id,
webhook_callback_url=observer_task.webhook_callback_url,
payload=payload,
headers=headers,
)
try:
resp = await httpx.AsyncClient().post(
observer_cruise.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)
observer_task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)
)
if resp.status_code == 200:
LOG.info(
"Observer cruise webhook sent successfully",
observer_cruise_id=observer_cruise.observer_cruise_id,
observer_cruise_id=observer_task.observer_cruise_id,
resp_code=resp.status_code,
resp_text=resp.text,
)
else:
LOG.info(
"Observer cruise webhook failed",
observer_cruise_id=observer_cruise.observer_cruise_id,
observer_cruise_id=observer_task.observer_cruise_id,
resp=resp,
resp_code=resp.status_code,
resp_text=resp.text,
)
except Exception as e:
raise FailedToSendWebhook(observer_cruise_id=observer_cruise.observer_cruise_id) from e
raise FailedToSendWebhook(observer_cruise_id=observer_task.observer_cruise_id) from e