From ffdb823401c1ad0d74cc56c69dd01216d61ccd5f Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 6 Feb 2025 00:50:32 +0800 Subject: [PATCH] fix observer - get url block back; get mark workflow run completion back (#1726) --- .../forge/sdk/services/observer_service.py | 346 ++++++++++-------- 1 file changed, 200 insertions(+), 146 deletions(-) diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index d3210373..d79380b9 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -34,6 +34,7 @@ from skyvern.forge.sdk.workflow.models.block import ( ForLoopBlock, NavigationBlock, TaskBlock, + UrlBlock, ) from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter from skyvern.forge.sdk.workflow.models.workflow import ( @@ -51,6 +52,7 @@ from skyvern.forge.sdk.workflow.models.yaml import ( ForLoopBlockYAML, NavigationBlockYAML, TaskBlockYAML, + UrlBlockYAML, WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) @@ -118,7 +120,9 @@ async def initialize_observer_task( metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url) metadata_response = await app.LLM_API_HANDLER( - prompt=metadata_prompt, observer_thought=observer_thought, prompt_name="observer-generate-metadata" + prompt=metadata_prompt, + observer_thought=observer_thought, + prompt_name="observer-generate-metadata", ) # validate LOG.info(f"Initialized observer initial response: {metadata_response}") @@ -350,160 +354,176 @@ async def run_observer_task_helper( max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS for i in range(max_iterations): LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) - try: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, - url=url, - browser_session_id=browser_session_id, - ) - scraped_page = await scrape_website( - browser_state, - url, - app.AGENT_FUNCTION.cleanup_element_tree_factory(), - scrape_exclude=app.scrape_exclude, - ) - element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) - page = await browser_state.get_working_page() - except Exception: - LOG.exception("Failed to get browser state or scrape website in observer iteration", iteration=i, url=url) - continue - current_url = str( - await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url - ) - - context = skyvern_context.ensure_context() - observer_prompt = prompt_engine.load_prompt( - "observer", - current_url=current_url, - elements=element_tree_in_prompt, - user_goal=user_prompt, - task_history=task_history, - local_datetime=datetime.now(context.tz_info).isoformat(), - ) - observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise_id, - organization_id=organization_id, - workflow_run_id=workflow_run.workflow_run_id, - workflow_id=workflow.workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.plan, - observer_thought_scenario=ObserverThoughtScenario.generate_plan, - ) - observer_response = await app.LLM_API_HANDLER( - prompt=observer_prompt, - screenshots=scraped_page.screenshots, - observer_thought=observer_thought, - prompt_name="observer-generate-plan", - ) - LOG.info( - "Observer response", - observer_response=observer_response, - iteration=i, - current_url=current_url, - workflow_run_id=workflow_run_id, - ) - # see if the user goal has achieved or not - user_goal_achieved = observer_response.get("user_goal_achieved", False) - observation = observer_response.get("page_info", "") - thoughts: str = observer_response.get("thoughts", "") - plan: str = observer_response.get("plan", "") - task_type: str = observer_response.get("task_type", "") - # Create and save observer thought - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, - organization_id=organization_id, - thought=thoughts, - observation=observation, - answer=plan, - output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, - ) - - if user_goal_achieved is True: - LOG.info( - "User goal achieved. Workflow run will complete. Observer is stopping", - iteration=i, - workflow_run_id=workflow_run_id, - ) - observer_task = await _summarize_observer_task( - observer_task=observer_task, - task_history=task_history, - context=context, - screenshots=scraped_page.screenshots, - ) - break - - if not plan: - LOG.warning("No plan found in observer response", observer_response=observer_response) - continue - - # parse observer repsonse and run the next task - if not task_type: - LOG.error("No task type found in observer response", observer_response=observer_response) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason="Skyvern failed to generate a task. Please try again later.", - ) - break - + task_type = "" + plan = "" block: BlockTypeVar | None = None task_history_record: dict[str, Any] = {} - if task_type == "extract": - block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( - observer_cruise=observer_task, + context = skyvern_context.ensure_context() + + if i == 0: + # The first iteration is always a GOTO_URL task + task_type = "goto_url" + plan = f"Go to this website: {url}" + task_history_record = {"type": task_type, "task": plan} + block, block_yaml_list, parameter_yaml_list = await _generate_goto_url_task( workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - current_url=current_url, - element_tree_in_prompt=element_tree_in_prompt, - data_extraction_goal=plan, - task_history=task_history, + url=url, ) task_history_record = {"type": task_type, "task": plan} - elif task_type == "navigate": - original_url = url if i == 0 else None - navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) - block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( - workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - original_url=original_url, - navigation_goal=navigation_goal, - totp_verification_url=observer_task.totp_verification_url, - totp_identifier=observer_task.totp_identifier, - ) - task_history_record = {"type": task_type, "task": plan} - elif task_type == "loop": + else: try: - block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + url=url, + browser_session_id=browser_session_id, + ) + scraped_page = await scrape_website( + browser_state, + url, + app.AGENT_FUNCTION.cleanup_element_tree_factory(), + scrape_exclude=app.scrape_exclude, + ) + element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) + page = await browser_state.get_working_page() + except Exception: + LOG.exception( + "Failed to get browser state or scrape website in observer iteration", iteration=i, url=url + ) + continue + current_url = str( + await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url + ) + + observer_prompt = prompt_engine.load_prompt( + "observer", + current_url=current_url, + elements=element_tree_in_prompt, + user_goal=user_prompt, + task_history=task_history, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + observer_thought = await app.DATABASE.create_observer_thought( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + observer_thought_type=ObserverThoughtType.plan, + observer_thought_scenario=ObserverThoughtScenario.generate_plan, + ) + observer_response = await app.LLM_API_HANDLER( + prompt=observer_prompt, + screenshots=scraped_page.screenshots, + observer_thought=observer_thought, + prompt_name="observer", + ) + LOG.info( + "Observer response", + observer_response=observer_response, + iteration=i, + current_url=current_url, + workflow_run_id=workflow_run_id, + ) + # see if the user goal has achieved or not + user_goal_achieved = observer_response.get("user_goal_achieved", False) + observation = observer_response.get("page_info", "") + thoughts: str = observer_response.get("thoughts", "") + plan = observer_response.get("plan", "") + task_type = observer_response.get("task_type", "") + # Create and save observer thought + await app.DATABASE.update_observer_thought( + observer_thought_id=observer_thought.observer_thought_id, + organization_id=organization_id, + thought=thoughts, + observation=observation, + answer=plan, + output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, + ) + + if user_goal_achieved is True: + LOG.info( + "User goal achieved. Workflow run will complete. Observer is stopping", + iteration=i, + workflow_run_id=workflow_run_id, + ) + observer_task = await _summarize_observer_task( + observer_task=observer_task, + task_history=task_history, + context=context, + screenshots=scraped_page.screenshots, + ) + break + + if not plan: + LOG.warning("No plan found in observer response", observer_response=observer_response) + continue + + # parse observer repsonse and run the next task + if not task_type: + LOG.error("No task type found in observer response", observer_response=observer_response) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Skyvern failed to generate a task. Please try again later.", + ) + break + + if task_type == "extract": + block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( observer_cruise=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, - plan=plan, - browser_state=browser_state, - original_url=url, - scraped_page=scraped_page, + current_url=current_url, + element_tree_in_prompt=element_tree_in_prompt, + data_extraction_goal=plan, + task_history=task_history, ) - task_history_record = { - "type": task_type, - "task": plan, - "loop_over_values": extraction_obj.get("loop_values"), - "task_inside_the_loop": inner_task, - } - except Exception: - LOG.exception("Failed to generate loop task") + task_history_record = {"type": task_type, "task": plan} + elif task_type == "navigate": + original_url = url if i == 0 else None + navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) + block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + original_url=original_url, + navigation_goal=navigation_goal, + totp_verification_url=observer_task.totp_verification_url, + totp_identifier=observer_task.totp_identifier, + ) + task_history_record = {"type": task_type, "task": plan} + elif task_type == "loop": + try: + block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( + observer_cruise=observer_task, + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + plan=plan, + browser_state=browser_state, + original_url=url, + scraped_page=scraped_page, + ) + task_history_record = { + "type": task_type, + "task": plan, + "loop_over_values": extraction_obj.get("loop_values"), + "task_inside_the_loop": inner_task, + } + except Exception: + LOG.exception("Failed to generate loop task") + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Failed to generate the loop.", + ) + break + else: + LOG.info("Unsupported task type", task_type=task_type) await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id=workflow_run_id, - failure_reason="Failed to generate the loop.", + failure_reason=f"Unsupported task block type gets generated: {task_type}", ) break - else: - LOG.info("Unsupported task type", task_type=task_type) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason=f"Unsupported task block type gets generated: {task_type}", - ) - break # generate the extraction task block_result = await block.execute_safe( @@ -565,6 +585,11 @@ async def run_observer_task_helper( if block_result.success is True: completion_screenshots = [] try: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + url=url, + browser_session_id=browser_session_id, + ) scraped_page = await scrape_website( browser_state, url, @@ -595,7 +620,7 @@ async def run_observer_task_helper( prompt=observer_completion_prompt, screenshots=completion_screenshots, observer_thought=observer_thought, - prompt_name="observer-check-completion", + prompt_name="observer_check_completion", ) LOG.info( "Observer completion check response", @@ -900,7 +925,7 @@ async def _generate_loop_task( task_in_loop_metadata_prompt, screenshots=scraped_page.screenshots, observer_thought=observer_thought_task_in_loop, - prompt_name="observer-generate-task-block", + prompt_name="observer_generate_task_block", ) LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response) navigation_goal = task_in_loop_metadata_response.get("navigation_goal") @@ -996,7 +1021,7 @@ async def _generate_extraction_task( generate_extraction_task_response = await app.LLM_API_HANDLER( generate_extraction_task_prompt, observer_cruise=observer_cruise, - prompt_name="observer-generate-extraction-task", + prompt_name="observer_generate_extraction_task", ) LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response) @@ -1067,6 +1092,34 @@ async def _generate_navigation_task( ) +async def _generate_goto_url_task( + workflow_id: str, + url: str, +) -> tuple[UrlBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]: + LOG.info("Generating goto url task", url=url) + # create OutputParameter for the data_extraction block + label = f"goto_url_{_generate_random_string()}" + + url_block_yaml = UrlBlockYAML( + label=label, + url=url, + ) + output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=url_block_yaml, + ) + # create UrlBlock + return ( + UrlBlock( + label=label, + url=url, + output_parameter=output_parameter, + ), + [url_block_yaml], + [], + ) + + def _generate_random_string(length: int = 5) -> str: # Use the current timestamp as the seed random.seed(os.urandom(16)) @@ -1127,13 +1180,14 @@ async def mark_observer_task_as_completed( output: dict[str, Any] | None = None, ) -> ObserverTask: observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id=observer_cruise_id, + observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.completed, - workflow_run_id=workflow_run_id, summary=summary, output=output, ) + if workflow_run_id: + await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) # Track observer cruise duration when completed duration_seconds = (datetime.now(UTC) - observer_task.created_at.replace(tzinfo=UTC)).total_seconds() @@ -1245,7 +1299,7 @@ async def _summarize_observer_task( prompt=observer_summary_prompt, screenshots=screenshots, observer_thought=observer_thought, - prompt_name="observer-generate-summary", + prompt_name="observer_summary", ) LOG.info("Observer summary response", observer_summary_resp=observer_summary_resp)