fix observer - get url block back; get mark workflow run completion back (#1726)
This commit is contained in:
@@ -34,6 +34,7 @@ from skyvern.forge.sdk.workflow.models.block import (
|
||||
ForLoopBlock,
|
||||
NavigationBlock,
|
||||
TaskBlock,
|
||||
UrlBlock,
|
||||
)
|
||||
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter
|
||||
from skyvern.forge.sdk.workflow.models.workflow import (
|
||||
@@ -51,6 +52,7 @@ from skyvern.forge.sdk.workflow.models.yaml import (
|
||||
ForLoopBlockYAML,
|
||||
NavigationBlockYAML,
|
||||
TaskBlockYAML,
|
||||
UrlBlockYAML,
|
||||
WorkflowCreateYAMLRequest,
|
||||
WorkflowDefinitionYAML,
|
||||
)
|
||||
@@ -118,7 +120,9 @@ async def initialize_observer_task(
|
||||
|
||||
metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url)
|
||||
metadata_response = await app.LLM_API_HANDLER(
|
||||
prompt=metadata_prompt, observer_thought=observer_thought, prompt_name="observer-generate-metadata"
|
||||
prompt=metadata_prompt,
|
||||
observer_thought=observer_thought,
|
||||
prompt_name="observer-generate-metadata",
|
||||
)
|
||||
# validate
|
||||
LOG.info(f"Initialized observer initial response: {metadata_response}")
|
||||
@@ -350,160 +354,176 @@ async def run_observer_task_helper(
|
||||
max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS
|
||||
for i in range(max_iterations):
|
||||
LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url)
|
||||
try:
|
||||
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
|
||||
workflow_run=workflow_run,
|
||||
url=url,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
scraped_page = await scrape_website(
|
||||
browser_state,
|
||||
url,
|
||||
app.AGENT_FUNCTION.cleanup_element_tree_factory(),
|
||||
scrape_exclude=app.scrape_exclude,
|
||||
)
|
||||
element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML)
|
||||
page = await browser_state.get_working_page()
|
||||
except Exception:
|
||||
LOG.exception("Failed to get browser state or scrape website in observer iteration", iteration=i, url=url)
|
||||
continue
|
||||
current_url = str(
|
||||
await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url
|
||||
)
|
||||
|
||||
context = skyvern_context.ensure_context()
|
||||
observer_prompt = prompt_engine.load_prompt(
|
||||
"observer",
|
||||
current_url=current_url,
|
||||
elements=element_tree_in_prompt,
|
||||
user_goal=user_prompt,
|
||||
task_history=task_history,
|
||||
local_datetime=datetime.now(context.tz_info).isoformat(),
|
||||
)
|
||||
observer_thought = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise_id,
|
||||
organization_id=organization_id,
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
workflow_id=workflow.workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.plan,
|
||||
observer_thought_scenario=ObserverThoughtScenario.generate_plan,
|
||||
)
|
||||
observer_response = await app.LLM_API_HANDLER(
|
||||
prompt=observer_prompt,
|
||||
screenshots=scraped_page.screenshots,
|
||||
observer_thought=observer_thought,
|
||||
prompt_name="observer-generate-plan",
|
||||
)
|
||||
LOG.info(
|
||||
"Observer response",
|
||||
observer_response=observer_response,
|
||||
iteration=i,
|
||||
current_url=current_url,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
# see if the user goal has achieved or not
|
||||
user_goal_achieved = observer_response.get("user_goal_achieved", False)
|
||||
observation = observer_response.get("page_info", "")
|
||||
thoughts: str = observer_response.get("thoughts", "")
|
||||
plan: str = observer_response.get("plan", "")
|
||||
task_type: str = observer_response.get("task_type", "")
|
||||
# Create and save observer thought
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
organization_id=organization_id,
|
||||
thought=thoughts,
|
||||
observation=observation,
|
||||
answer=plan,
|
||||
output={"task_type": task_type, "user_goal_achieved": user_goal_achieved},
|
||||
)
|
||||
|
||||
if user_goal_achieved is True:
|
||||
LOG.info(
|
||||
"User goal achieved. Workflow run will complete. Observer is stopping",
|
||||
iteration=i,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
observer_task = await _summarize_observer_task(
|
||||
observer_task=observer_task,
|
||||
task_history=task_history,
|
||||
context=context,
|
||||
screenshots=scraped_page.screenshots,
|
||||
)
|
||||
break
|
||||
|
||||
if not plan:
|
||||
LOG.warning("No plan found in observer response", observer_response=observer_response)
|
||||
continue
|
||||
|
||||
# parse observer repsonse and run the next task
|
||||
if not task_type:
|
||||
LOG.error("No task type found in observer response", observer_response=observer_response)
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run_id,
|
||||
failure_reason="Skyvern failed to generate a task. Please try again later.",
|
||||
)
|
||||
break
|
||||
|
||||
task_type = ""
|
||||
plan = ""
|
||||
block: BlockTypeVar | None = None
|
||||
task_history_record: dict[str, Any] = {}
|
||||
if task_type == "extract":
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
|
||||
observer_cruise=observer_task,
|
||||
context = skyvern_context.ensure_context()
|
||||
|
||||
if i == 0:
|
||||
# The first iteration is always a GOTO_URL task
|
||||
task_type = "goto_url"
|
||||
plan = f"Go to this website: {url}"
|
||||
task_history_record = {"type": task_type, "task": plan}
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_goto_url_task(
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
current_url=current_url,
|
||||
element_tree_in_prompt=element_tree_in_prompt,
|
||||
data_extraction_goal=plan,
|
||||
task_history=task_history,
|
||||
url=url,
|
||||
)
|
||||
task_history_record = {"type": task_type, "task": plan}
|
||||
elif task_type == "navigate":
|
||||
original_url = url if i == 0 else None
|
||||
navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan)
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
original_url=original_url,
|
||||
navigation_goal=navigation_goal,
|
||||
totp_verification_url=observer_task.totp_verification_url,
|
||||
totp_identifier=observer_task.totp_identifier,
|
||||
)
|
||||
task_history_record = {"type": task_type, "task": plan}
|
||||
elif task_type == "loop":
|
||||
else:
|
||||
try:
|
||||
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
|
||||
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
|
||||
workflow_run=workflow_run,
|
||||
url=url,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
scraped_page = await scrape_website(
|
||||
browser_state,
|
||||
url,
|
||||
app.AGENT_FUNCTION.cleanup_element_tree_factory(),
|
||||
scrape_exclude=app.scrape_exclude,
|
||||
)
|
||||
element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML)
|
||||
page = await browser_state.get_working_page()
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
"Failed to get browser state or scrape website in observer iteration", iteration=i, url=url
|
||||
)
|
||||
continue
|
||||
current_url = str(
|
||||
await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url
|
||||
)
|
||||
|
||||
observer_prompt = prompt_engine.load_prompt(
|
||||
"observer",
|
||||
current_url=current_url,
|
||||
elements=element_tree_in_prompt,
|
||||
user_goal=user_prompt,
|
||||
task_history=task_history,
|
||||
local_datetime=datetime.now(context.tz_info).isoformat(),
|
||||
)
|
||||
observer_thought = await app.DATABASE.create_observer_thought(
|
||||
observer_cruise_id=observer_cruise_id,
|
||||
organization_id=organization_id,
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
workflow_id=workflow.workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
observer_thought_type=ObserverThoughtType.plan,
|
||||
observer_thought_scenario=ObserverThoughtScenario.generate_plan,
|
||||
)
|
||||
observer_response = await app.LLM_API_HANDLER(
|
||||
prompt=observer_prompt,
|
||||
screenshots=scraped_page.screenshots,
|
||||
observer_thought=observer_thought,
|
||||
prompt_name="observer",
|
||||
)
|
||||
LOG.info(
|
||||
"Observer response",
|
||||
observer_response=observer_response,
|
||||
iteration=i,
|
||||
current_url=current_url,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
# see if the user goal has achieved or not
|
||||
user_goal_achieved = observer_response.get("user_goal_achieved", False)
|
||||
observation = observer_response.get("page_info", "")
|
||||
thoughts: str = observer_response.get("thoughts", "")
|
||||
plan = observer_response.get("plan", "")
|
||||
task_type = observer_response.get("task_type", "")
|
||||
# Create and save observer thought
|
||||
await app.DATABASE.update_observer_thought(
|
||||
observer_thought_id=observer_thought.observer_thought_id,
|
||||
organization_id=organization_id,
|
||||
thought=thoughts,
|
||||
observation=observation,
|
||||
answer=plan,
|
||||
output={"task_type": task_type, "user_goal_achieved": user_goal_achieved},
|
||||
)
|
||||
|
||||
if user_goal_achieved is True:
|
||||
LOG.info(
|
||||
"User goal achieved. Workflow run will complete. Observer is stopping",
|
||||
iteration=i,
|
||||
workflow_run_id=workflow_run_id,
|
||||
)
|
||||
observer_task = await _summarize_observer_task(
|
||||
observer_task=observer_task,
|
||||
task_history=task_history,
|
||||
context=context,
|
||||
screenshots=scraped_page.screenshots,
|
||||
)
|
||||
break
|
||||
|
||||
if not plan:
|
||||
LOG.warning("No plan found in observer response", observer_response=observer_response)
|
||||
continue
|
||||
|
||||
# parse observer repsonse and run the next task
|
||||
if not task_type:
|
||||
LOG.error("No task type found in observer response", observer_response=observer_response)
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run_id,
|
||||
failure_reason="Skyvern failed to generate a task. Please try again later.",
|
||||
)
|
||||
break
|
||||
|
||||
if task_type == "extract":
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
|
||||
observer_cruise=observer_task,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
plan=plan,
|
||||
browser_state=browser_state,
|
||||
original_url=url,
|
||||
scraped_page=scraped_page,
|
||||
current_url=current_url,
|
||||
element_tree_in_prompt=element_tree_in_prompt,
|
||||
data_extraction_goal=plan,
|
||||
task_history=task_history,
|
||||
)
|
||||
task_history_record = {
|
||||
"type": task_type,
|
||||
"task": plan,
|
||||
"loop_over_values": extraction_obj.get("loop_values"),
|
||||
"task_inside_the_loop": inner_task,
|
||||
}
|
||||
except Exception:
|
||||
LOG.exception("Failed to generate loop task")
|
||||
task_history_record = {"type": task_type, "task": plan}
|
||||
elif task_type == "navigate":
|
||||
original_url = url if i == 0 else None
|
||||
navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan)
|
||||
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
original_url=original_url,
|
||||
navigation_goal=navigation_goal,
|
||||
totp_verification_url=observer_task.totp_verification_url,
|
||||
totp_identifier=observer_task.totp_identifier,
|
||||
)
|
||||
task_history_record = {"type": task_type, "task": plan}
|
||||
elif task_type == "loop":
|
||||
try:
|
||||
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
|
||||
observer_cruise=observer_task,
|
||||
workflow_id=workflow_id,
|
||||
workflow_permanent_id=workflow.workflow_permanent_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
plan=plan,
|
||||
browser_state=browser_state,
|
||||
original_url=url,
|
||||
scraped_page=scraped_page,
|
||||
)
|
||||
task_history_record = {
|
||||
"type": task_type,
|
||||
"task": plan,
|
||||
"loop_over_values": extraction_obj.get("loop_values"),
|
||||
"task_inside_the_loop": inner_task,
|
||||
}
|
||||
except Exception:
|
||||
LOG.exception("Failed to generate loop task")
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run_id,
|
||||
failure_reason="Failed to generate the loop.",
|
||||
)
|
||||
break
|
||||
else:
|
||||
LOG.info("Unsupported task type", task_type=task_type)
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run_id,
|
||||
failure_reason="Failed to generate the loop.",
|
||||
failure_reason=f"Unsupported task block type gets generated: {task_type}",
|
||||
)
|
||||
break
|
||||
else:
|
||||
LOG.info("Unsupported task type", task_type=task_type)
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run_id,
|
||||
failure_reason=f"Unsupported task block type gets generated: {task_type}",
|
||||
)
|
||||
break
|
||||
|
||||
# generate the extraction task
|
||||
block_result = await block.execute_safe(
|
||||
@@ -565,6 +585,11 @@ async def run_observer_task_helper(
|
||||
if block_result.success is True:
|
||||
completion_screenshots = []
|
||||
try:
|
||||
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
|
||||
workflow_run=workflow_run,
|
||||
url=url,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
scraped_page = await scrape_website(
|
||||
browser_state,
|
||||
url,
|
||||
@@ -595,7 +620,7 @@ async def run_observer_task_helper(
|
||||
prompt=observer_completion_prompt,
|
||||
screenshots=completion_screenshots,
|
||||
observer_thought=observer_thought,
|
||||
prompt_name="observer-check-completion",
|
||||
prompt_name="observer_check_completion",
|
||||
)
|
||||
LOG.info(
|
||||
"Observer completion check response",
|
||||
@@ -900,7 +925,7 @@ async def _generate_loop_task(
|
||||
task_in_loop_metadata_prompt,
|
||||
screenshots=scraped_page.screenshots,
|
||||
observer_thought=observer_thought_task_in_loop,
|
||||
prompt_name="observer-generate-task-block",
|
||||
prompt_name="observer_generate_task_block",
|
||||
)
|
||||
LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response)
|
||||
navigation_goal = task_in_loop_metadata_response.get("navigation_goal")
|
||||
@@ -996,7 +1021,7 @@ async def _generate_extraction_task(
|
||||
generate_extraction_task_response = await app.LLM_API_HANDLER(
|
||||
generate_extraction_task_prompt,
|
||||
observer_cruise=observer_cruise,
|
||||
prompt_name="observer-generate-extraction-task",
|
||||
prompt_name="observer_generate_extraction_task",
|
||||
)
|
||||
LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response)
|
||||
|
||||
@@ -1067,6 +1092,34 @@ async def _generate_navigation_task(
|
||||
)
|
||||
|
||||
|
||||
async def _generate_goto_url_task(
|
||||
workflow_id: str,
|
||||
url: str,
|
||||
) -> tuple[UrlBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]:
|
||||
LOG.info("Generating goto url task", url=url)
|
||||
# create OutputParameter for the data_extraction block
|
||||
label = f"goto_url_{_generate_random_string()}"
|
||||
|
||||
url_block_yaml = UrlBlockYAML(
|
||||
label=label,
|
||||
url=url,
|
||||
)
|
||||
output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block(
|
||||
workflow_id=workflow_id,
|
||||
block_yaml=url_block_yaml,
|
||||
)
|
||||
# create UrlBlock
|
||||
return (
|
||||
UrlBlock(
|
||||
label=label,
|
||||
url=url,
|
||||
output_parameter=output_parameter,
|
||||
),
|
||||
[url_block_yaml],
|
||||
[],
|
||||
)
|
||||
|
||||
|
||||
def _generate_random_string(length: int = 5) -> str:
|
||||
# Use the current timestamp as the seed
|
||||
random.seed(os.urandom(16))
|
||||
@@ -1127,13 +1180,14 @@ async def mark_observer_task_as_completed(
|
||||
output: dict[str, Any] | None = None,
|
||||
) -> ObserverTask:
|
||||
observer_task = await app.DATABASE.update_observer_cruise(
|
||||
observer_cruise_id=observer_cruise_id,
|
||||
observer_cruise_id,
|
||||
organization_id=organization_id,
|
||||
status=ObserverTaskStatus.completed,
|
||||
workflow_run_id=workflow_run_id,
|
||||
summary=summary,
|
||||
output=output,
|
||||
)
|
||||
if workflow_run_id:
|
||||
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
|
||||
|
||||
# Track observer cruise duration when completed
|
||||
duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds()
|
||||
@@ -1245,7 +1299,7 @@ async def _summarize_observer_task(
|
||||
prompt=observer_summary_prompt,
|
||||
screenshots=screenshots,
|
||||
observer_thought=observer_thought,
|
||||
prompt_name="observer-generate-summary",
|
||||
prompt_name="observer_summary",
|
||||
)
|
||||
LOG.info("Observer summary response", observer_summary_resp=observer_summary_resp)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user