Add step / task / workflow run / observer metrics as logs (#1698)

Co-authored-by: Suchintan <suchintan@users.noreply.github.com>
This commit is contained in:
Shuchang Zheng
2025-02-02 03:10:38 +08:00
committed by GitHub
parent 41e8d8b0ac
commit 204972e225
11 changed files with 284 additions and 219 deletions

View File

@@ -4,7 +4,7 @@ import os
import random
import string
from asyncio.exceptions import CancelledError
from datetime import datetime
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Tuple
@@ -755,6 +755,7 @@ class ForgeAgent:
self.async_operation_pool.run_operation(task.task_id, AgentPhase.llm)
json_response = await app.LLM_API_HANDLER(
prompt=extract_action_prompt,
prompt_name="extract-actions",
step=step,
screenshots=scraped_page.screenshots,
)
@@ -1126,7 +1127,10 @@ class ForgeAgent:
# this prompt is critical to our agent so let's use the primary LLM API handler
verification_result = await app.LLM_API_HANDLER(
prompt=verification_prompt, step=step, screenshots=scraped_page_refreshed.screenshots
prompt=verification_prompt,
step=step,
screenshots=scraped_page_refreshed.screenshots,
prompt_name="check-user-goal",
)
return CompleteVerifyResult.model_validate(verification_result)
@@ -1411,8 +1415,10 @@ class ForgeAgent:
elif task_type == TaskType.validation:
template = "decisive-criterion-validate"
elif task_type == TaskType.action:
prompt = prompt_engine.load_prompt("infer-action-type", navigation_goal=navigation_goal)
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step)
prompt = prompt_engine.load_prompt(
"infer-action-type", navigation_goal=navigation_goal, prompt_name="infer-action-type"
)
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="infer-action-type")
if json_response.get("error"):
raise FailedToParseActionInstruction(
reason=json_response.get("thought"), error_type=json_response.get("error")
@@ -1914,6 +1920,18 @@ class ForgeAgent:
diff=update_comparison,
)
# Track step duration when step is completed or failed
if status in [StepStatus.completed, StepStatus.failed]:
duration_seconds = (datetime.now(UTC) - step.created_at.replace(tzinfo=UTC)).total_seconds()
LOG.info(
"Step duration metrics",
task_id=step.task_id,
step_id=step.step_id,
duration_seconds=duration_seconds,
status=status,
organization_id=step.organization_id,
)
await save_step_logs(step.step_id)
return await app.DATABASE.update_step(
@@ -1948,6 +1966,19 @@ class ForgeAgent:
for key, value in updates.items()
if getattr(task, key) != value
}
# Track task duration when task is completed, failed, or terminated
if status in [TaskStatus.completed, TaskStatus.failed, TaskStatus.terminated]:
duration_seconds = (datetime.now(UTC) - task.created_at.replace(tzinfo=UTC)).total_seconds()
LOG.info(
"Task duration metrics",
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
duration_seconds=duration_seconds,
status=status,
organization_id=task.organization_id,
)
await save_task_logs(task.task_id)
LOG.info("Updating task in db", task_id=task.task_id, diff=update_comparison)
return await app.DATABASE.update_task(
@@ -2040,7 +2071,9 @@ class ForgeAgent:
navigation_payload=task.navigation_payload,
steps=steps_results,
)
json_response = await app.LLM_API_HANDLER(prompt=prompt, screenshots=screenshots, step=step)
json_response = await app.LLM_API_HANDLER(
prompt=prompt, screenshots=screenshots, step=step, prompt_name="summarize-max-steps-reason"
)
return json_response.get("reasoning", "")
except Exception:
LOG.warning("Failed to summary the failure reason", task_id=task.task_id, step_id=step.step_id)
@@ -2198,6 +2231,7 @@ class ForgeAgent:
prompt=extract_action_prompt,
step=step,
screenshots=scraped_page.screenshots,
prompt_name="extract-actions",
)
return json_response
@@ -2238,9 +2272,7 @@ class ForgeAgent:
)
data_extraction_summary_resp = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt,
step=step,
screenshots=scraped_page.screenshots,
prompt=prompt, step=step, screenshots=scraped_page.screenshots, prompt_name="data-extraction-summary"
)
return ExtractAction(
reasoning=data_extraction_summary_resp.get("summary", "Extracting information from the page"),

View File

@@ -188,7 +188,9 @@ async def _convert_svg_to_string(
for retry in range(SVG_SHAPE_CONVERTION_ATTEMPTS):
try:
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=svg_convert_prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=svg_convert_prompt, step=step, prompt_name="svg-convert"
)
svg_shape = json_response.get("shape", "")
recognized = json_response.get("recognized", False)
if not svg_shape or not recognized:
@@ -316,7 +318,7 @@ async def _convert_css_shape_to_string(
for retry in range(CSS_SHAPE_CONVERTION_ATTEMPTS):
try:
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, screenshots=[screenshot], step=step
prompt=prompt, screenshots=[screenshot], step=step, prompt_name="css-shape-convert"
)
css_shape = json_response.get("shape", "")
recognized = json_response.get("recognized", False)

View File

@@ -61,6 +61,7 @@ class LLMAPIHandlerFactory:
async def llm_api_handler_with_router_and_fallback(
prompt: str,
prompt_name: str,
step: Step | None = None,
observer_cruise: ObserverTask | None = None,
observer_thought: ObserverThought | None = None,
@@ -80,6 +81,8 @@ class LLMAPIHandlerFactory:
Returns:
The response from the LLM router.
"""
start_time = time.time()
if parameters is None:
parameters = LLMAPIHandlerFactory.get_api_parameters(llm_config)
@@ -120,7 +123,6 @@ class LLMAPIHandlerFactory:
)
try:
response = await router.acompletion(model=main_model_group, messages=messages, **parameters)
LOG.info("LLM API call successful", llm_key=llm_key, model=llm_config.model_name)
except litellm.exceptions.APIError as e:
raise LLMProviderErrorRetryableTask(llm_key) from e
except ValueError as e:
@@ -195,6 +197,21 @@ class LLMAPIHandlerFactory:
ai_suggestion=ai_suggestion,
)
# Track LLM API handler duration
duration_seconds = time.time() - start_time
LOG.info(
"LLM API handler duration metrics",
llm_key=llm_key,
model=main_model_group,
prompt_name=prompt_name,
duration_seconds=duration_seconds,
step_id=step.step_id if step else None,
observer_thought_id=observer_thought.observer_thought_id if observer_thought else None,
organization_id=step.organization_id
if step
else (observer_thought.organization_id if observer_thought else None),
)
return parsed_response
return llm_api_handler_with_router_and_fallback
@@ -210,6 +227,7 @@ class LLMAPIHandlerFactory:
async def llm_api_handler(
prompt: str,
prompt_name: str,
step: Step | None = None,
observer_cruise: ObserverTask | None = None,
observer_thought: ObserverThought | None = None,
@@ -217,6 +235,7 @@ class LLMAPIHandlerFactory:
screenshots: list[bytes] | None = None,
parameters: dict[str, Any] | None = None,
) -> dict[str, Any]:
start_time = time.time()
active_parameters = base_parameters or {}
if parameters is None:
parameters = LLMAPIHandlerFactory.get_api_parameters(llm_config)
@@ -270,14 +289,12 @@ class LLMAPIHandlerFactory:
# TODO (kerem): add a timeout to this call
# TODO (kerem): add a retry mechanism to this call (acompletion_with_retries)
# TODO (kerem): use litellm fallbacks? https://litellm.vercel.app/docs/tutorials/fallbacks#how-does-completion_with_fallbacks-work
LOG.info("Calling LLM API", llm_key=llm_key, model=llm_config.model_name)
response = await litellm.acompletion(
model=llm_config.model_name,
messages=messages,
timeout=settings.LLM_CONFIG_TIMEOUT,
**active_parameters,
)
LOG.info("LLM API call successful", llm_key=llm_key, model=llm_config.model_name)
except litellm.exceptions.APIError as e:
raise LLMProviderErrorRetryableTask(llm_key) from e
except CancelledError:
@@ -350,6 +367,21 @@ class LLMAPIHandlerFactory:
ai_suggestion=ai_suggestion,
)
# Track LLM API handler duration
duration_seconds = time.time() - start_time
LOG.info(
"LLM API handler duration metrics",
llm_key=llm_key,
prompt_name=prompt_name,
model=llm_config.model_name,
duration_seconds=duration_seconds,
step_id=step.step_id if step else None,
observer_thought_id=observer_thought.observer_thought_id if observer_thought else None,
organization_id=step.organization_id
if step
else (observer_thought.organization_id if observer_thought else None),
)
return parsed_response
return llm_api_handler

View File

@@ -79,6 +79,7 @@ class LLMAPIHandler(Protocol):
def __call__(
self,
prompt: str,
prompt_name: str,
step: Step | None = None,
observer_cruise: ObserverTask | None = None,
observer_thought: ObserverThought | None = None,

View File

@@ -1001,7 +1001,9 @@ async def make_ai_suggestion(
ai_suggestion_type=ai_suggestion_type,
)
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, ai_suggestion=new_ai_suggestion)
llm_response = await app.LLM_API_HANDLER(
prompt=llm_prompt, ai_suggestion=new_ai_suggestion, prompt_name="suggest-data-schema"
)
parsed_ai_suggestion = AISuggestionBase.model_validate(llm_response)
return parsed_ai_suggestion
@@ -1045,7 +1047,7 @@ async def generate_task(
llm_prompt = prompt_engine.load_prompt("generate-task", user_prompt=data.prompt)
try:
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt)
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="generate-task")
parsed_task_generation_obj = TaskGenerationBase.model_validate(llm_response)
# generate a TaskGenerationModel

View File

@@ -41,5 +41,5 @@ async def save_totp_code(
async def parse_totp_code(content: str) -> str | None:
prompt = prompt_engine.load_prompt("parse-verification-code", content=content)
code_resp = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt)
code_resp = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, prompt_name="parse-verification-code")
return code_resp.get("code", None)

View File

@@ -1,7 +1,7 @@
import os
import random
import string
from datetime import datetime
from datetime import UTC, datetime
from typing import Any
import httpx
@@ -34,7 +34,6 @@ from skyvern.forge.sdk.workflow.models.block import (
ForLoopBlock,
NavigationBlock,
TaskBlock,
UrlBlock,
)
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter
from skyvern.forge.sdk.workflow.models.workflow import (
@@ -52,7 +51,6 @@ from skyvern.forge.sdk.workflow.models.yaml import (
ForLoopBlockYAML,
NavigationBlockYAML,
TaskBlockYAML,
UrlBlockYAML,
WorkflowCreateYAMLRequest,
WorkflowDefinitionYAML,
)
@@ -119,7 +117,9 @@ async def initialize_observer_task(
)
metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url)
metadata_response = await app.LLM_API_HANDLER(prompt=metadata_prompt, observer_thought=observer_thought)
metadata_response = await app.LLM_API_HANDLER(
prompt=metadata_prompt, observer_thought=observer_thought, prompt_name="observer-generate-metadata"
)
# validate
LOG.info(f"Initialized observer initial response: {metadata_response}")
url: str = user_url or metadata_response.get("url", "")
@@ -350,175 +350,160 @@ async def run_observer_task_helper(
max_iterations = int_max_iterations_override or DEFAULT_MAX_ITERATIONS
for i in range(max_iterations):
LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url)
task_type = ""
plan = ""
block: BlockTypeVar | None = None
task_history_record: dict[str, Any] = {}
context = skyvern_context.ensure_context()
if i == 0:
# The first iteration is always a GOTO_URL task
task_type = "goto_url"
plan = f"Go to this website: {url}"
task_history_record = {"type": task_type, "task": plan}
block, block_yaml_list, parameter_yaml_list = await _generate_goto_url_task(
workflow_id=workflow_id,
try:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run,
url=url,
browser_session_id=browser_session_id,
)
task_history_record = {"type": task_type, "task": plan}
else:
try:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run,
url=url,
browser_session_id=browser_session_id,
)
scraped_page = await scrape_website(
browser_state,
url,
app.AGENT_FUNCTION.cleanup_element_tree_factory(),
scrape_exclude=app.scrape_exclude,
)
element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML)
page = await browser_state.get_working_page()
except Exception:
LOG.exception(
"Failed to get browser state or scrape website in observer iteration", iteration=i, url=url
)
continue
current_url = str(
await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url
scraped_page = await scrape_website(
browser_state,
url,
app.AGENT_FUNCTION.cleanup_element_tree_factory(),
scrape_exclude=app.scrape_exclude,
)
element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML)
page = await browser_state.get_working_page()
except Exception:
LOG.exception("Failed to get browser state or scrape website in observer iteration", iteration=i, url=url)
continue
current_url = str(
await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url
)
observer_prompt = prompt_engine.load_prompt(
"observer",
current_url=current_url,
elements=element_tree_in_prompt,
user_goal=user_prompt,
task_history=task_history,
local_datetime=datetime.now(context.tz_info).isoformat(),
)
observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=workflow.workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
observer_thought_type=ObserverThoughtType.plan,
observer_thought_scenario=ObserverThoughtScenario.generate_plan,
)
observer_response = await app.LLM_API_HANDLER(
prompt=observer_prompt,
screenshots=scraped_page.screenshots,
observer_thought=observer_thought,
)
context = skyvern_context.ensure_context()
observer_prompt = prompt_engine.load_prompt(
"observer",
current_url=current_url,
elements=element_tree_in_prompt,
user_goal=user_prompt,
task_history=task_history,
local_datetime=datetime.now(context.tz_info).isoformat(),
)
observer_thought = await app.DATABASE.create_observer_thought(
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
workflow_run_id=workflow_run.workflow_run_id,
workflow_id=workflow.workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
observer_thought_type=ObserverThoughtType.plan,
observer_thought_scenario=ObserverThoughtScenario.generate_plan,
)
observer_response = await app.LLM_API_HANDLER(
prompt=observer_prompt,
screenshots=scraped_page.screenshots,
observer_thought=observer_thought,
prompt_name="observer-generate-plan",
)
LOG.info(
"Observer response",
observer_response=observer_response,
iteration=i,
current_url=current_url,
workflow_run_id=workflow_run_id,
)
# see if the user goal has achieved or not
user_goal_achieved = observer_response.get("user_goal_achieved", False)
observation = observer_response.get("page_info", "")
thoughts: str = observer_response.get("thoughts", "")
plan: str = observer_response.get("plan", "")
task_type: str = observer_response.get("task_type", "")
# Create and save observer thought
await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought.observer_thought_id,
organization_id=organization_id,
thought=thoughts,
observation=observation,
answer=plan,
output={"task_type": task_type, "user_goal_achieved": user_goal_achieved},
)
if user_goal_achieved is True:
LOG.info(
"Observer response",
observer_response=observer_response,
"User goal achieved. Workflow run will complete. Observer is stopping",
iteration=i,
current_url=current_url,
workflow_run_id=workflow_run_id,
)
# see if the user goal has achieved or not
user_goal_achieved = observer_response.get("user_goal_achieved", False)
observation = observer_response.get("page_info", "")
thoughts: str = observer_response.get("thoughts", "")
plan = observer_response.get("plan", "")
task_type = observer_response.get("task_type", "")
# Create and save observer thought
await app.DATABASE.update_observer_thought(
observer_thought_id=observer_thought.observer_thought_id,
organization_id=organization_id,
thought=thoughts,
observation=observation,
answer=plan,
output={"task_type": task_type, "user_goal_achieved": user_goal_achieved},
observer_task = await _summarize_observer_task(
observer_task=observer_task,
task_history=task_history,
context=context,
screenshots=scraped_page.screenshots,
)
break
if user_goal_achieved is True:
LOG.info(
"User goal achieved. Workflow run will complete. Observer is stopping",
iteration=i,
workflow_run_id=workflow_run_id,
)
observer_task = await _summarize_observer_task(
observer_task=observer_task,
task_history=task_history,
context=context,
screenshots=scraped_page.screenshots,
)
break
if not plan:
LOG.warning("No plan found in observer response", observer_response=observer_response)
continue
if not plan:
LOG.warning("No plan found in observer response", observer_response=observer_response)
continue
# parse observer response and run the next task
if not task_type:
LOG.error("No task type found in observer response", observer_response=observer_response)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id,
failure_reason="Skyvern failed to generate a task. Please try again later.",
)
break
# parse observer response and run the next task
if not task_type:
LOG.error("No task type found in observer response", observer_response=observer_response)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id,
failure_reason="Skyvern failed to generate a task. Please try again later.",
)
break
if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
block: BlockTypeVar | None = None
task_history_record: dict[str, Any] = {}
if task_type == "extract":
block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task(
observer_cruise=observer_task,
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
current_url=current_url,
element_tree_in_prompt=element_tree_in_prompt,
data_extraction_goal=plan,
task_history=task_history,
)
task_history_record = {"type": task_type, "task": plan}
elif task_type == "navigate":
original_url = url if i == 0 else None
navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan)
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
original_url=original_url,
navigation_goal=navigation_goal,
totp_verification_url=observer_task.totp_verification_url,
totp_identifier=observer_task.totp_identifier,
)
task_history_record = {"type": task_type, "task": plan}
elif task_type == "loop":
try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
observer_cruise=observer_task,
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
current_url=current_url,
element_tree_in_prompt=element_tree_in_prompt,
data_extraction_goal=plan,
task_history=task_history,
plan=plan,
browser_state=browser_state,
original_url=url,
scraped_page=scraped_page,
)
task_history_record = {"type": task_type, "task": plan}
elif task_type == "navigate":
original_url = url if i == 0 else None
navigation_goal = MINI_GOAL_TEMPLATE.format(main_goal=user_prompt, mini_goal=plan)
block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task(
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
original_url=original_url,
navigation_goal=navigation_goal,
totp_verification_url=observer_task.totp_verification_url,
totp_identifier=observer_task.totp_identifier,
)
task_history_record = {"type": task_type, "task": plan}
elif task_type == "loop":
try:
block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task(
observer_cruise=observer_task,
workflow_id=workflow_id,
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
plan=plan,
browser_state=browser_state,
original_url=url,
scraped_page=scraped_page,
)
task_history_record = {
"type": task_type,
"task": plan,
"loop_over_values": extraction_obj.get("loop_values"),
"task_inside_the_loop": inner_task,
}
except Exception:
LOG.exception("Failed to generate loop task")
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id,
failure_reason="Failed to generate the loop.",
)
break
else:
LOG.info("Unsupported task type", task_type=task_type)
task_history_record = {
"type": task_type,
"task": plan,
"loop_over_values": extraction_obj.get("loop_values"),
"task_inside_the_loop": inner_task,
}
except Exception:
LOG.exception("Failed to generate loop task")
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id,
failure_reason=f"Unsupported task block type gets generated: {task_type}",
failure_reason="Failed to generate the loop.",
)
break
else:
LOG.info("Unsupported task type", task_type=task_type)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
workflow_run_id=workflow_run_id,
failure_reason=f"Unsupported task block type gets generated: {task_type}",
)
break
# generate the extraction task
block_result = await block.execute_safe(
@@ -580,11 +565,6 @@ async def run_observer_task_helper(
if block_result.success is True:
completion_screenshots = []
try:
browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run(
workflow_run=workflow_run,
url=url,
browser_session_id=browser_session_id,
)
scraped_page = await scrape_website(
browser_state,
url,
@@ -615,6 +595,7 @@ async def run_observer_task_helper(
prompt=observer_completion_prompt,
screenshots=completion_screenshots,
observer_thought=observer_thought,
prompt_name="observer-check-completion",
)
LOG.info(
"Observer completion check response",
@@ -917,6 +898,7 @@ async def _generate_loop_task(
task_in_loop_metadata_prompt,
screenshots=scraped_page.screenshots,
observer_thought=observer_thought_task_in_loop,
prompt_name="observer-generate-task-block",
)
LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response)
navigation_goal = task_in_loop_metadata_response.get("navigation_goal")
@@ -1012,6 +994,7 @@ async def _generate_extraction_task(
generate_extraction_task_response = await app.LLM_API_HANDLER(
generate_extraction_task_prompt,
observer_cruise=observer_cruise,
prompt_name="observer-generate-extraction-task",
)
LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response)
@@ -1082,34 +1065,6 @@ async def _generate_navigation_task(
)
async def _generate_goto_url_task(
workflow_id: str,
url: str,
) -> tuple[UrlBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]:
LOG.info("Generating goto url task", url=url)
# create OutputParameter for the data_extraction block
label = f"goto_url_{_generate_random_string()}"
url_block_yaml = UrlBlockYAML(
label=label,
url=url,
)
output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block(
workflow_id=workflow_id,
block_yaml=url_block_yaml,
)
# create UrlBlock
return (
UrlBlock(
label=label,
url=url,
output_parameter=output_parameter,
),
[url_block_yaml],
[],
)
def _generate_random_string(length: int = 5) -> str:
# Use the current timestamp as the seed
random.seed(os.urandom(16))
@@ -1170,14 +1125,24 @@ async def mark_observer_task_as_completed(
output: dict[str, Any] | None = None,
) -> ObserverTask:
observer_task = await app.DATABASE.update_observer_cruise(
observer_cruise_id,
observer_cruise_id=observer_cruise_id,
organization_id=organization_id,
status=ObserverTaskStatus.completed,
workflow_run_id=workflow_run_id,
summary=summary,
output=output,
)
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
# Track observer cruise duration when completed
duration_seconds = (datetime.now(UTC) - observer_task.created_at).total_seconds()
LOG.info(
"Observer cruise duration metrics",
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
duration_seconds=duration_seconds,
status=ObserverTaskStatus.completed,
organization_id=organization_id,
)
await send_observer_task_webhook(observer_task)
return observer_task
@@ -1278,6 +1243,7 @@ async def _summarize_observer_task(
prompt=observer_summary_prompt,
screenshots=screenshots,
observer_thought=observer_thought,
prompt_name="observer-generate-summary",
)
LOG.info("Observer summary response", observer_summary_resp=observer_summary_resp)

View File

@@ -246,7 +246,9 @@ class Block(BaseModel, abc.ABC):
"generate_workflow_run_block_description",
block=block_data,
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=description_generation_prompt)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=description_generation_prompt, prompt_name="generate-workflow-run-block-description"
)
description = json_response.get("summary")
LOG.info(
"Generated description for the workflow run block",
@@ -1198,7 +1200,7 @@ class TextPromptBlock(Block):
prompt=prompt,
llm_key=self.llm_key,
)
response = await llm_api_handler(prompt=prompt)
response = await llm_api_handler(prompt=prompt, prompt_name="text-prompt")
LOG.info("TextPromptBlock: Received response from LLM", response=response)
return response
@@ -1954,7 +1956,7 @@ class PDFParserBlock(Block):
llm_prompt = prompt_engine.load_prompt(
"extract-information-from-file-text", extracted_text_content=extracted_text, json_schema=self.json_schema
)
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt)
llm_response = await app.LLM_API_HANDLER(prompt=llm_prompt, prompt_name="extract-information-from-file-text")
# Record the parsed data
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, llm_response)
return await self.build_block_result(

View File

@@ -1,5 +1,6 @@
import asyncio
import json
from datetime import UTC, datetime
from typing import Any
import httpx
@@ -468,6 +469,18 @@ class WorkflowService:
browser_session_id=browser_session_id,
close_browser_on_completion=browser_session_id is None,
)
# Track workflow run duration when completed
duration_seconds = (datetime.now(UTC) - workflow_run.created_at).total_seconds()
LOG.info(
"Workflow run duration metrics",
workflow_run_id=workflow_run_id,
workflow_id=workflow_run.workflow_id,
duration_seconds=duration_seconds,
status=WorkflowRunStatus.completed,
organization_id=organization_id,
)
return workflow_run
async def create_workflow(

View File

@@ -189,7 +189,7 @@ async def get_user_detail_answers(
)
llm_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=question_answering_prompt, step=step, screenshots=None
prompt=question_answering_prompt, step=step, screenshots=None, prompt_name="answer-user-detail-questions"
)
return llm_response
except Exception as e:

View File

@@ -758,7 +758,9 @@ async def handle_input_text_action(
elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML),
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="parse-input-or-select-context"
)
input_or_select_context = InputOrSelectContext.model_validate(json_response)
LOG.info(
"Parsed input/select context",
@@ -1675,7 +1677,9 @@ async def choose_auto_completion_dropdown(
step_id=step.step_id,
task_id=task.task_id,
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=auto_completion_confirm_prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=auto_completion_confirm_prompt, step=step, prompt_name="auto-completion-choose-option"
)
element_id = json_response.get("id", "")
relevance_float = json_response.get("relevance_float", 0)
if json_response.get("direct_searching", False):
@@ -1827,7 +1831,9 @@ async def input_or_auto_complete_input(
task_id=task.task_id,
potential_value_count=AUTO_COMPLETION_POTENTIAL_VALUES_COUNT,
)
json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_respone = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="auto-completion-potential-answers"
)
values: list[dict] = json_respone.get("potential_values", [])
for each_value in values:
@@ -1880,7 +1886,9 @@ async def input_or_auto_complete_input(
tried_values=json.dumps(tried_values),
popped_up_elements="".join([json_to_html(element) for element in cleaned_new_elements]),
)
json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_respone = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="auto-completion-tweak-value"
)
context_reasoning = json_respone.get("reasoning")
new_current_value = json_respone.get("tweaked_value", "")
if not new_current_value:
@@ -1929,7 +1937,9 @@ async def sequentially_select_from_dropdown(
element_id=action.element_id,
elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML),
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="parse-input-or-select-context"
)
input_or_select_context = InputOrSelectContext.model_validate(json_response)
LOG.info(
"Parsed input/select context",
@@ -2039,7 +2049,9 @@ async def sequentially_select_from_dropdown(
select_history=json.dumps(build_sequential_select_history(select_history)),
local_datetime=datetime.now(ensure_context().tz_info).isoformat(),
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, screenshots=[screenshot], step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, screenshots=[screenshot], step=step, prompt_name="confirm-multi-selection-finish"
)
if json_response.get("is_finished", False):
return single_select_result.action_result, values[-1] if len(values) > 0 else None
@@ -2144,9 +2156,9 @@ async def select_from_dropdown(
if context.is_date_related:
# HACK: according to the test, secondary LLM is not doing well on the date picker
# using the main LLM to handle the case
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select")
else:
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select")
value: str | None = json_response.get("value", None)
single_select_result.value = value
select_reason: str | None = json_response.get("reasoning", None)
@@ -2426,7 +2438,7 @@ async def locate_dropdown_menu(
element=element_dict,
)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=dropdown_confirm_prompt, screenshots=[screenshot], step=step
prompt=dropdown_confirm_prompt, screenshots=[screenshot], step=step, prompt_name="opened-dropdown-confirm"
)
is_opened_dropdown_menu = json_response.get("is_opened_dropdown_menu")
if is_opened_dropdown_menu:
@@ -2573,7 +2585,9 @@ async def normal_select(
element_id=action.element_id,
elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML),
)
json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.SECONDARY_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="parse-input-or-select-context"
)
input_or_select_context = InputOrSelectContext.model_validate(json_response)
LOG.info(
"Parsed input/select context",
@@ -2593,7 +2607,7 @@ async def normal_select(
options=options_html,
)
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step)
json_response = await app.LLM_API_HANDLER(prompt=prompt, step=step, prompt_name="custom-select")
index: int | None = json_response.get("index")
value: str | None = json_response.get("value")
@@ -2756,6 +2770,7 @@ async def extract_information_for_navigation_goal(
prompt=extract_information_prompt,
step=step,
screenshots=scraped_page.screenshots,
prompt_name="extract-information",
)
return ScrapeResult(