From 204972e2253d31eddb71ed26a22903cd8258598a Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Sun, 2 Feb 2025 03:10:38 +0800 Subject: [PATCH] Add step / task / workflow run / observer metrics as logs (#1698) Co-authored-by: Suchintan --- skyvern/forge/agent.py | 48 ++- skyvern/forge/agent_functions.py | 6 +- .../forge/sdk/api/llm/api_handler_factory.py | 38 +- skyvern/forge/sdk/api/llm/models.py | 1 + skyvern/forge/sdk/routes/agent_protocol.py | 6 +- skyvern/forge/sdk/routes/totp.py | 2 +- .../forge/sdk/services/observer_service.py | 342 ++++++++---------- skyvern/forge/sdk/workflow/models/block.py | 8 +- skyvern/forge/sdk/workflow/service.py | 13 + skyvern/webeye/actions/caching.py | 2 +- skyvern/webeye/actions/handler.py | 37 +- 11 files changed, 284 insertions(+), 219 deletions(-) diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 04dbbb46..d497c571 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -4,7 +4,7 @@ import os import random import string from asyncio.exceptions import CancelledError -from datetime import datetime +from datetime import UTC, datetime from pathlib import Path from typing import Any, Tuple @@ -755,6 +755,7 @@ class ForgeAgent: self.async_operation_pool.run_operation(task.task_id, AgentPhase.llm) json_response = await app.LLM_API_HANDLER( prompt=extract_action_prompt, + prompt_name="extract-actions", step=step, screenshots=scraped_page.screenshots, ) @@ -1126,7 +1127,10 @@ class ForgeAgent: # this prompt is critical to our agent so let's use the primary LLM API handler verification_result = await app.LLM_API_HANDLER( - prompt=verification_prompt, step=step, screenshots=scraped_page_refreshed.screenshots + prompt=verification_prompt, + step=step, + screenshots=scraped_page_refreshed.screenshots, + prompt_name="check-user-goal", ) return CompleteVerifyResult.model_validate(verification_result) @@ -1411,8 +1415,10 @@ class ForgeAgent: elif task_type == TaskType.validation: template = "decisive-criterion-validate" elif task_type == TaskType.action: - prompt = prompt_engine.load_prompt("infer-action-type", navigation_goal=navigation_goal) - json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step) + prompt = prompt_engine.load_prompt( + "infer-action-type", navigation_goal=navigation_goal, prompt_name="infer-action-type" + ) + json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="infer-action-type") if json_response.get("error"): raise FailedToParseActionInstruction( reason=json_response.get("thought"), error_type=json_response.get("error") @@ -1914,6 +1920,18 @@ class ForgeAgent: diff=update_comparison, ) + # Track step duration when step is completed or failed + if status in [StepStatus.completed, StepStatus.failed]: + duration_seconds = (datetime.now(UTC) - step.created_at.replace(tzinfo=UTC)).total_seconds() + LOG.info( + "Step duration metrics", + task_id=step.task_id, + step_id=step.step_id, + duration_seconds=duration_seconds, + status=status, + organization_id=step.organization_id, + ) + await save_step_logs(step.step_id) return await app.DATABASE.update_step( @@ -1948,6 +1966,19 @@ class ForgeAgent: for key, value in updates.items() if getattr(task, key) != value } + + # Track task duration when task is completed, failed, or terminated + if status in [TaskStatus.completed, TaskStatus.failed, TaskStatus.terminated]: + duration_seconds = (datetime.now(UTC) - task.created_at.replace(tzinfo=UTC)).total_seconds() + LOG.info( + "Task duration metrics", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + duration_seconds=duration_seconds, + status=status, + organization_id=task.organization_id, + ) + await save_task_logs(task.task_id) LOG.info("Updating task in db", task_id=task.task_id, diff=update_comparison) return await app.DATABASE.update_task( @@ -2040,7 +2071,9 @@ class ForgeAgent: navigation_payload=task.navigation_payload, steps=steps_results, ) - json_response = await app.LLM_API_HANDLER(prompt=prompt, screenshots=screenshots, step=step) + json_response = await app.LLM_API_HANDLER( + prompt=prompt, screenshots=screenshots, step=step, prompt_name="summarize-max-steps-reason" + ) return json_response.get("reasoning", "") except Exception: LOG.warning("Failed to summary the failure reason", task_id=task.task_id, step_id=step.step_id) @@ -2198,6 +2231,7 @@ class ForgeAgent: prompt=extract_action_prompt, step=step, screenshots=scraped_page.screenshots, + prompt_name="extract-actions", ) return json_response @@ -2238,9 +2272,7 @@ class ForgeAgent: ) data_extraction_summary_resp = await app.SECONDARY_LLM_API_HANDLER( - prompt=prompt, - step=step, - screenshots=scraped_page.screenshots, + prompt=prompt, step=step, screenshots=scraped_page.screenshots, prompt_name="data-extraction-summary" ) return ExtractAction( reasoning=data_extraction_summary_resp.get("summary", "Extracting information from the page"), diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index 9fccd911..3c029f4c 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -188,7 +188,9 @@ async def _convert_svg_to_string( for retry in range(SVG_SHAPE_CONVERTION_ATTEMPTS): try: - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=svg_convert_prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=svg_convert_prompt, step=step, prompt_name="svg-convert" + ) svg_shape = json_response.get("shape", "") recognized = json_response.get("recognized", False) if not svg_shape or not recognized: @@ -316,7 +318,7 @@ async def _convert_css_shape_to_string( for retry in range(CSS_SHAPE_CONVERTION_ATTEMPTS): try: json_response = await app.SECONDARY_LLM_API_HANDLER( - prompt=prompt, screenshots=[screenshot], step=step + prompt=prompt, screenshots=[screenshot], step=step, prompt_name="css-shape-convert" ) css_shape = json_response.get("shape", "") recognized = json_response.get("recognized", False) diff --git a/skyvern/forge/sdk/api/llm/api_handler_factory.py b/skyvern/forge/sdk/api/llm/api_handler_factory.py index 1353c1bd..b9fc40b4 100644 --- a/skyvern/forge/sdk/api/llm/api_handler_factory.py +++ b/skyvern/forge/sdk/api/llm/api_handler_factory.py @@ -61,6 +61,7 @@ class LLMAPIHandlerFactory: async def llm_api_handler_with_router_and_fallback( prompt: str, + prompt_name: str, step: Step | None = None, observer_cruise: ObserverTask | None = None, observer_thought: ObserverThought | None = None, @@ -80,6 +81,8 @@ class LLMAPIHandlerFactory: Returns: The response from the LLM router. """ + start_time = time.time() + if parameters is None: parameters = LLMAPIHandlerFactory.get_api_parameters(llm_config) @@ -120,7 +123,6 @@ class LLMAPIHandlerFactory: ) try: response = await router.acompletion(model=main_model_group, messages=messages, **parameters) - LOG.info("LLM API call successful", llm_key=llm_key, model=llm_config.model_name) except litellm.exceptions.APIError as e: raise LLMProviderErrorRetryableTask(llm_key) from e except ValueError as e: @@ -195,6 +197,21 @@ class LLMAPIHandlerFactory: ai_suggestion=ai_suggestion, ) + # Track LLM API handler duration + duration_seconds = time.time() - start_time + LOG.info( + "LLM API handler duration metrics", + llm_key=llm_key, + model=main_model_group, + prompt_name=prompt_name, + duration_seconds=duration_seconds, + step_id=step.step_id if step else None, + observer_thought_id=observer_thought.observer_thought_id if observer_thought else None, + organization_id=step.organization_id + if step + else (observer_thought.organization_id if observer_thought else None), + ) + return parsed_response return llm_api_handler_with_router_and_fallback @@ -210,6 +227,7 @@ class LLMAPIHandlerFactory: async def llm_api_handler( prompt: str, + prompt_name: str, step: Step | None = None, observer_cruise: ObserverTask | None = None, observer_thought: ObserverThought | None = None, @@ -217,6 +235,7 @@ class LLMAPIHandlerFactory: screenshots: list[bytes] | None = None, parameters: dict[str, Any] | None = None, ) -> dict[str, Any]: + start_time = time.time() active_parameters = base_parameters or {} if parameters is None: parameters = LLMAPIHandlerFactory.get_api_parameters(llm_config) @@ -270,14 +289,12 @@ class LLMAPIHandlerFactory: # TODO (kerem): add a timeout to this call # TODO (kerem): add a retry mechanism to this call (acompletion_with_retries) # TODO (kerem): use litellm fallbacks? https://litellm.vercel.app/docs/tutorials/fallbacks#how-does-completion_with_fallbacks-work - LOG.info("Calling LLM API", llm_key=llm_key, model=llm_config.model_name) response = await litellm.acompletion( model=llm_config.model_name, messages=messages, timeout=settings.LLM_CONFIG_TIMEOUT, **active_parameters, ) - LOG.info("LLM API call successful", llm_key=llm_key, model=llm_config.model_name) except litellm.exceptions.APIError as e: raise LLMProviderErrorRetryableTask(llm_key) from e except CancelledError: @@ -350,6 +367,21 @@ class LLMAPIHandlerFactory: ai_suggestion=ai_suggestion, ) + # Track LLM API handler duration + duration_seconds = time.time() - start_time + LOG.info( + "LLM API handler duration metrics", + llm_key=llm_key, + prompt_name=prompt_name, + model=llm_config.model_name, + duration_seconds=duration_seconds, + step_id=step.step_id if step else None, + observer_thought_id=observer_thought.observer_thought_id if observer_thought else None, + organization_id=step.organization_id + if step + else (observer_thought.organization_id if observer_thought else None), + ) + return parsed_response return llm_api_handler diff --git a/skyvern/forge/sdk/api/llm/models.py b/skyvern/forge/sdk/api/llm/models.py index 59fbedcc..197999a5 100644 --- a/skyvern/forge/sdk/api/llm/models.py +++ b/skyvern/forge/sdk/api/llm/models.py @@ -79,6 +79,7 @@ class LLMAPIHandler(Protocol): def __call__( self, prompt: str, + prompt_name: str, step: Step | None = None, observer_cruise: ObserverTask | None = None, observer_thought: ObserverThought | None = None, diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 8204b266..d8073115 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1001,7 +1001,9 @@ async def make_ai_suggestion( ai_suggestion_type=ai_suggestion_type, ) - llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, ai_suggestion=new_ai_suggestion) + llm_response = await app.LLM_API_HANDLER( + prompt=llm_prompt, ai_suggestion=new_ai_suggestion, prompt_name="suggest-data-schema" + ) parsed_ai_suggestion = AISuggestionBase.model_validate(llm_response) return parsed_ai_suggestion @@ -1045,7 +1047,7 @@ async def generate_task( llm_prompt = prompt_engine.load_prompt("generate-task", user_prompt=data.prompt) try: - llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt) + llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="generate-task") parsed_task_generation_obj = TaskGenerationBase.model_validate(llm_response) # generate a TaskGenerationModel diff --git a/skyvern/forge/sdk/routes/totp.py b/skyvern/forge/sdk/routes/totp.py index 5849bb9e..b1ef4d25 100644 --- a/skyvern/forge/sdk/routes/totp.py +++ b/skyvern/forge/sdk/routes/totp.py @@ -41,5 +41,5 @@ async def save_totp_code( async def parse_totp_code(content: str) -> str | None: prompt = prompt_engine.load_prompt("parse-verification-code", content=content) - code_resp = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt) + code_resp = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, prompt_name="parse-verification-code") return code_resp.get("code", None) diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py index e8e0bd07..3a16118d 100644 --- a/skyvern/forge/sdk/services/observer_service.py +++ b/skyvern/forge/sdk/services/observer_service.py @@ -1,7 +1,7 @@ import os import random import string -from datetime import datetime +from datetime import UTC, datetime from typing import Any import httpx @@ -34,7 +34,6 @@ from skyvern.forge.sdk.workflow.models.block import ( ForLoopBlock, NavigationBlock, TaskBlock, - UrlBlock, ) from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter from skyvern.forge.sdk.workflow.models.workflow import ( @@ -52,7 +51,6 @@ from skyvern.forge.sdk.workflow.models.yaml import ( ForLoopBlockYAML, NavigationBlockYAML, TaskBlockYAML, - UrlBlockYAML, WorkflowCreateYAMLRequest, WorkflowDefinitionYAML, ) @@ -119,7 +117,9 @@ async def initialize_observer_task( ) metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url) - metadata_response = await app.LLM_API_HANDLER(prompt=metadata_prompt, observer_thought=observer_thought) + metadata_response = await app.LLM_API_HANDLER( + prompt=metadata_prompt, observer_thought=observer_thought, prompt_name="observer-generate-metadata" + ) # validate LOG.info(f"Initialized observer initial response: {metadata_response}") url: str = user_url or metadata_response.get("url", "") @@ -350,175 +350,160 @@ async def run_observer_task_helper( max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS for i in range(max_iterations): LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) - task_type = "" - plan = "" - block: BlockTypeVar | None = None - task_history_record: dict[str, Any] = {} - context = skyvern_context.ensure_context() - - if i == 0: - # The first iteration is always a GOTO_URL task - task_type = "goto_url" - plan = f"Go to this website: {url}" - task_history_record = {"type": task_type, "task": plan} - block, block_yaml_list, parameter_yaml_list = await _generate_goto_url_task( - workflow_id=workflow_id, + try: + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, url=url, + browser_session_id=browser_session_id, ) - task_history_record = {"type": task_type, "task": plan} - else: - try: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, - url=url, - browser_session_id=browser_session_id, - ) - scraped_page = await scrape_website( - browser_state, - url, - app.AGENT_FUNCTION.cleanup_element_tree_factory(), - scrape_exclude=app.scrape_exclude, - ) - element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) - page = await browser_state.get_working_page() - except Exception: - LOG.exception( - "Failed to get browser state or scrape website in observer iteration", iteration=i, url=url - ) - continue - current_url = str( - await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url + scraped_page = await scrape_website( + browser_state, + url, + app.AGENT_FUNCTION.cleanup_element_tree_factory(), + scrape_exclude=app.scrape_exclude, ) + element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) + page = await browser_state.get_working_page() + except Exception: + LOG.exception("Failed to get browser state or scrape website in observer iteration", iteration=i, url=url) + continue + current_url = str( + await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url + ) - observer_prompt = prompt_engine.load_prompt( - "observer", - current_url=current_url, - elements=element_tree_in_prompt, - user_goal=user_prompt, - task_history=task_history, - local_datetime=datetime.now(context.tz_info).isoformat(), - ) - observer_thought = await app.DATABASE.create_observer_thought( - observer_cruise_id=observer_cruise_id, - organization_id=organization_id, - workflow_run_id=workflow_run.workflow_run_id, - workflow_id=workflow.workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - observer_thought_type=ObserverThoughtType.plan, - observer_thought_scenario=ObserverThoughtScenario.generate_plan, - ) - observer_response = await app.LLM_API_HANDLER( - prompt=observer_prompt, - screenshots=scraped_page.screenshots, - observer_thought=observer_thought, - ) + context = skyvern_context.ensure_context() + observer_prompt = prompt_engine.load_prompt( + "observer", + current_url=current_url, + elements=element_tree_in_prompt, + user_goal=user_prompt, + task_history=task_history, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + observer_thought = await app.DATABASE.create_observer_thought( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + observer_thought_type=ObserverThoughtType.plan, + observer_thought_scenario=ObserverThoughtScenario.generate_plan, + ) + observer_response = await app.LLM_API_HANDLER( + prompt=observer_prompt, + screenshots=scraped_page.screenshots, + observer_thought=observer_thought, + prompt_name="observer-generate-plan", + ) + LOG.info( + "Observer response", + observer_response=observer_response, + iteration=i, + current_url=current_url, + workflow_run_id=workflow_run_id, + ) + # see if the user goal has achieved or not + user_goal_achieved = observer_response.get("user_goal_achieved", False) + observation = observer_response.get("page_info", "") + thoughts: str = observer_response.get("thoughts", "") + plan: str = observer_response.get("plan", "") + task_type: str = observer_response.get("task_type", "") + # Create and save observer thought + await app.DATABASE.update_observer_thought( + observer_thought_id=observer_thought.observer_thought_id, + organization_id=organization_id, + thought=thoughts, + observation=observation, + answer=plan, + output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, + ) + + if user_goal_achieved is True: LOG.info( - "Observer response", - observer_response=observer_response, + "User goal achieved. Workflow run will complete. Observer is stopping", iteration=i, - current_url=current_url, workflow_run_id=workflow_run_id, ) - # see if the user goal has achieved or not - user_goal_achieved = observer_response.get("user_goal_achieved", False) - observation = observer_response.get("page_info", "") - thoughts: str = observer_response.get("thoughts", "") - plan = observer_response.get("plan", "") - task_type = observer_response.get("task_type", "") - # Create and save observer thought - await app.DATABASE.update_observer_thought( - observer_thought_id=observer_thought.observer_thought_id, - organization_id=organization_id, - thought=thoughts, - observation=observation, - answer=plan, - output={"task_type": task_type, "user_goal_achieved": user_goal_achieved}, + observer_task = await _summarize_observer_task( + observer_task=observer_task, + task_history=task_history, + context=context, + screenshots=scraped_page.screenshots, ) + break - if user_goal_achieved is True: - LOG.info( - "User goal achieved. Workflow run will complete. Observer is stopping", - iteration=i, - workflow_run_id=workflow_run_id, - ) - observer_task = await _summarize_observer_task( - observer_task=observer_task, - task_history=task_history, - context=context, - screenshots=scraped_page.screenshots, - ) - break + if not plan: + LOG.warning("No plan found in observer response", observer_response=observer_response) + continue - if not plan: - LOG.warning("No plan found in observer response", observer_response=observer_response) - continue + # parse observer repsonse and run the next task + if not task_type: + LOG.error("No task type found in observer response", observer_response=observer_response) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Skyvern failed to generate a task. Please try again later.", + ) + break - # parse observer repsonse and run the next task - if not task_type: - LOG.error("No task type found in observer response", observer_response=observer_response) - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason="Skyvern failed to generate a task. Please try again later.", - ) - break - - if task_type == "extract": - block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( + block: BlockTypeVar | None = None + task_history_record: dict[str, Any] = {} + if task_type == "extract": + block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( + observer_cruise=observer_task, + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + current_url=current_url, + element_tree_in_prompt=element_tree_in_prompt, + data_extraction_goal=plan, + task_history=task_history, + ) + task_history_record = {"type": task_type, "task": plan} + elif task_type == "navigate": + original_url = url if i == 0 else None + navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) + block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( + workflow_id=workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run_id, + original_url=original_url, + navigation_goal=navigation_goal, + totp_verification_url=observer_task.totp_verification_url, + totp_identifier=observer_task.totp_identifier, + ) + task_history_record = {"type": task_type, "task": plan} + elif task_type == "loop": + try: + block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( observer_cruise=observer_task, workflow_id=workflow_id, workflow_permanent_id=workflow.workflow_permanent_id, workflow_run_id=workflow_run_id, - current_url=current_url, - element_tree_in_prompt=element_tree_in_prompt, - data_extraction_goal=plan, - task_history=task_history, + plan=plan, + browser_state=browser_state, + original_url=url, + scraped_page=scraped_page, ) - task_history_record = {"type": task_type, "task": plan} - elif task_type == "navigate": - original_url = url if i == 0 else None - navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan) - block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( - workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - original_url=original_url, - navigation_goal=navigation_goal, - totp_verification_url=observer_task.totp_verification_url, - totp_identifier=observer_task.totp_identifier, - ) - task_history_record = {"type": task_type, "task": plan} - elif task_type == "loop": - try: - block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( - observer_cruise=observer_task, - workflow_id=workflow_id, - workflow_permanent_id=workflow.workflow_permanent_id, - workflow_run_id=workflow_run_id, - plan=plan, - browser_state=browser_state, - original_url=url, - scraped_page=scraped_page, - ) - task_history_record = { - "type": task_type, - "task": plan, - "loop_over_values": extraction_obj.get("loop_values"), - "task_inside_the_loop": inner_task, - } - except Exception: - LOG.exception("Failed to generate loop task") - await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( - workflow_run_id=workflow_run_id, - failure_reason="Failed to generate the loop.", - ) - break - else: - LOG.info("Unsupported task type", task_type=task_type) + task_history_record = { + "type": task_type, + "task": plan, + "loop_over_values": extraction_obj.get("loop_values"), + "task_inside_the_loop": inner_task, + } + except Exception: + LOG.exception("Failed to generate loop task") await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( workflow_run_id=workflow_run_id, - failure_reason=f"Unsupported task block type gets generated: {task_type}", + failure_reason="Failed to generate the loop.", ) break + else: + LOG.info("Unsupported task type", task_type=task_type) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason=f"Unsupported task block type gets generated: {task_type}", + ) + break # generate the extraction task block_result = await block.execute_safe( @@ -580,11 +565,6 @@ async def run_observer_task_helper( if block_result.success is True: completion_screenshots = [] try: - browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( - workflow_run=workflow_run, - url=url, - browser_session_id=browser_session_id, - ) scraped_page = await scrape_website( browser_state, url, @@ -615,6 +595,7 @@ async def run_observer_task_helper( prompt=observer_completion_prompt, screenshots=completion_screenshots, observer_thought=observer_thought, + prompt_name="observer-check-completion", ) LOG.info( "Observer completion check response", @@ -917,6 +898,7 @@ async def _generate_loop_task( task_in_loop_metadata_prompt, screenshots=scraped_page.screenshots, observer_thought=observer_thought_task_in_loop, + prompt_name="observer-generate-task-block", ) LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response) navigation_goal = task_in_loop_metadata_response.get("navigation_goal") @@ -1012,6 +994,7 @@ async def _generate_extraction_task( generate_extraction_task_response = await app.LLM_API_HANDLER( generate_extraction_task_prompt, observer_cruise=observer_cruise, + prompt_name="observer-generate-extraction-task", ) LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response) @@ -1082,34 +1065,6 @@ async def _generate_navigation_task( ) -async def _generate_goto_url_task( - workflow_id: str, - url: str, -) -> tuple[UrlBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]: - LOG.info("Generating goto url task", url=url) - # create OutputParameter for the data_extraction block - label = f"goto_url_{_generate_random_string()}" - - url_block_yaml = UrlBlockYAML( - label=label, - url=url, - ) - output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( - workflow_id=workflow_id, - block_yaml=url_block_yaml, - ) - # create UrlBlock - return ( - UrlBlock( - label=label, - url=url, - output_parameter=output_parameter, - ), - [url_block_yaml], - [], - ) - - def _generate_random_string(length: int = 5) -> str: # Use the current timestamp as the seed random.seed(os.urandom(16)) @@ -1170,14 +1125,24 @@ async def mark_observer_task_as_completed( output: dict[str, Any] | None = None, ) -> ObserverTask: observer_task = await app.DATABASE.update_observer_cruise( - observer_cruise_id, + observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverTaskStatus.completed, + workflow_run_id=workflow_run_id, summary=summary, output=output, ) - if workflow_run_id: - await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id) + + # Track observer cruise duration when completed + duration_seconds = (datetime.now(UTC) - observer_task.created_at).total_seconds() + LOG.info( + "Observer cruise duration metrics", + observer_cruise_id=observer_cruise_id, + workflow_run_id=workflow_run_id, + duration_seconds=duration_seconds, + status=ObserverTaskStatus.completed, + organization_id=organization_id, + ) await send_observer_task_webhook(observer_task) return observer_task @@ -1278,6 +1243,7 @@ async def _summarize_observer_task( prompt=observer_summary_prompt, screenshots=screenshots, observer_thought=observer_thought, + prompt_name="observer-generate-summary", ) LOG.info("Observer summary response", observer_summary_resp=observer_summary_resp) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index e7d696ff..6dc99bce 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -246,7 +246,9 @@ class Block(BaseModel, abc.ABC): "generate_workflow_run_block_description", block=block_data, ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=description_generation_prompt) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=description_generation_prompt, prompt_name="generate-workflow-run-block-description" + ) description = json_response.get("summary") LOG.info( "Generated description for the workflow run block", @@ -1198,7 +1200,7 @@ class TextPromptBlock(Block): prompt=prompt, llm_key=self.llm_key, ) - response = await llm_api_handler(prompt=prompt) + response = await llm_api_handler(prompt=prompt, prompt_name="text-prompt") LOG.info("TextPromptBlock: Received response from LLM", response=response) return response @@ -1954,7 +1956,7 @@ class PDFParserBlock(Block): llm_prompt = prompt_engine.load_prompt( "extract-information-from-file-text", extracted_text_content=extracted_text, json_schema=self.json_schema ) - llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt) + llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="extract-information-from-file-text") # Record the parsed data await self.record_output_parameter_value(workflow_run_context, workflow_run_id, llm_response) return await self.build_block_result( diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index cd722119..1493814c 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1,5 +1,6 @@ import asyncio import json +from datetime import UTC, datetime from typing import Any import httpx @@ -468,6 +469,18 @@ class WorkflowService: browser_session_id=browser_session_id, close_browser_on_completion=browser_session_id is None, ) + + # Track workflow run duration when completed + duration_seconds = (datetime.now(UTC) - workflow_run.created_at).total_seconds() + LOG.info( + "Workflow run duration metrics", + workflow_run_id=workflow_run_id, + workflow_id=workflow_run.workflow_id, + duration_seconds=duration_seconds, + status=WorkflowRunStatus.completed, + organization_id=organization_id, + ) + return workflow_run async def create_workflow( diff --git a/skyvern/webeye/actions/caching.py b/skyvern/webeye/actions/caching.py index a35f57e1..e712873f 100644 --- a/skyvern/webeye/actions/caching.py +++ b/skyvern/webeye/actions/caching.py @@ -189,7 +189,7 @@ async def get_user_detail_answers( ) llm_response = await app.SECONDARY_LLM_API_HANDLER( - prompt=question_answering_prompt, step=step, screenshots=None + prompt=question_answering_prompt, step=step, screenshots=None, prompt_name="answer-user-detail-questions" ) return llm_response except Exception as e: diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index c6356241..3b3dd46f 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -758,7 +758,9 @@ async def handle_input_text_action( elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML), ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, step=step, prompt_name="parse-input-or-select-context" + ) input_or_select_context = InputOrSelectContext.model_validate(json_response) LOG.info( "Parsed input/select context", @@ -1675,7 +1677,9 @@ async def choose_auto_completion_dropdown( step_id=step.step_id, task_id=task.task_id, ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=auto_completion_confirm_prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=auto_completion_confirm_prompt, step=step, prompt_name="auto-completion-choose-option" + ) element_id = json_response.get("id", "") relevance_float = json_response.get("relevance_float", 0) if json_response.get("direct_searching", False): @@ -1827,7 +1831,9 @@ async def input_or_auto_complete_input( task_id=task.task_id, potential_value_count=AUTO_COMPLETION_POTENTIAL_VALUES_COUNT, ) - json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_respone = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, step=step, prompt_name="auto-completion-potential-answers" + ) values: list[dict] = json_respone.get("potential_values", []) for each_value in values: @@ -1880,7 +1886,9 @@ async def input_or_auto_complete_input( tried_values=json.dumps(tried_values), popped_up_elements="".join([json_to_html(element) for element in cleaned_new_elements]), ) - json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_respone = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, step=step, prompt_name="auto-completion-tweak-value" + ) context_reasoning = json_respone.get("reasoning") new_current_value = json_respone.get("tweaked_value", "") if not new_current_value: @@ -1929,7 +1937,9 @@ async def sequentially_select_from_dropdown( element_id=action.element_id, elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML), ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, step=step, prompt_name="parse-input-or-select-context" + ) input_or_select_context = InputOrSelectContext.model_validate(json_response) LOG.info( "Parsed input/select context", @@ -2039,7 +2049,9 @@ async def sequentially_select_from_dropdown( select_history=json.dumps(build_sequential_select_history(select_history)), local_datetime=datetime.now(ensure_context().tz_info).isoformat(), ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, screenshots=[screenshot], step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, screenshots=[screenshot], step=step, prompt_name="confirm-multi-selection-finish" + ) if json_response.get("is_finished", False): return single_select_result.action_result, values[-1] if len(values) > 0 else None @@ -2144,9 +2156,9 @@ async def select_from_dropdown( if context.is_date_related: # HACK: according to the test, secondary LLM is not doing well on the date picker # using the main LLM to handle the case - json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select") else: - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select") value: str | None = json_response.get("value", None) single_select_result.value = value select_reason: str | None = json_response.get("reasoning", None) @@ -2426,7 +2438,7 @@ async def locate_dropdown_menu( element=element_dict, ) json_response = await app.SECONDARY_LLM_API_HANDLER( - prompt=dropdown_confirm_prompt, screenshots=[screenshot], step=step + prompt=dropdown_confirm_prompt, screenshots=[screenshot], step=step, prompt_name="opened-dropdown-confirm" ) is_opened_dropdown_menu = json_response.get("is_opened_dropdown_menu") if is_opened_dropdown_menu: @@ -2573,7 +2585,9 @@ async def normal_select( element_id=action.element_id, elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML), ) - json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.SECONDARY_LLM_API_HANDLER( + prompt=prompt, step=step, prompt_name="parse-input-or-select-context" + ) input_or_select_context = InputOrSelectContext.model_validate(json_response) LOG.info( "Parsed input/select context", @@ -2593,7 +2607,7 @@ async def normal_select( options=options_html, ) - json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step) + json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select") index: int | None = json_response.get("index") value: str | None = json_response.get("value") @@ -2756,6 +2770,7 @@ async def extract_information_for_navigation_goal( prompt=extract_information_prompt, step=step, screenshots=scraped_page.screenshots, + prompt_name="extract-information", ) return ScrapeResult(