observer summary (#1532)

This commit is contained in:
Shuchang Zheng
2025-01-10 14:59:53 -08:00
committed by GitHub
parent 558b9befc4
commit 374b2326c4
6 changed files with 133 additions and 27 deletions

View File

@@ -2093,6 +2093,8 @@ class AgentDB:
workflow_permanent_id: str | None = None,
url: str | None = None,
prompt: str | None = None,
summary: str | None = None,
output: dict[str, Any] | None = None,
organization_id: str | None = None,
) -> ObserverCruise:
async with self.Session() as session:
@@ -2116,6 +2118,10 @@ class AgentDB:
observer_cruise.url = url
if prompt:
observer_cruise.prompt = prompt
if summary:
observer_cruise.summary = summary
if output:
observer_cruise.output = output
await session.commit()
await session.refresh(observer_cruise)
return ObserverCruise.model_validate(observer_cruise)

View File

@@ -548,6 +548,8 @@ class ObserverCruiseModel(Base):
workflow_permanent_id = Column(String, nullable=True)
prompt = Column(UnicodeText, nullable=True)
url = Column(String, nullable=True)
summary = Column(String, nullable=True)
output = Column(JSON, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)

View File

@@ -31,6 +31,8 @@ class ObserverCruise(BaseModel):
workflow_permanent_id: str | None = None
prompt: str | None = None
url: HttpUrl | None = None
summary: str | None = None
output: dict[str, Any] | list | str | None = None
created_at: datetime
modified_at: datetime
@@ -46,6 +48,7 @@ class ObserverThoughtType(StrEnum):
class ObserverThoughtScenario(StrEnum):
generate_plan = "generate_plan"
user_goal_check = "user_goal_check"
summarization = "summarization"
generate_metadata = "generate_metadata"
extract_loop_values = "extract_loop_values"
generate_task_in_loop = "generate_task_in_loop"

View File

@@ -17,7 +17,6 @@ from skyvern.forge.sdk.schemas.observers import (
ObserverCruise,
ObserverCruiseStatus,
ObserverMetadata,
ObserverThought,
ObserverThoughtScenario,
ObserverThoughtType,
)
@@ -209,13 +208,13 @@ async def run_observer_cruise(
organization_id=organization_id,
)
return
except Exception:
except Exception as e:
LOG.error("Failed to run observer cruise", exc_info=True)
failure_reason = f"Failed to run observer cruise: {str(e)}"
await mark_observer_cruise_as_failed(
observer_cruise_id,
workflow_run_id=observer_cruise.workflow_run_id,
# TODO: add better failure reason
failure_reason="Failed to run observer cruise",
failure_reason=failure_reason,
organization_id=organization_id,
)
return
@@ -395,7 +394,12 @@ async def run_observer_cruise_helper(
iteration=i,
workflow_run_id=workflow_run_id,
)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
task_history=task_history,
context=context,
screenshots=scraped_page.screenshots,
)
break
# parse observer response and run the next task
@@ -573,10 +577,11 @@ async def run_observer_cruise_helper(
workflow_run_id=workflow_run_id,
completion_resp=completion_resp,
)
await mark_observer_cruise_as_completed(
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
organization_id=organization_id,
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
task_history=task_history,
context=context,
screenshots=completion_screenshots,
)
break
else:
@@ -1039,24 +1044,6 @@ async def get_observer_thought_timelines(
]
async def _record_thought_screenshot(observer_thought: ObserverThought, workflow_run_id: str) -> None:
    """Take a full-page screenshot for a workflow run and store it as an artifact of the thought.

    This is best-effort: a missing browser state or any error while taking or
    storing the screenshot is logged as a warning and never raised to the caller.

    Args:
        observer_thought: The thought the screenshot artifact is attached to.
        workflow_run_id: Workflow run whose browser state is used for the screenshot.
    """
    # get the browser state for the workflow run
    browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=workflow_run_id)
    if not browser_state:
        LOG.warning("No browser state found for the workflow run", workflow_run_id=workflow_run_id)
        return

    # get the screenshot for the workflow run
    try:
        screenshot = await browser_state.take_screenshot(full_page=True)
        await app.ARTIFACT_MANAGER.create_observer_thought_artifact(
            observer_thought=observer_thought,
            artifact_type=ArtifactType.SCREENSHOT_LLM,
            data=screenshot,
        )
    except Exception:
        # Deliberately swallowed (screenshots are optional), but keep the
        # traceback in the log so the underlying failure can be diagnosed.
        LOG.warning(
            "Failed to take screenshot for the observer thought",
            observer_thought=observer_thought,
            exc_info=True,
        )
async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverCruise | None:
    """Fetch an observer cruise from the application database.

    Args:
        observer_cruise_id: Identifier of the observer cruise to look up.
        organization_id: Optional organization identifier forwarded to the database lookup.

    Returns:
        Whatever ``app.DATABASE.get_observer_cruise`` returns for these arguments
        (annotated as an ``ObserverCruise`` or ``None``).
    """
    observer_cruise = await app.DATABASE.get_observer_cruise(
        observer_cruise_id,
        organization_id=organization_id,
    )
    return observer_cruise
@@ -1080,11 +1067,15 @@ async def mark_observer_cruise_as_completed(
observer_cruise_id: str,
workflow_run_id: str | None = None,
organization_id: str | None = None,
summary: str | None = None,
output: dict[str, Any] | None = None,
) -> None:
await app.DATABASE.update_observer_cruise(
observer_cruise_id,
organization_id=organization_id,
status=ObserverCruiseStatus.completed,
summary=summary,
output=output,
)
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
@@ -1157,3 +1148,50 @@ def _get_extracted_data_from_block_result(
loop_output_overall.append(inner_loop_output_overall)
return loop_output_overall if loop_output_overall else None
return None
async def _summarize_observer_cruise(
    observer_cruise: ObserverCruise,
    task_history: list[dict],
    context: SkyvernContext,
    screenshots: list[bytes] | None = None,
) -> None:
    """Ask the LLM for a summary of the observer cruise and mark the cruise completed.

    Steps, in order:
      1. Create an observer thought with the ``summarization`` scenario.
      2. Render the ``observer_summary`` prompt from the cruise prompt, the task
         history and the current local time in ``context.tz_info``.
      3. Call the LLM handler with that prompt and the given screenshots.
      4. Store the response's ``description`` as the thought text and the whole
         response as the thought output.
      5. Mark the cruise as completed with the ``description`` as summary and the
         response's ``output`` value as output.

    Args:
        observer_cruise: The cruise being summarized.
        task_history: Task history passed to the summary prompt.
        context: Supplies ``tz_info`` for the local timestamp in the prompt.
        screenshots: Optional screenshots forwarded to the LLM call.
    """
    cruise_id = observer_cruise.observer_cruise_id
    org_id = observer_cruise.organization_id
    run_id = observer_cruise.workflow_run_id

    summary_thought = await app.DATABASE.create_observer_thought(
        observer_cruise_id=cruise_id,
        organization_id=org_id,
        workflow_run_id=run_id,
        workflow_id=observer_cruise.workflow_id,
        workflow_permanent_id=observer_cruise.workflow_permanent_id,
        observer_thought_type=ObserverThoughtType.user_goal_check,
        observer_thought_scenario=ObserverThoughtScenario.summarization,
    )

    # summarize the observer cruise and format the output
    local_now = datetime.now(context.tz_info).isoformat()
    summary_prompt = prompt_engine.load_prompt(
        "observer_summary",
        user_goal=observer_cruise.prompt,
        task_history=task_history,
        local_datetime=local_now,
    )
    llm_response = await app.LLM_API_HANDLER(
        prompt=summary_prompt,
        screenshots=screenshots,
        observer_thought=summary_thought,
    )
    LOG.info("Observer summary response", observer_summary_resp=llm_response)

    # Pull both fields out of the response before any further awaits.
    summary_text = llm_response.get("description")
    final_output = llm_response.get("output")

    await app.DATABASE.update_observer_thought(
        observer_thought_id=summary_thought.observer_thought_id,
        organization_id=org_id,
        thought=summary_text,
        output=llm_response,
    )
    await mark_observer_cruise_as_completed(
        observer_cruise_id=cruise_id,
        workflow_run_id=run_id,
        organization_id=org_id,
        summary=summary_text,
        output=final_output,
    )