new observer thoughts (#1442)
This commit is contained in:
@@ -10,9 +10,16 @@ from pydantic import BaseModel
|
||||
from skyvern.exceptions import UrlGenerationFailure
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.forge.sdk.artifact.models import ArtifactType
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
|
||||
from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverCruiseStatus, ObserverMetadata
|
||||
from skyvern.forge.sdk.schemas.observers import (
|
||||
ObserverCruise,
|
||||
ObserverCruiseStatus,
|
||||
ObserverMetadata,
|
||||
ObserverThoughtScenario,
|
||||
ObserverThoughtType,
|
||||
)
|
||||
from skyvern.forge.sdk.schemas.organizations import Organization
|
||||
from skyvern.forge.sdk.schemas.tasks import ProxyLocation
|
||||
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType
|
||||
@@ -76,6 +83,13 @@ async def initialize_observer_cruise(
|
||||
organization_id=organization.organization_id,
|
||||
)
|
||||
|
||||
observer_thought = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
||||
organization_id=organization.organization_id,
|
||||
observer_thought_type=ObserverThoughtType.metadata,
|
||||
observer_thought_scenario=ObserverThoughtScenario.generate_metadata,
|
||||
)
|
||||
|
||||
metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url)
|
||||
metadata_response = await app.SECONDARY_LLM_API_HANDLER(prompt=metadata_prompt, observer_cruise=observer_cruise)
|
||||
# validate
|
||||
@@ -103,6 +117,16 @@ async def initialize_observer_cruise(
|
||||
version=None,
|
||||
max_steps_override=max_steps_override,
|
||||
)
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
organization_id=organization.organization_id,
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
workflow_id=new_workflow.workflow_id,
|
||||
workflow_permanent_id=new_workflow.workflow_permanent_id,
|
||||
thought=metadata_response.get("thoughts", ""),
|
||||
output=metadata.model_dump(),
|
||||
)
|
||||
|
||||
# update oserver cruise
|
||||
observer_cruise = await app.DATABASE.update_observer_cruise(
|
||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
||||
@@ -237,6 +261,8 @@ async def run_observer_cruise(
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
workflow_id=workflow.workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.plan,
|
||||
observer_thought_scenario=ObserverThoughtScenario.generate_plan,
|
||||
)
|
||||
observer_response = await app.LLM_API_HANDLER(
|
||||
prompt=observer_prompt,
|
||||
@@ -255,6 +281,7 @@ async def run_observer_cruise(
|
||||
observation = observer_response.get("page_info", "")
|
||||
thoughts: str = observer_response.get("thoughts", "")
|
||||
plan: str = observer_response.get("plan", "")
|
||||
task_type: str = observer_response.get("task_type", "")
|
||||
# Create and save observer thought
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
@@ -262,6 +289,7 @@ async def run_observer_cruise(
|
||||
thought=thoughts,
|
||||
observation=observation,
|
||||
answer=plan,
|
||||
output={"task_type": task_type, "user_goal_achieved": user_goal_achieved},
|
||||
)
|
||||
|
||||
if user_goal_achieved is True:
|
||||
@@ -274,7 +302,6 @@ async def run_observer_cruise(
|
||||
break
|
||||
|
||||
# parse observer repsonse and run the next task
|
||||
task_type = observer_response.get("task_type")
|
||||
if not task_type:
|
||||
LOG.error("No task type found in observer response", observer_response=observer_response)
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
@@ -288,6 +315,8 @@ async def run_observer_cruise(
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
|
||||
observer_cruise=observer_cruise,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
current_url=current_url,
|
||||
element_tree_in_prompt=element_tree_in_prompt,
|
||||
data_extraction_goal=plan,
|
||||
@@ -298,6 +327,8 @@ async def run_observer_cruise(
|
||||
original_url = url if i == 0 else None
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
original_url=original_url,
|
||||
navigation_goal=plan,
|
||||
)
|
||||
@@ -307,6 +338,7 @@ async def run_observer_cruise(
|
||||
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
|
||||
observer_cruise=observer_cruise,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
plan=plan,
|
||||
browser_state=browser_state,
|
||||
@@ -378,8 +410,18 @@ async def run_observer_cruise(
|
||||
task_history=task_history,
|
||||
local_datetime=datetime.now(context.tz_info).isoformat(),
|
||||
)
|
||||
observer_thought = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise_id,
|
||||
organization_id=organization_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.user_goal_check,
|
||||
observer_thought_scenario=ObserverThoughtScenario.user_goal_check,
|
||||
)
|
||||
completion_resp = await app.LLM_API_HANDLER(
|
||||
prompt=observer_completion_prompt, observer_cruise=observer_cruise
|
||||
prompt=observer_completion_prompt,
|
||||
observer_cruise=observer_thought,
|
||||
)
|
||||
LOG.info(
|
||||
"Observer completion check response",
|
||||
@@ -388,7 +430,15 @@ async def run_observer_cruise(
|
||||
workflow_run_id=workflow_run_id,
|
||||
task_history=task_history,
|
||||
)
|
||||
if completion_resp.get("user_goal_achieved", False):
|
||||
user_goal_achieved = completion_resp.get("user_goal_achieved", False)
|
||||
thought = completion_resp.get("thoughts", "")
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
organization_id=organization_id,
|
||||
thought=thought,
|
||||
output={"user_goal_achieved": user_goal_achieved},
|
||||
)
|
||||
if user_goal_achieved:
|
||||
LOG.info(
|
||||
"User goal achieved according to the observer completion check",
|
||||
iteration=i,
|
||||
@@ -514,6 +564,7 @@ async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str) -> No
|
||||
async def _generate_loop_task(
|
||||
observer_cruise: ObserverCruise,
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_run_id: str,
|
||||
plan: str,
|
||||
browser_state: BrowserState,
|
||||
@@ -525,7 +576,25 @@ async def _generate_loop_task(
|
||||
"observer_loop_task_extraction_goal",
|
||||
plan=plan,
|
||||
)
|
||||
|
||||
data_extraction_thought = f"Going to generate a list of values to go through based on the plan: {plan}."
|
||||
observer_thought = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
||||
organization_id=observer_cruise.organization_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.plan,
|
||||
observer_thought_scenario=ObserverThoughtScenario.extract_loop_values,
|
||||
thought=data_extraction_thought,
|
||||
)
|
||||
# generate screenshot artifact for the observer thought
|
||||
if scraped_page.screenshots:
|
||||
for screenshot in scraped_page.screenshots:
|
||||
await app.ARTIFACT_MANAGER.create_observer_thought_artifact(
|
||||
observer_thought=observer_thought,
|
||||
artifact_type=ArtifactType.SCREENSHOT_LLM,
|
||||
data=screenshot,
|
||||
)
|
||||
label = f"extraction_task_for_loop_{_generate_random_string()}"
|
||||
extraction_block_yaml = ExtractionBlockYAML(
|
||||
label=label,
|
||||
@@ -576,6 +645,13 @@ async def _generate_loop_task(
|
||||
)
|
||||
raise
|
||||
|
||||
# update the observer thought
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
organization_id=observer_cruise.organization_id,
|
||||
output=output_value_obj.model_dump(),
|
||||
)
|
||||
|
||||
# create ContextParameter for the loop over pointer that ForLoopBlock needs.
|
||||
loop_for_context_parameter = ContextParameter(
|
||||
key="loop_values",
|
||||
@@ -627,15 +703,31 @@ async def _generate_loop_task(
|
||||
is_link=output_value_obj.is_loop_value_link,
|
||||
loop_values=output_value_obj.loop_values,
|
||||
)
|
||||
observer_thought_task_in_loop = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise.observer_cruise_id,
|
||||
organization_id=observer_cruise.organization_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.internal_plan,
|
||||
observer_thought_scenario=ObserverThoughtScenario.generate_task_in_loop,
|
||||
)
|
||||
task_in_loop_metadata_response = await app.LLM_API_HANDLER(
|
||||
task_in_loop_metadata_prompt,
|
||||
screenshots=scraped_page.screenshots,
|
||||
observer_cruise=observer_cruise,
|
||||
observer_thought=observer_thought_task_in_loop,
|
||||
)
|
||||
LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response)
|
||||
navigation_goal = task_in_loop_metadata_response.get("navigation_goal")
|
||||
data_extraction_goal = task_in_loop_metadata_response.get("data_extraction_goal")
|
||||
data_extraction_schema = task_in_loop_metadata_response.get("data_schema")
|
||||
thought = task_in_loop_metadata_response.get("thoughts")
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought_task_in_loop.observer_thought_id,
|
||||
organization_id=observer_cruise.organization_id,
|
||||
thought=thought,
|
||||
output=task_in_loop_metadata_response,
|
||||
)
|
||||
if data_extraction_goal and navigation_goal:
|
||||
navigation_goal = (
|
||||
navigation_goal
|
||||
@@ -699,6 +791,8 @@ async def _generate_loop_task(
|
||||
async def _generate_extraction_task(
|
||||
observer_cruise: ObserverCruise,
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_run_id: str,
|
||||
current_url: str,
|
||||
element_tree_in_prompt: str,
|
||||
data_extraction_goal: str,
|
||||
@@ -753,6 +847,8 @@ async def _generate_extraction_task(
|
||||
|
||||
async def _generate_navigation_task(
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
workflow_run_id: str,
|
||||
navigation_goal: str,
|
||||
original_url: str | None = None,
|
||||
) -> tuple[NavigationBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]:
|
||||
|
||||
Reference in New Issue
Block a user