task v2 refactor part 8: observer_task -> task_v2 in backend code (#1820)

This commit is contained in:
Shuchang Zheng
2025-02-23 22:17:28 -08:00
committed by GitHub
parent 148693aa25
commit b1de14e2fe
9 changed files with 132 additions and 132 deletions

View File

@@ -260,7 +260,7 @@ class Evaluator:
) )
extracted_information: list | dict[str, Any] | str | None = None extracted_information: list | dict[str, Any] | str | None = None
if workflow_run_response.observer_task is None: if workflow_run_response.task_v2 is None:
assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, ( assert workflow_run_response.outputs and len(workflow_run_response.outputs) > 0, (
f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output" f"Expected {workflow_pid + '/' + workflow_run_id} with output, but got empty output"
) )
@@ -272,10 +272,10 @@ class Evaluator:
# FIXME: improve this when the last block is loop block # FIXME: improve this when the last block is loop block
extracted_information = result extracted_information = result
else: else:
workflow_run_response.observer_task.summary workflow_run_response.task_v2.summary
workflow_run_response.observer_task.output workflow_run_response.task_v2.output
summary = f"{('summary:' + workflow_run_response.observer_task.summary) if workflow_run_response.observer_task.summary else ''}" summary = f"{('summary:' + workflow_run_response.task_v2.summary) if workflow_run_response.task_v2.summary else ''}"
output = f"{('output: ' + json.dumps(workflow_run_response.observer_task.output)) if workflow_run_response.observer_task.output else ''}" output = f"{('output: ' + json.dumps(workflow_run_response.task_v2.output)) if workflow_run_response.task_v2.output else ''}"
extracted_information = "" extracted_information = ""
if summary: if summary:
extracted_information = summary extracted_information = summary

View File

@@ -51,8 +51,8 @@ def main(
{ {
"workflow_permanent_id": workflow_pid, "workflow_permanent_id": workflow_pid,
"status": str(workflow_run_response.status), "status": str(workflow_run_response.status),
"summary": workflow_run_response.observer_task.summary, "summary": workflow_run_response.task_v2.summary,
"output": workflow_run_response.observer_task.output, "output": workflow_run_response.task_v2.output,
"assertion": workflow_run_response.status == WorkflowRunStatus.completed, "assertion": workflow_run_response.status == WorkflowRunStatus.completed,
"failure_reason": workflow_run_response.failure_reason or "", "failure_reason": workflow_run_response.failure_reason or "",
} }

View File

@@ -36,8 +36,8 @@ async def process_record(client: SkyvernClient, one_record: dict[str, Any]) -> d
one_record.update( one_record.update(
{ {
"status": str(workflow_run_response.status), "status": str(workflow_run_response.status),
"summary": workflow_run_response.observer_task.summary, "summary": workflow_run_response.task_v2.summary,
"output": workflow_run_response.observer_task.output, "output": workflow_run_response.task_v2.output,
} }
) )
if workflow_run_response.status != WorkflowRunStatus.completed: if workflow_run_response.status != WorkflowRunStatus.completed:

View File

@@ -67,7 +67,7 @@ class Agent:
) )
async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None: async def _run_task_v2(self, organization: Organization, task_v2: ObserverTask) -> None:
# mark observer cruise as queued # mark task v2 as queued
await app.DATABASE.update_task_v2( await app.DATABASE.update_task_v2(
task_v2_id=task_v2.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
status=ObserverTaskStatus.queued, status=ObserverTaskStatus.queued,
@@ -80,7 +80,7 @@ class Agent:
status=WorkflowRunStatus.queued, status=WorkflowRunStatus.queued,
) )
await task_v2_service.run_observer_task( await task_v2_service.run_task_v2(
organization=organization, organization=organization,
task_v2_id=task_v2.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
) )
@@ -156,7 +156,7 @@ class Agent:
async def observer_task_v_2(self, task_request: ObserverTaskRequest) -> ObserverTask: async def observer_task_v_2(self, task_request: ObserverTaskRequest) -> ObserverTask:
organization = await self._get_organization() organization = await self._get_organization()
observer_task = await task_v2_service.initialize_observer_task( task_v2 = await task_v2_service.initialize_task_v2(
organization=organization, organization=organization,
user_prompt=task_request.user_prompt, user_prompt=task_request.user_prompt,
user_url=str(task_request.url) if task_request.url else None, user_url=str(task_request.url) if task_request.url else None,
@@ -167,11 +167,11 @@ class Agent:
publish_workflow=task_request.publish_workflow, publish_workflow=task_request.publish_workflow,
) )
if not observer_task.workflow_run_id: if not task_v2.workflow_run_id:
raise Exception("Observer cruise missing workflow run id") raise Exception("Task v2 missing workflow run id")
asyncio.create_task(self._run_task_v2(organization, observer_task)) asyncio.create_task(self._run_task_v2(organization, task_v2))
return observer_task return task_v2
async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None: async def get_observer_task_v_2(self, task_id: str) -> ObserverTask | None:
organization = await self._get_organization() organization = await self._get_organization()
@@ -180,12 +180,12 @@ class Agent:
async def run_observer_task_v_2( async def run_observer_task_v_2(
self, task_request: ObserverTaskRequest, timeout_seconds: int = 600 self, task_request: ObserverTaskRequest, timeout_seconds: int = 600
) -> ObserverTask: ) -> ObserverTask:
observer_task = await self.observer_task_v_2(task_request) task_v2 = await self.observer_task_v_2(task_request)
async with asyncio.timeout(timeout_seconds): async with asyncio.timeout(timeout_seconds):
while True: while True:
refreshed_observer_task = await self.get_observer_task_v_2(observer_task.observer_cruise_id) refreshed_task_v2 = await self.get_observer_task_v_2(task_v2.observer_cruise_id)
assert refreshed_observer_task is not None assert refreshed_task_v2 is not None
if refreshed_observer_task.status.is_final(): if refreshed_task_v2.status.is_final():
return refreshed_observer_task return refreshed_task_v2
await asyncio.sleep(1) await asyncio.sleep(1)

View File

@@ -174,7 +174,7 @@ class BackgroundTaskExecutor(AsyncExecutor):
if background_tasks: if background_tasks:
background_tasks.add_task( background_tasks.add_task(
task_v2_service.run_observer_task, task_v2_service.run_task_v2,
organization=organization, organization=organization,
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
max_iterations_override=max_iterations_override, max_iterations_override=max_iterations_override,

View File

@@ -804,7 +804,7 @@ async def get_workflow_run(
organization_id=current_org.organization_id, organization_id=current_org.organization_id,
) )
if task_v2: if task_v2:
return_dict["observer_task"] = task_v2.model_dump(by_alias=True) return_dict["task_v2"] = task_v2.model_dump(by_alias=True)
return return_dict return return_dict
@@ -1223,7 +1223,7 @@ async def upload_file(
@v2_router.post("/tasks") @v2_router.post("/tasks")
@v2_router.post("/tasks/", include_in_schema=False) @v2_router.post("/tasks/", include_in_schema=False)
async def observer_task( async def create_task_v2(
request: Request, request: Request,
background_tasks: BackgroundTasks, background_tasks: BackgroundTasks,
data: ObserverTaskRequest, data: ObserverTaskRequest,
@@ -1234,7 +1234,7 @@ async def observer_task(
LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override)
try: try:
observer_task = await task_v2_service.initialize_observer_task( task_v2 = await task_v2_service.initialize_task_v2(
organization=organization, organization=organization,
user_prompt=data.user_prompt, user_prompt=data.user_prompt,
user_url=str(data.url) if data.url else None, user_url=str(data.url) if data.url else None,
@@ -1250,21 +1250,21 @@ async def observer_task(
raise HTTPException( raise HTTPException(
status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later." status_code=500, detail="Skyvern LLM failure to initialize observer cruise. Please try again later."
) )
analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_task.url}) analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": task_v2.url})
await AsyncExecutorFactory.get_executor().execute_task_v2( await AsyncExecutorFactory.get_executor().execute_task_v2(
request=request, request=request,
background_tasks=background_tasks, background_tasks=background_tasks,
organization_id=organization.organization_id, organization_id=organization.organization_id,
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
max_iterations_override=x_max_iterations_override, max_iterations_override=x_max_iterations_override,
browser_session_id=data.browser_session_id, browser_session_id=data.browser_session_id,
) )
return observer_task.model_dump(by_alias=True) return task_v2.model_dump(by_alias=True)
@v2_router.get("/tasks/{task_id}") @v2_router.get("/tasks/{task_id}")
@v2_router.get("/tasks/{task_id}/", include_in_schema=False) @v2_router.get("/tasks/{task_id}/", include_in_schema=False)
async def get_observer_task( async def get_task_v2(
task_id: str, task_id: str,
organization: Organization = Depends(org_auth_service.get_current_org), organization: Organization = Depends(org_auth_service.get_current_org),
) -> dict[str, Any]: ) -> dict[str, Any]:

View File

@@ -89,7 +89,7 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict:
} }
async def initialize_observer_task( async def initialize_task_v2(
organization: Organization, organization: Organization,
user_prompt: str, user_prompt: str,
user_url: str | None = None, user_url: str | None = None,
@@ -163,7 +163,7 @@ async def initialize_observer_task(
except Exception: except Exception:
LOG.error("Failed to setup cruise workflow run", exc_info=True) LOG.error("Failed to setup cruise workflow run", exc_info=True)
# fail the workflow run # fail the workflow run
await mark_observer_task_as_failed( await mark_task_v2_as_failed(
task_v2_id=task_v2.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=task_v2.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
failure_reason="Skyvern failed to setup the workflow run", failure_reason="Skyvern failed to setup the workflow run",
@@ -206,7 +206,7 @@ async def initialize_observer_task(
except Exception: except Exception:
LOG.warning("Failed to update task 2.0", exc_info=True) LOG.warning("Failed to update task 2.0", exc_info=True)
# fail the workflow run # fail the workflow run
await mark_observer_task_as_failed( await mark_task_v2_as_failed(
task_v2_id=task_v2.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=workflow_run.workflow_run_id, workflow_run_id=workflow_run.workflow_run_id,
failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run", failure_reason="Skyvern failed to update the task 2.0 after initializing the workflow run",
@@ -217,7 +217,7 @@ async def initialize_observer_task(
return task_v2 return task_v2
async def run_observer_task( async def run_task_v2(
organization: Organization, organization: Organization,
task_v2_id: str, task_v2_id: str,
request_id: str | None = None, request_id: str | None = None,
@@ -226,7 +226,7 @@ async def run_observer_task(
) -> ObserverTask: ) -> ObserverTask:
organization_id = organization.organization_id organization_id = organization.organization_id
try: try:
observer_task = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) task_v2 = await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
except Exception: except Exception:
LOG.error( LOG.error(
"Failed to get observer task", "Failed to get observer task",
@@ -234,47 +234,47 @@ async def run_observer_task(
organization_id=organization_id, organization_id=organization_id,
exc_info=True, exc_info=True,
) )
return await mark_observer_task_as_failed( return await mark_task_v2_as_failed(
task_v2_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
failure_reason="Failed to get task v2", failure_reason="Failed to get task v2",
) )
if not observer_task: if not task_v2:
LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id) LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id)
raise TaskV2NotFound(task_v2_id=task_v2_id) raise TaskV2NotFound(task_v2_id=task_v2_id)
workflow, workflow_run = None, None workflow, workflow_run = None, None
try: try:
workflow, workflow_run, observer_task = await run_observer_task_helper( workflow, workflow_run, task_v2 = await run_task_v2_helper(
organization=organization, organization=organization,
observer_task=observer_task, task_v2=task_v2,
request_id=request_id, request_id=request_id,
max_iterations_override=max_iterations_override, max_iterations_override=max_iterations_override,
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
) )
except TaskTerminationError as e: except TaskTerminationError as e:
observer_task = await mark_observer_task_as_terminated( task_v2 = await mark_task_v2_as_terminated(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
failure_reason=e.message, failure_reason=e.message,
) )
LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message) LOG.info("Task v2 is terminated", task_v2_id=task_v2_id, failure_reason=e.message)
return observer_task return task_v2
except OperationalError: except OperationalError:
LOG.error("Database error when running observer cruise", exc_info=True) LOG.error("Database error when running observer cruise", exc_info=True)
observer_task = await mark_observer_task_as_failed( task_v2 = await mark_task_v2_as_failed(
task_v2_id, task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
failure_reason="Database error when running task 2.0", failure_reason="Database error when running task 2.0",
organization_id=organization_id, organization_id=organization_id,
) )
except Exception as e: except Exception as e:
LOG.error("Failed to run observer cruise", exc_info=True) LOG.error("Failed to run observer cruise", exc_info=True)
failure_reason = f"Failed to run task 2.0: {str(e)}" failure_reason = f"Failed to run task 2.0: {str(e)}"
observer_task = await mark_observer_task_as_failed( task_v2 = await mark_task_v2_as_failed(
task_v2_id, task_v2_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
failure_reason=failure_reason, failure_reason=failure_reason,
organization_id=organization_id, organization_id=organization_id,
) )
@@ -291,40 +291,40 @@ async def run_observer_task(
skyvern_context.reset() skyvern_context.reset()
return observer_task return task_v2
async def run_observer_task_helper( async def run_task_v2_helper(
organization: Organization, organization: Organization,
observer_task: ObserverTask, task_v2: ObserverTask,
request_id: str | None = None, request_id: str | None = None,
max_iterations_override: str | int | None = None, max_iterations_override: str | int | None = None,
browser_session_id: str | None = None, browser_session_id: str | None = None,
) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]: ) -> tuple[Workflow, WorkflowRun, ObserverTask] | tuple[None, None, ObserverTask]:
organization_id = organization.organization_id organization_id = organization.organization_id
task_v2_id = observer_task.observer_cruise_id task_v2_id = task_v2.observer_cruise_id
if observer_task.status != ObserverTaskStatus.queued: if task_v2.status != ObserverTaskStatus.queued:
LOG.error( LOG.error(
"Task v2 is not queued. Duplicate task v2", "Task v2 is not queued. Duplicate task v2",
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
status=observer_task.status, status=task_v2.status,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, task_v2
if not observer_task.url or not observer_task.prompt: if not task_v2.url or not task_v2.prompt:
LOG.error( LOG.error(
"Task v2 url or prompt not found", "Task v2 url or prompt not found",
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, task_v2
if not observer_task.workflow_run_id: if not task_v2.workflow_run_id:
LOG.error( LOG.error(
"Workflow run id not found in task v2", "Workflow run id not found in task v2",
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
organization_id=organization_id, organization_id=organization_id,
) )
return None, None, observer_task return None, None, task_v2
int_max_iterations_override = None int_max_iterations_override = None
if max_iterations_override: if max_iterations_override:
@@ -337,24 +337,24 @@ async def run_observer_task_helper(
max_iterations_override=max_iterations_override, max_iterations_override=max_iterations_override,
) )
workflow_run_id = observer_task.workflow_run_id workflow_run_id = task_v2.workflow_run_id
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id) workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id, organization_id=organization_id)
if not workflow_run: if not workflow_run:
LOG.error("Workflow run not found", workflow_run_id=workflow_run_id) LOG.error("Workflow run not found", workflow_run_id=workflow_run_id)
return None, None, observer_task return None, None, task_v2
else: else:
LOG.info("Workflow run found", workflow_run_id=workflow_run_id) LOG.info("Workflow run found", workflow_run_id=workflow_run_id)
if workflow_run.status != WorkflowRunStatus.queued: if workflow_run.status != WorkflowRunStatus.queued:
LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status) LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status)
return None, None, observer_task return None, None, task_v2
workflow_id = workflow_run.workflow_id workflow_id = workflow_run.workflow_id
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id) workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id)
if not workflow: if not workflow:
LOG.error("Workflow not found", workflow_id=workflow_id) LOG.error("Workflow not found", workflow_id=workflow_id)
return None, None, observer_task return None, None, task_v2
###################### run observer ###################### ###################### run observer ######################
@@ -368,14 +368,14 @@ async def run_observer_task_helper(
) )
) )
observer_task = await app.DATABASE.update_task_v2( task_v2 = await app.DATABASE.update_task_v2(
task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running task_v2_id=task_v2_id, organization_id=organization_id, status=ObserverTaskStatus.running
) )
await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
await _set_up_workflow_context(workflow_id, workflow_run_id, organization) await _set_up_workflow_context(workflow_id, workflow_run_id, organization)
url = str(observer_task.url) url = str(task_v2.url)
user_prompt = observer_task.prompt user_prompt = task_v2.prompt
task_history: list[dict] = [] task_history: list[dict] = []
yaml_blocks: list[BLOCK_YAML_TYPES] = [] yaml_blocks: list[BLOCK_YAML_TYPES] = []
yaml_parameters: list[PARAMETER_YAML_TYPES] = [] yaml_parameters: list[PARAMETER_YAML_TYPES] = []
@@ -401,12 +401,12 @@ async def run_observer_task_helper(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
) )
await mark_observer_task_as_canceled( await mark_task_v2_as_canceled(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=organization_id, organization_id=organization_id,
) )
return workflow, workflow_run, observer_task return workflow, workflow_run, task_v2
LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url)
task_type = "" task_type = ""
@@ -500,8 +500,8 @@ async def run_observer_task_helper(
iteration=i, iteration=i,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
) )
observer_task = await _summarize_observer_task( task_v2 = await _summarize_task_v2(
observer_task=observer_task, task_v2=task_v2,
task_history=task_history, task_history=task_history,
context=context, context=context,
screenshots=scraped_page.screenshots, screenshots=scraped_page.screenshots,
@@ -515,7 +515,7 @@ async def run_observer_task_helper(
# parse observer repsonse and run the next task # parse observer repsonse and run the next task
if not task_type: if not task_type:
LOG.error("No task type found in observer response", observer_response=observer_response) LOG.error("No task type found in observer response", observer_response=observer_response)
await mark_observer_task_as_failed( await mark_task_v2_as_failed(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason="Skyvern failed to generate a task. Please try again later.", failure_reason="Skyvern failed to generate a task. Please try again later.",
@@ -524,7 +524,7 @@ async def run_observer_task_helper(
if task_type == "extract": if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
task_v2=observer_task, task_v2=task_v2,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id, workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -543,14 +543,14 @@ async def run_observer_task_helper(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
original_url=original_url, original_url=original_url,
navigation_goal=navigation_goal, navigation_goal=navigation_goal,
totp_verification_url=observer_task.totp_verification_url, totp_verification_url=task_v2.totp_verification_url,
totp_identifier=observer_task.totp_identifier, totp_identifier=task_v2.totp_identifier,
) )
task_history_record = {"type": task_type, "task": plan} task_history_record = {"type": task_type, "task": plan}
elif task_type == "loop": elif task_type == "loop":
try: try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
task_v2=observer_task, task_v2=task_v2,
workflow_id=workflow_id, workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id, workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
@@ -567,7 +567,7 @@ async def run_observer_task_helper(
} }
except Exception: except Exception:
LOG.exception("Failed to generate loop task") LOG.exception("Failed to generate loop task")
await mark_observer_task_as_failed( await mark_task_v2_as_failed(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason="Failed to generate the loop.", failure_reason="Failed to generate the loop.",
@@ -575,7 +575,7 @@ async def run_observer_task_helper(
break break
else: else:
LOG.info("Unsupported task type", task_type=task_type) LOG.info("Unsupported task type", task_type=task_type)
await mark_observer_task_as_failed( await mark_task_v2_as_failed(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
failure_reason=f"Unsupported task block type gets generated: {task_type}", failure_reason=f"Unsupported task block type gets generated: {task_type}",
@@ -612,7 +612,7 @@ async def run_observer_task_helper(
workflow_create_request = WorkflowCreateYAMLRequest( workflow_create_request = WorkflowCreateYAMLRequest(
title=workflow.title, title=workflow.title,
description=workflow.description, description=workflow.description,
proxy_location=observer_task.proxy_location or ProxyLocation.RESIDENTIAL, proxy_location=task_v2.proxy_location or ProxyLocation.RESIDENTIAL,
workflow_definition=workflow_definition_yaml, workflow_definition=workflow_definition_yaml,
status=workflow.status, status=workflow.status,
) )
@@ -702,8 +702,8 @@ async def run_observer_task_helper(
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
completion_resp=completion_resp, completion_resp=completion_resp,
) )
observer_task = await _summarize_observer_task( task_v2 = await _summarize_task_v2(
observer_task=observer_task, task_v2=task_v2,
task_history=task_history, task_history=task_history,
context=context, context=context,
screenshots=completion_screenshots, screenshots=completion_screenshots,
@@ -715,7 +715,7 @@ async def run_observer_task_helper(
max_iterations=max_iterations, max_iterations=max_iterations,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
) )
observer_task = await mark_observer_task_as_failed( task_v2 = await mark_task_v2_as_failed(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
# TODO: add a better failure reason with LLM # TODO: add a better failure reason with LLM
@@ -723,7 +723,7 @@ async def run_observer_task_helper(
organization_id=organization_id, organization_id=organization_id,
) )
return workflow, workflow_run, observer_task return workflow, workflow_run, task_v2
async def handle_block_result( async def handle_block_result(
@@ -745,7 +745,7 @@ async def handle_block_result(
block_type_var=block.block_type, block_type_var=block.block_type,
block_label=block.label, block_label=block.label,
) )
await mark_observer_task_as_canceled( await mark_task_v2_as_canceled(
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
organization_id=workflow_run.organization_id, organization_id=workflow_run.organization_id,
@@ -1200,13 +1200,13 @@ async def get_task_v2(task_v2_id: str, organization_id: str | None = None) -> Ob
return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id) return await app.DATABASE.get_task_v2(task_v2_id, organization_id=organization_id)
async def mark_observer_task_as_failed( async def mark_task_v2_as_failed(
task_v2_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
failure_reason: str | None = None, failure_reason: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_task_v2( task_v2 = await app.DATABASE.update_task_v2(
task_v2_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.failed, status=ObserverTaskStatus.failed,
@@ -1215,18 +1215,18 @@ async def mark_observer_task_as_failed(
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed" workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
) )
await send_observer_task_webhook(observer_task) await send_task_v2_webhook(task_v2)
return observer_task return task_v2
async def mark_observer_task_as_completed( async def mark_task_v2_as_completed(
task_v2_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
summary: str | None = None, summary: str | None = None,
output: dict[str, Any] | None = None, output: dict[str, Any] | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_task_v2( task_v2 = await app.DATABASE.update_task_v2(
task_v2_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.completed, status=ObserverTaskStatus.completed,
@@ -1237,51 +1237,51 @@ async def mark_observer_task_as_completed(
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
# Track observer cruise duration when completed # Track observer cruise duration when completed
duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds() duration_seconds = (datetime.now(UTC) - task_v2.created_at.replace(tzinfo=UTC)).total_seconds()
LOG.info( LOG.info(
"Observer task duration metrics", "Observer task duration metrics",
task_v2_id=task_v2_id, task_v2_id=task_v2_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,
duration_seconds=duration_seconds, duration_seconds=duration_seconds,
observer_task_status=ObserverTaskStatus.completed, task_v2_status=ObserverTaskStatus.completed,
organization_id=organization_id, organization_id=organization_id,
) )
await send_observer_task_webhook(observer_task) await send_task_v2_webhook(task_v2)
return observer_task return task_v2
async def mark_observer_task_as_canceled( async def mark_task_v2_as_canceled(
task_v2_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_task_v2( task_v2 = await app.DATABASE.update_task_v2(
task_v2_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.canceled, status=ObserverTaskStatus.canceled,
) )
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id) await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id)
await send_observer_task_webhook(observer_task) await send_task_v2_webhook(task_v2)
return observer_task return task_v2
async def mark_observer_task_as_terminated( async def mark_task_v2_as_terminated(
task_v2_id: str, task_v2_id: str,
workflow_run_id: str | None = None, workflow_run_id: str | None = None,
organization_id: str | None = None, organization_id: str | None = None,
failure_reason: str | None = None, failure_reason: str | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_task = await app.DATABASE.update_task_v2( task_v2 = await app.DATABASE.update_task_v2(
task_v2_id, task_v2_id,
organization_id=organization_id, organization_id=organization_id,
status=ObserverTaskStatus.terminated, status=ObserverTaskStatus.terminated,
) )
if workflow_run_id: if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason) await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason)
await send_observer_task_webhook(observer_task) await send_task_v2_webhook(task_v2)
return observer_task return task_v2
def _get_extracted_data_from_block_result( def _get_extracted_data_from_block_result(
@@ -1353,25 +1353,25 @@ def _get_extracted_data_from_block_result(
return None return None
async def _summarize_observer_task( async def _summarize_task_v2(
observer_task: ObserverTask, task_v2: ObserverTask,
task_history: list[dict], task_history: list[dict],
context: SkyvernContext, context: SkyvernContext,
screenshots: list[bytes] | None = None, screenshots: list[bytes] | None = None,
) -> ObserverTask: ) -> ObserverTask:
observer_thought = await app.DATABASE.create_observer_thought( observer_thought = await app.DATABASE.create_observer_thought(
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
organization_id=observer_task.organization_id, organization_id=task_v2.organization_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
workflow_id=observer_task.workflow_id, workflow_id=task_v2.workflow_id,
workflow_permanent_id=observer_task.workflow_permanent_id, workflow_permanent_id=task_v2.workflow_permanent_id,
observer_thought_type=ObserverThoughtType.user_goal_check, observer_thought_type=ObserverThoughtType.user_goal_check,
observer_thought_scenario=ObserverThoughtScenario.summarization, observer_thought_scenario=ObserverThoughtScenario.summarization,
) )
# summarize the observer cruise and format the output # summarize the observer cruise and format the output
task_v2_summary_prompt = prompt_engine.load_prompt( task_v2_summary_prompt = prompt_engine.load_prompt(
"task_v2_summary", "task_v2_summary",
user_goal=observer_task.prompt, user_goal=task_v2.prompt,
task_history=task_history, task_history=task_history,
local_datetime=datetime.now(context.tz_info).isoformat(), local_datetime=datetime.now(context.tz_info).isoformat(),
) )
@@ -1387,24 +1387,24 @@ async def _summarize_observer_task(
summarized_output = task_v2_summary_resp.get("output") summarized_output = task_v2_summary_resp.get("output")
await app.DATABASE.update_observer_thought( await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought.observer_thought_id, observer_thought_id=observer_thought.observer_thought_id,
organization_id=observer_task.organization_id, organization_id=task_v2.organization_id,
thought=thought, thought=thought,
output=task_v2_summary_resp, output=task_v2_summary_resp,
) )
return await mark_observer_task_as_completed( return await mark_task_v2_as_completed(
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
organization_id=observer_task.organization_id, organization_id=task_v2.organization_id,
summary=thought, summary=thought,
output=summarized_output, output=summarized_output,
) )
async def send_observer_task_webhook(observer_task: ObserverTask) -> None: async def send_task_v2_webhook(task_v2: ObserverTask) -> None:
if not observer_task.webhook_callback_url: if not task_v2.webhook_callback_url:
return return
organization_id = observer_task.organization_id organization_id = task_v2.organization_id
if not organization_id: if not organization_id:
return return
api_key = await app.DATABASE.get_valid_org_auth_token( api_key = await app.DATABASE.get_valid_org_auth_token(
@@ -1414,37 +1414,37 @@ async def send_observer_task_webhook(observer_task: ObserverTask) -> None:
if not api_key: if not api_key:
LOG.warning( LOG.warning(
"No valid API key found for the organization of observer cruise", "No valid API key found for the organization of observer cruise",
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
) )
return return
# build the observer cruise response # build the observer cruise response
payload = observer_task.model_dump_json(by_alias=True) payload = task_v2.model_dump_json(by_alias=True)
headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token) headers = generate_skyvern_webhook_headers(payload=payload, api_key=api_key.token)
LOG.info( LOG.info(
"Sending observer cruise response to webhook callback url", "Sending observer cruise response to webhook callback url",
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
webhook_callback_url=observer_task.webhook_callback_url, webhook_callback_url=task_v2.webhook_callback_url,
payload=payload, payload=payload,
headers=headers, headers=headers,
) )
try: try:
resp = await httpx.AsyncClient().post( resp = await httpx.AsyncClient().post(
observer_task.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0) task_v2.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)
) )
if resp.status_code == 200: if resp.status_code == 200:
LOG.info( LOG.info(
"Observer cruise webhook sent successfully", "Observer cruise webhook sent successfully",
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
resp_code=resp.status_code, resp_code=resp.status_code,
resp_text=resp.text, resp_text=resp.text,
) )
else: else:
LOG.info( LOG.info(
"Observer cruise webhook failed", "Observer cruise webhook failed",
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
resp=resp, resp=resp,
resp_code=resp.status_code, resp_code=resp.status_code,
resp_text=resp.text, resp_text=resp.text,
) )
except Exception as e: except Exception as e:
raise FailedToSendWebhook(task_v2_id=observer_task.observer_cruise_id) from e raise FailedToSendWebhook(task_v2_id=task_v2.observer_cruise_id) from e

View File

@@ -2150,7 +2150,7 @@ class TaskV2Block(Block):
workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id, organization_id) workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id, organization_id)
if not workflow_run: if not workflow_run:
raise ValueError(f"WorkflowRun not found {workflow_run_id} when running TaskV2Block") raise ValueError(f"WorkflowRun not found {workflow_run_id} when running TaskV2Block")
observer_task = await task_v2_service.initialize_observer_task( task_v2 = await task_v2_service.initialize_task_v2(
organization, organization,
user_prompt=self.prompt, user_prompt=self.prompt,
user_url=self.url, user_url=self.url,
@@ -2158,29 +2158,29 @@ class TaskV2Block(Block):
proxy_location=workflow_run.proxy_location, proxy_location=workflow_run.proxy_location,
) )
await app.DATABASE.update_task_v2( await app.DATABASE.update_task_v2(
observer_task.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id task_v2.observer_cruise_id, status=ObserverTaskStatus.queued, organization_id=organization_id
) )
if observer_task.workflow_run_id: if task_v2.workflow_run_id:
await app.DATABASE.update_workflow_run( await app.DATABASE.update_workflow_run(
workflow_run_id=observer_task.workflow_run_id, workflow_run_id=task_v2.workflow_run_id,
status=WorkflowRunStatus.queued, status=WorkflowRunStatus.queued,
) )
await app.DATABASE.update_workflow_run_block( await app.DATABASE.update_workflow_run_block(
workflow_run_block_id=workflow_run_block_id, workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id, organization_id=organization_id,
block_workflow_run_id=observer_task.workflow_run_id, block_workflow_run_id=task_v2.workflow_run_id,
) )
observer_task = await task_v2_service.run_observer_task( task_v2 = await task_v2_service.run_task_v2(
organization=organization, organization=organization,
task_v2_id=observer_task.observer_cruise_id, task_v2_id=task_v2.observer_cruise_id,
request_id=None, request_id=None,
max_iterations_override=self.max_iterations, max_iterations_override=self.max_iterations,
browser_session_id=browser_session_id, browser_session_id=browser_session_id,
) )
result_dict = None result_dict = None
if observer_task: if task_v2:
result_dict = observer_task.output result_dict = task_v2.output
return await self.build_block_result( return await self.build_block_result(
success=True, success=True,

View File

@@ -147,5 +147,5 @@ class WorkflowRunStatusResponse(BaseModel):
outputs: dict[str, Any] | None = None outputs: dict[str, Any] | None = None
total_steps: int | None = None total_steps: int | None = None
total_cost: float | None = None total_cost: float | None = None
observer_task: ObserverTask | None = None task_v2: ObserverTask | None = None
workflow_title: str | None = None workflow_title: str | None = None