ExtractAction (#1632)

This commit is contained in:
Shuchang Zheng
2025-01-24 16:21:26 +08:00
committed by GitHub
parent 1c2425171f
commit b62a6fec23
10 changed files with 150 additions and 25 deletions

View File

@@ -64,6 +64,7 @@ from skyvern.webeye.actions.actions import (
CompleteAction,
CompleteVerifyResult,
DecisiveAction,
ExtractAction,
ReloadPageAction,
UserDefinedError,
WebAction,
@@ -721,19 +722,7 @@ class ForgeAgent:
using_cached_action_plan = False
if not task.navigation_goal and not isinstance(task_block, ValidationBlock):
actions = [
CompleteAction(
reasoning="Task has no navigation goal.",
data_extraction_goal=task.data_extraction_goal,
organization_id=task.organization_id,
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
step_id=step.step_id,
step_order=step.order,
action_order=0,
confidence_float=1.0,
)
]
actions = [await self.create_extract_action(task, step, scraped_page)]
elif (
task_block
and task_block.cache_actions
@@ -1039,6 +1028,21 @@ class ForgeAgent:
)
detailed_agent_step_output.actions_and_results.append((complete_action, complete_results))
await self.record_artifacts_after_action(task, step, browser_state)
# if the last action is complete and is successful, check if there's a data extraction goal
# if task has navigation goal and extraction goal at the same time, handle ExtractAction before marking step as completed
if (
task.navigation_goal
and task.data_extraction_goal
and self.step_has_completed_goal(detailed_agent_step_output)
):
working_page = await browser_state.must_get_working_page()
extract_action = await self.create_extract_action(task, step, scraped_page)
extract_results = await ActionHandler.handle_action(
scraped_page, task, step, working_page, extract_action
)
detailed_agent_step_output.actions_and_results.append((extract_action, extract_results))
# If no action errors return the agent state and output
completed_step = await self.update_step(
step=step,
@@ -1490,6 +1494,7 @@ class ForgeAgent:
"""
Find the last successful ScrapeAction for the task and return the extracted information.
"""
# TODO: make sure we can get extracted information with the ExtractAction change
steps = await app.DATABASE.get_task_steps(
task_id=task.task_id,
organization_id=task.organization_id,
@@ -1500,7 +1505,7 @@ class ForgeAgent:
if not step.output or not step.output.actions_and_results:
continue
for action, action_results in step.output.actions_and_results:
if action.action_type != ActionType.COMPLETE:
if action.action_type != ActionType.EXTRACT:
continue
for action_result in action_results:
@@ -2197,3 +2202,43 @@ class ForgeAgent:
organization_id=task.organization_id,
errors=task_errors,
)
@staticmethod
async def create_extract_action(task: Task, step: Step, scraped_page: ScrapedPage) -> ExtractAction:
    """Build an ExtractAction for the task's data extraction goal.

    Asks the secondary LLM to briefly summarize what data will be extracted
    from the current page, then wraps that summary as the action's reasoning.
    """
    ctx = skyvern_context.ensure_context()
    # Ask the LLM for a one-sentence plan of the extraction so the user
    # gets a readable reasoning string on the action.
    summary_prompt = prompt_engine.load_prompt(
        "data-extraction-summary",
        data_extraction_goal=task.data_extraction_goal,
        data_extraction_schema=task.extracted_information_schema,
        current_url=scraped_page.url,
        local_datetime=datetime.now(ctx.tz_info).isoformat(),
    )
    llm_response = await app.SECONDARY_LLM_API_HANDLER(
        prompt=summary_prompt,
        step=step,
        screenshots=scraped_page.screenshots,
    )
    reasoning = llm_response.get("summary", "Extracting information from the page")
    return ExtractAction(
        reasoning=reasoning,
        data_extraction_goal=task.data_extraction_goal,
        organization_id=task.organization_id,
        task_id=task.task_id,
        workflow_run_id=task.workflow_run_id,
        step_id=step.step_id,
        step_order=step.order,
        action_order=0,
        confidence_float=1.0,
    )
@staticmethod
def step_has_completed_goal(detailed_agent_step_output: DetailedAgentStepOutput) -> bool:
    """Return True when the step's final action is a COMPLETE or EXTRACT
    action with at least one successful result."""
    pairs = detailed_agent_step_output.actions_and_results
    if not pairs:
        return False
    final_action, final_results = pairs[-1]
    # Only the terminal action types can mark the goal as achieved.
    if final_action.action_type in (ActionType.COMPLETE, ActionType.EXTRACT):
        return any(result.success for result in final_results)
    return False

View File

@@ -0,0 +1,23 @@
You are an AI assistant helping the user extract data from websites. Given a goal to extract information from a web page{% if data_extraction_schema %} and the output schema of the data you're going to extract{% endif %}, summarize what data you're going to extract from the page so that the user has a clear overview of your plan.
Reply in JSON format with the following keys:
{
"summary": str, // Summary of the data you will extract within one sentence. Be precise and concise.
}
The URL of the page you're on right now is `{{ current_url }}`.
Data extraction goal:
```
{{ data_extraction_goal }}
```{% if data_extraction_schema %}
Data extraction schema:
```
{{ data_extraction_schema }}
```{% endif %}
Current datetime, ISO format:
```
{{ local_datetime }}
```

View File

@@ -2010,6 +2010,10 @@ class AgentDB:
prompt: str | None = None,
url: str | None = None,
organization_id: str | None = None,
proxy_location: ProxyLocation | None = None,
totp_identifier: str | None = None,
totp_verification_url: str | None = None,
webhook_callback_url: str | None = None,
) -> ObserverTask:
async with self.Session() as session:
new_observer_cruise = ObserverCruiseModel(
@@ -2018,6 +2022,10 @@ class AgentDB:
workflow_permanent_id=workflow_permanent_id,
prompt=prompt,
url=url,
proxy_location=proxy_location,
totp_identifier=totp_identifier,
totp_verification_url=totp_verification_url,
webhook_callback_url=webhook_callback_url,
organization_id=organization_id,
)
session.add(new_observer_cruise)

View File

@@ -87,6 +87,9 @@ class Step(BaseModel):
raise ValueError(f"cant_set_is_last_to_false({self.step_id})")
def is_goal_achieved(self) -> bool:
# TODO: now we also consider a step has achieved the goal if the task doesn't have a navigation goal
# and the data extraction is successful
if self.status != StepStatus.completed:
return False
# TODO (kerem): Remove this check once we have backfilled all the steps
@@ -94,14 +97,14 @@ class Step(BaseModel):
return False
# Check if there is a successful complete action
for action, action_results in self.output.actions_and_results:
if action.action_type != ActionType.COMPLETE:
continue
if not self.output.actions_and_results:
return False
if any(action_result.success for action_result in action_results):
return True
last_action, last_action_results = self.output.actions_and_results[-1]
if last_action.action_type not in [ActionType.COMPLETE, ActionType.EXTRACT]:
return False
return False
return any(action_result.success for action_result in last_action_results)
def is_success(self) -> bool:
if self.status != StepStatus.completed:

View File

@@ -1147,6 +1147,10 @@ async def observer_task(
organization=organization,
user_prompt=data.user_prompt,
user_url=str(data.url) if data.url else None,
totp_identifier=data.totp_identifier,
totp_verification_url=data.totp_verification_url,
webhook_callback_url=data.webhook_callback_url,
proxy_location=data.proxy_location,
)
except LLMProviderError:
LOG.error("LLM failure to initialize observer cruise", exc_info=True)

View File

@@ -112,6 +112,7 @@ class ObserverTaskRequest(BaseModel):
webhook_callback_url: str | None = None
totp_verification_url: str | None = None
totp_identifier: str | None = None
proxy_location: ProxyLocation | None = None
@field_validator("url", "webhook_callback_url", "totp_verification_url")
@classmethod

View File

@@ -80,11 +80,21 @@ def _generate_data_extraction_schema_for_loop(loop_values_key: str) -> dict:
async def initialize_observer_cruise(
organization: Organization, user_prompt: str, user_url: str | None = None
organization: Organization,
user_prompt: str,
user_url: str | None = None,
proxy_location: ProxyLocation | None = None,
totp_identifier: str | None = None,
totp_verification_url: str | None = None,
webhook_callback_url: str | None = None,
) -> ObserverTask:
observer_cruise = await app.DATABASE.create_observer_cruise(
prompt=user_prompt,
organization_id=organization.organization_id,
totp_verification_url=totp_verification_url,
totp_identifier=totp_identifier,
webhook_callback_url=webhook_callback_url,
proxy_location=proxy_location,
)
# set observer cruise id in context
context = skyvern_context.current()
@@ -117,7 +127,9 @@ async def initialize_observer_cruise(
# create workflow and workflow run
max_steps_override = 10
try:
new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(organization, metadata.workflow_title)
new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(
organization, metadata.workflow_title, proxy_location=proxy_location
)
workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
request_id=None,
workflow_request=WorkflowRequestBody(),

View File

@@ -1693,7 +1693,9 @@ class WorkflowService:
raise ValueError(f"Invalid block type {block_yaml.block_type}")
async def create_empty_workflow(self, organization: Organization, title: str) -> Workflow:
async def create_empty_workflow(
self, organization: Organization, title: str, proxy_location: ProxyLocation | None = None
) -> Workflow:
"""
Create a blank workflow with no blocks
"""
@@ -1704,6 +1706,7 @@ class WorkflowService:
parameters=[],
blocks=[],
),
proxy_location=proxy_location,
)
return await app.WORKFLOW_SERVICE.create_workflow_from_request(
organization=organization,